import os import re import openai import logging import json import shutil import argparse import numpy as np from moviepy.editor import VideoFileClip from PIL import Image from datetime import datetime, timedelta from math import ceil from dotenv import load_dotenv from pydub import AudioSegment from tqdm import tqdm from google.oauth2 import service_account from googleapiclient.discovery import build from googleapiclient.http import MediaFileUpload from googleapiclient.http import MediaIoBaseDownload import shutil parser = argparse.ArgumentParser(description='Process KT videos in a given folder to generate transcripts and summaries of what was discussed.') parser.add_argument( 'input_folder', nargs='?', # Optional default='.', # Use the current working directory if no folder is specified help='The folder containing videos to process relative to the current working directory.' ) parser.add_argument( '--topic', nargs='?', # Optional default=False, help='If set to True, will generate topic-specific summaries in addition to the high-level summary.' ) parser.add_argument( '--transcribe', nargs='?', # Optional default=True, help='If set to False, will skip transcribing and leverage an existing _full_transcript.txt file to generate outputs.' ) args = parser.parse_args() log_file_path = os.path.join(os.path.abspath(args.input_folder), "processing.log") logging.basicConfig( level=logging.INFO, handlers=[ logging.StreamHandler(), logging.FileHandler(log_file_path, mode='a') ], format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) class KnowledgeTranscriber(object): MAX_SIZE = 5000000 # 5 MB MAX_SIZE_MB = MAX_SIZE / (1024 * 1024) # Convert bytes to MB BITRATE = 128000 # 128 kbps def __init__(self, api_key): self.client = openai.OpenAI(api_key=api_key) script_dir = os.path.dirname(os.path.abspath(__file__)) # Absolute directory of the script prompts = { "summary_prompt": os.path.join(script_dir, "prompts", "1-summary_prompt.txt"), "topic_prompt": os.path.join(script_dir, "prompts", "2-topic_prompt.txt"), "troubleshooting_prompt": os.path.join(script_dir, "prompts", "3-troubleshooting_prompt.txt"), "glossary_prompt": os.path.join(script_dir, "prompts", "4-glossary_prompt.txt"), "tags_prompt": os.path.join(script_dir, "prompts", "5-tags_prompt.txt"), "article_prompt": os.path.join(script_dir, "prompts", "6-article_prompt.txt") } def process_folder(self, folder_path, transcribe_flag, drive_folder_link): """ Takes a folder path and processes all videos or transcripts in the folder. First downloads all videos from the specified Google Drive folder to the local folder path. :param folder_path: The path to the folder containing videos or transcripts to process. :param transcribe_flag: Flag to indicate if transcription is needed. :param drive_folder_id: The ID of the Google Drive folder containing the videos. """ ###Added from here Initialize Google Drive service try: drive_service = self.get_drive_service() except Exception as e: logging.error(f"Failed to initialize Google Drive service: {e}") raise ValueError("Failed to initialize Google Drive service") # Extract the folder ID from the URL. folder_id = self.extract_drive_folder_id(drive_folder_link) # Check if the folder is publicly accessible and has write access if not self.check_folder_accessibility(drive_service, folder_id): raise ValueError("The Google Drive folder is not publicly accessible or does not have write access.") # List all files in the Google Drive folder drive_files = self.list_files_in_folder(drive_service, folder_id) logging.info(f"Downloading files from Google Drive folder: {folder_id}") # Download each file to the local folder_path for file in tqdm(drive_files, desc="Downloading Files"): file_name = file['name'] file_id = file['id'] local_file_path = os.path.join(folder_path, file_name) if not os.path.exists(local_file_path): # Avoid re-downloading files logging.info(f"Downloading file: {file_name}") self.download_file(drive_service, file_id, local_file_path) else: logging.info(f"File already exists: {file_name}") ###End of Added ### All below under this line is same with the original script. Until Next Added from here logging.info(f"Processing files in folder: {folder_path}") for filename in tqdm(os.listdir(folder_path), desc="Processing Files"): if transcribe_flag == "False": if filename.endswith("_full_transcript.txt"): # Processing for transcript files generated by Zoom/Loom/etc. already. logging.info(f"Processing transcript: {filename}") base_name = filename.replace("_full_transcript.txt", "") new_folder_path = os.path.join(folder_path, base_name) logging.info(f"New folder path: {new_folder_path}") # Folder handling outside main video processing functions. if not os.path.exists(new_folder_path): os.makedirs(new_folder_path) original_path = os.path.join(folder_path, filename) new_path = os.path.join(new_folder_path, filename) logging.info(f"Moving file from {original_path} to {new_path}") shutil.move(original_path, new_path) # Generate our ouputs self.generate_transcript_outputs(new_path) self.process_articles(new_folder_path) logging.info(f"Processing complete for: {filename}.") else: if filename.endswith(".mp4"): # Process for video files logging.info(f"Processing video: {filename}") video_path = os.path.join(folder_path, filename) self.process_video(video_path, folder_path) logging.info(f"Processing complete for: {filename}.") ###Added from here # After processing files logging.info(f"Processing complete for all files in folder: {folder_path}. Uploading processed files to Google Drive.") # Iterate over files in the input folder and upload each to Google Drive self.sync_folder_to_drive(drive_service, folder_path, folder_id, is_root=True) logging.info(f"Uploading processed files to Google Drive complete for all files in folder: {folder_path}. Success.") # Calling the cleanup function input_folder_path = os.path.abspath(folder_path) self.cleanup_input_folder(input_folder_path) ###End of Added def check_and_process(self, file_path, process_func, file_description): """ Validates if a file exists and processes it if it doesn't. :param file_path: The path to the file to check. :param process_func: The function to call to process the file if it doesn't exist. :param file_description: A description of the file to use in logging. """ if not os.path.exists(file_path): logging.info(f"Processing {file_description}: {file_path}") process_func() else: logging.info(f"{file_description} already exists.") def process_video(self, input_video, folder_path): """ Takes a video path, processes the video into a transcript, and a collection of knowledge outputs. :param input_video: The path to the video to process. :param folder_path: The path to the folder containing the video to process. """ base_name = os.path.splitext(os.path.basename(input_video))[0] output_folder = os.path.join(folder_path, f"{base_name}_output") processed_folder = os.path.join(folder_path, "Processed") os.makedirs(output_folder, exist_ok=True) os.makedirs(processed_folder, exist_ok=True) output_audio = os.path.join(output_folder, f"{base_name}.mp3") processed_audio = os.path.join(processed_folder, f"{base_name}.mp3") processed_video = os.path.join(processed_folder, f"{base_name}.mp4") transcript_file = os.path.join(output_folder, f"{base_name}_full_transcript.txt") # Checks to avoid re-processing to save time and calls to GPT. self.check_and_process( output_audio, lambda: self.video_to_audio(input_video, output_audio), "Audio file" ) self.check_and_process( transcript_file, lambda: self.transcribe_and_combine_audio(output_audio), "Transcript file" ) transcript_outputs_exist = os.path.exists(os.path.join(output_folder, f"{base_name}_summary.txt")) or \ os.path.exists(os.path.join(output_folder, f"{base_name}_troubleshooting_steps.txt")) or \ os.path.exists(os.path.join(output_folder, f"{base_name}_glossary.txt")) or \ os.path.exists(os.path.join(output_folder, f"{base_name}_tags_and_symptoms.txt")) if not transcript_outputs_exist: logging.info(f"Generating transcript outputs for: {transcript_file}") self.generate_transcript_outputs(transcript_file) else: logging.info("Transcript-related outputs already exist.") # Handling the screenshot capture and processing logging.info(f"Checking summary file for timestamps: {transcript_file}") summary_file = os.path.join(output_folder, f"{base_name}_full_transcript_summary.txt") troubleshooting_file = os.path.join(output_folder, f"{base_name}_full_transcript_troubleshooting_steps.txt") timestamp_list = self.find_timestamps(summary_file) + self.find_timestamps(troubleshooting_file) if timestamp_list: logging.info(f"Timestamps found in summary file: {summary_file}") screenshot_folder = os.path.join(output_folder, "Screenshots") os.makedirs(screenshot_folder, exist_ok=True) #self.parse_and_extract_frames(input_video, screenshot_folder, timestamp_list) else: logging.info(f"No timestamps found in summary file: {summary_file}") self.check_and_process( processed_audio, lambda: shutil.move(output_audio, processed_audio), "Processed audio" ) self.check_and_process( processed_video, lambda: shutil.move(input_video, processed_video), "Processed video" ) # Generate final articles from summary and troubleshooting steps. self.process_articles(output_folder) logging.info(f"Files saved to: {output_folder}") logging.info(f"Processing complete for: {input_video}.") def transcribe_and_combine_audio(self, audio_file_path): """ Takes an audio file path, splits the audio into parts if needed, transcribes the audio parts, and combines the transcriptions into a single transcript file. :param audio_file_path: The path to the audio file to process. :return: The path to the transcript file. """ base_file_path = os.path.splitext(audio_file_path)[0] transcript_file_path = f"{base_file_path}_full_transcript.txt" manifest_file_path = f"{base_file_path}_manifest.txt" # Load or initialize the manifest for keeping track of processed parts if os.path.exists(manifest_file_path): with open(manifest_file_path, "r", encoding="utf-8") as manifest_file: processed_parts = set(manifest_file.read().splitlines()) else: processed_parts = set() # Transcribe each part of the audio file, as needed parts_to_transcribe = sorted(self.get_or_create_audio_parts(audio_file_path)) for part in parts_to_transcribe: part_transcript_file = f"{part}_transcript.txt" if part in processed_parts: logging.info(f"Transcription part already exists: {part_transcript_file}") else: logging.info(f"Transcribing audio part: {part}") transcription = self.transcribe_audio_part(part) with open(part_transcript_file, "w", encoding="utf-8") as part_file: part_file.write(transcription) processed_parts.add(part) logging.info(f"Transcription complete for: {part}") with open(manifest_file_path, "w", encoding="utf-8") as manifest_file: manifest_file.write("\n".join(sorted(processed_parts))) # Check if the part is not the main audio file before removing if part != audio_file_path: os.remove(part) logging.info(f"Removed audio part: {part}") # Once all parts have been transcribed, combine them into the full transcript file with open(transcript_file_path, "w", encoding="utf-8") as transcript_file: for part in parts_to_transcribe: logging.info(f"Combining transcript part: {part}") part_transcript_file = f"{part}_transcript.txt" with open(part_transcript_file, "r", encoding="utf-8") as part_file: transcript_file.write(part_file.read() + "\n") os.remove(part_transcript_file) logging.info(f"Removed transcript part: {part_transcript_file}") # Now, we need to take the transcript file and adjust the timestamps to account for the audio parts with open(transcript_file_path, 'r', encoding="utf-8") as file: full_transcript = file.read() adjusted_content = self.adjust_timestamps(full_transcript) with open(transcript_file_path, 'w', encoding="utf-8") as file: file.write(adjusted_content) logging.info(f"Transcript saved to: {transcript_file_path}") os.remove(manifest_file_path) return transcript_file_path #def parse_time(self, time_str): # """Convert a timestamp string to seconds.""" # h, m, s = map(float, time_str.split(':')) # return h * 3600 + m * 60 + s def parse_time(self, time_str): """Convert a timestamp string to seconds, supporting both colon and hyphen separators.""" if '-' in time_str: h, m, s = map(float, time_str.split('-')) else: h, m, s = map(float, time_str.split(':')) return h * 3600 + m * 60 + s #def format_time(self, seconds): # """Convert seconds back to a timestamp string.""" # h = int(seconds // 3600) # m = int((seconds % 3600) // 60) # s = seconds % 60 # return f"{h:02}:{m:02}:{s:06.3f}" def format_time(self, seconds): """Convert seconds back to a timestamp string, using hyphens instead of colons.""" h = int(seconds // 3600) m = int((seconds % 3600) // 60) s = seconds % 60 return f"{h:02}-{m:02}-{s:06.3f}" def adjust_timestamps(self, vtt_content): """ Takes a VTT content string and adjusts the timestamps to account for the audio parts. :param vtt_content: The VTT content to process. :return: The adjusted VTT content. """ sections = vtt_content.split("WEBVTT") adjusted_sections = [] time_offset = 0 for section in sections[1:]: # Skip the first section as it's likely the header lines = section.strip().split("\n") adjusted_lines = [] for line in lines: if '-->' in line: start, end = line.split(' --> ') start_sec = self.parse_time(start) + time_offset end_sec = self.parse_time(end) + time_offset adjusted_line = f"{self.format_time(start_sec)} --> {self.format_time(end_sec)}" adjusted_lines.append(adjusted_line) else: adjusted_lines.append(line) # Update the time offset using the last timestamp of the current section if adjusted_lines: last_time = adjusted_lines[-2] # The second last line contains the last timestamp time_split = last_time.split(' --> ') if len(time_split) == 2: _, end = time_split time_offset = self.parse_time(end) adjusted_sections.append('\n'.join(adjusted_lines)) return "WEBVTT\n\n".join(adjusted_sections) def extract_frames_by_range(self, video_path, target_folder, start_time, end_time, fps=1): """ Takes a video path, a start time, and an end time, and extracts frames from the video between the given timestamps. :param video_path: The path to the video to process. :param target_folder: The path to the folder to save the extracted frames to. :param start_time: The start time to extract frames from in HH:MM:SS.mmm format. :param end_time: The end time to extract frames to in HH:MM:SS.mmm format. :param fps: The frames per second to extract from the video. """ # Convert start_time and end_time from HH:MM:SS.mmm to seconds start_seconds = sum(x * float(t) for x, t in zip([3600, 60, 1, 0.001], start_time.split(":"))) end_seconds = sum(x * float(t) for x, t in zip([3600, 60, 1, 0.001], end_time.split(":"))) # Create the target folder if it doesn't exist if not os.path.exists(target_folder): os.makedirs(target_folder) with VideoFileClip(video_path) as video: # Calculate the interval between frames based on the desired fps interval = 1 / fps # Adjust the loop to iterate over the desired timestamp range current_time = start_seconds while current_time < end_seconds: frame = video.get_frame(current_time) timestamp = self.format_time(int(current_time)) frame_path = os.path.join(target_folder, f"{timestamp}.png") Image.fromarray(np.uint8(frame)).save(frame_path) current_time += interval def find_timestamps(self, file_path): """ Takes a file path and finds the timestamps within the file. Searches for timestamps in the format "at 00:00:20.360" and "[00:00:28.559]". :param file_path: The path to the file to process. :return: A list of timestamps. """ # Updated pattern to match both "at 00:00:20.360" and "[00:00:28.559]" timestamp_pattern = r'at (\d{2}:\d{2}:\d{2}\.\d{3})|\[(\d{2}:\d{2}:\d{2}\.\d{3})\]' timestamps = [] with open(file_path, 'r', encoding='utf-8') as file: content = file.read() # Find all matches and process them to flatten the list and remove None raw_matches = re.findall(timestamp_pattern, content) for match in raw_matches: # match is a tuple where one group is the timestamp and the other is empty timestamp = match[0] if match[0] else match[1] timestamps.append(timestamp) return timestamps def parse_and_extract_frames(self, video_path, target_path, timestamps): """ Takes a video path and a list of timestamps, and extracts frames from the video around the given timestamps. :param video_path: The path to the video to process. :param target_path: The path to the folder to save the extracted frames to. :param timestamps: A list of timestamps to extract frames around. """ # Function to adjust the timestamp by a given number of seconds def adjust_timestamp(timestamp, seconds): timestamp_dt = datetime.strptime(timestamp, "%H:%M:%S.%f") adjusted_timestamp = timestamp_dt + timedelta(seconds=seconds) return adjusted_timestamp.strftime("%H:%M:%S.%f")[:-3] for timestamp in timestamps: start_timestamp = adjust_timestamp(timestamp, -5) end_timestamp = adjust_timestamp(timestamp, 5) self.extract_frames_by_range(video_path, target_path, start_timestamp, end_timestamp) def get_or_create_audio_parts(self, audio_file_path): """ Takes an audio file path and splits the audio into parts if needed. :param audio_file_path: The path to the audio file to process. :return: A list of paths to the audio parts. """ # Check if the audio needs to be split by checking its file size - this is approximate, but close enough for gov work file_size_mb = os.path.getsize(audio_file_path) / (1024 * 1024) parts_directory = os.path.join(os.path.dirname(audio_file_path), "parts") os.makedirs(parts_directory, exist_ok=True) # If the audio file has already been split, return the existing parts - else, split the audio file existing_parts = [os.path.join(parts_directory, f) for f in os.listdir(parts_directory) if os.path.isfile(os.path.join(parts_directory, f))] if existing_parts: logging.info("Found existing audio parts. Resuming transcription.") return existing_parts logging.info(f"Audio file size: {file_size_mb} MB") if file_size_mb > self.MAX_SIZE_MB: logging.info(f"Audio file size exceeds maximum size of {self.MAX_SIZE_MB} MB. Splitting audio file into parts.") return self.split_audio_file_by_size(audio_file_path) else: logging.info(f"Audio file size is within maximum size of {self.MAX_SIZE_MB} MB. No need to split the audio file.") return [audio_file_path] def split_audio_file_by_size(self, audio_file_path): """ Takes an audio file path and splits the audio into parts based on the maximum size. :param audio_file_path: The path to the audio file to process. :return: A list of paths to the audio parts. """ logging.info(f"Splitting audio file: {audio_file_path}") audio = AudioSegment.from_file(audio_file_path) max_chunk_duration_ms = ((self.MAX_SIZE * 8) / self.BITRATE) * 1000 logging.info(f"Max chunk duration: {max_chunk_duration_ms} ms") num_chunks = ceil(len(audio) / max_chunk_duration_ms) logging.info(f"Number of chunks: {num_chunks}") chunk_length = len(audio) // num_chunks chunks = [audio[i * chunk_length: (i + 1) * chunk_length] for i in range(num_chunks)] chunk_files = [] for i, chunk in enumerate(chunks): chunk_name = f"{os.path.splitext(audio_file_path)[0]}_part{i}.mp3" logging.info(f"Exporting audio chunk: {chunk_name}") chunk.export(chunk_name, format="mp3") chunk_files.append(chunk_name) logging.info(f"Audio file split into {len(chunk_files)} parts.") return chunk_files def video_to_audio(self, input_video, output_audio): """ Takes a video file path and strips out the audio to save as an MP3 file. :param input_video: The path to the video file to process. :param output_audio: The path to the audio file to save the converted audio to. """ if not os.path.exists(output_audio): video = AudioSegment.from_file(input_video, "mp4") video.export(output_audio, format="mp3", bitrate="128k") logging.info(f"Audio file exported: {output_audio}") else: logging.info("Audio file already exists") def transcribe_audio_part(self, part): """ Takes an audio file part path and transcribes the audio into text via whisper LLM. :param part: The path to the audio file to process. :return: The transcribed text. """ try: logging.info(f"Transcribing audio part: {part}") with open(part, "rb") as audio_file: transcript = self.client.audio.transcriptions.create( model="whisper-1", file=audio_file, response_format="vtt" # This prompt can be used to help the LLM understand the context of the audio and certain terms of art that may be used. #,prompt="UCaaS, CPaaS, STaaS, DRaaS, BLF, CDR, CIM, GCCH, GVBM, HEPIC, SBC, PSTN, SMB, OrecX, Prov" ) return transcript except Exception as e: logging.error(f"Failed to transcribe audio part {part}: {e}") raise ValueError("Failed to transcribe audio part. Check your OpenAI Key") def audio_to_transcript(self, input_audio): """ Takes an audio file path and transcribes the audio into text via whisper LLM. :param input_audio: The path to the audio file to process. :return: The path to the transcript file. """ logging.info(f"Transcribing audio: {input_audio}") with open(input_audio, "rb") as audio_file: transcript = self.client.audio.transcriptions.create( model="whisper-1", file=audio_file, response_format="vtt" # This prompt can be used to help the LLM understand the context of the audio and certain terms of art that may be used. #,prompt="UCaaS, CPaaS, STaaS, DRaaS, BLF, CDR, CIM, GCCH, GVBM, HEPIC, SBC, PSTN, SMB, OrecX, Prov" ) logging.info("Transcript created") base_name = os.path.splitext(input_audio)[0] output_file = f"{base_name}_transcript.txt" with open(output_file, "w", encoding="utf-8") as f: json.dump(transcript, f, indent=4) logging.info(f"Transcript saved to: {output_file}") return output_file def process_and_save_output(self, base_name, prompt_key, transcript_content, additional_content=None, file_suffix=""): """ Takes a transcript and generates the output for a given prompt. :param base_name: The base name of the transcript file. :param prompt_key: The key of the prompt to use. :param transcript_content: The content of the transcript to process. :param additional_content: Additional content to use in the prompt. :param file_suffix: The suffix to use for the output file. :return: The path to the output file. """ file_name = f"{base_name}_{file_suffix}.txt" # Check if the file already exists if os.path.exists(file_name): logging.info(f"{file_suffix.replace('_', ' ').capitalize()} file already exists: {file_name}") return file_name # Load and process the prompt prompt = self.load_prompt(prompt_key) if additional_content: conversation_history = self.build_conversation_history(self.load_prompt("summary_prompt"), transcript_content, additional_content['summary'], additional_content['topic']) else: conversation_history = self.build_conversation_history(prompt, transcript_content) response = self.send_conversation(conversation_history) content = response.choices[0].message.content clean_lines = [line.strip() for line in content.split('\n') if line.strip() != ''] clean_content = '\n\n'.join(clean_lines) # Write the processed content to the file with open(file_name, "w", encoding="utf-8") as f: f.write(clean_content) logging.info(f"{file_suffix.replace('_', ' ').capitalize()} saved to: {file_name}") return file_name def generate_transcript_outputs(self, transcript_file): """ Takes a transcript file and generates the summary outputs. :param transcript_file: The path to the transcript file to process. """ with open(transcript_file, "r", encoding="utf-8") as file: transcript_content = file.read() base_name = os.path.splitext(transcript_file)[0] # Generate the summary self.process_and_save_output(base_name, "summary_prompt", transcript_content, file_suffix="summary") with open(f"{base_name}_summary.txt", "r", encoding="utf-8") as file: summary_file = file.read() # Generate topic specific summaries topic_prompts = self.generate_topic_prompts(summary_file) # if script run with --topic, generate topic specific summaries. # if extract_topics: #Topic extraction assumed to be true for i, topic_prompt in enumerate(topic_prompts): additional_content = {"summary": summary_file, "topic": topic_prompt} self.process_and_save_output(base_name, "summary_prompt", transcript_content, additional_content, file_suffix=f"topic{i}_summary") # Generate the troubleshooting steps self.process_and_save_output(base_name, "troubleshooting_prompt", transcript_content, file_suffix="troubleshooting_steps") # Generate the glossary self.process_and_save_output(base_name, "glossary_prompt", transcript_content, file_suffix="glossary") # Generate the tags and symptoms self.process_and_save_output(base_name, "tags_prompt", transcript_content, file_suffix="tags_and_symptoms") logging.info(f"Transcript outputs saved to: {os.path.splitext(transcript_file)[0]}") def extract_topics(self, response_text): """ Takes a response text and extracts the topics from it. :param response_text: The response text to process. :return: A list of topics. """ # Regular expression to match the pattern "Topic X: Title" pattern = r"Topic \d+: .+" topics = re.findall(pattern, response_text) return topics # OpenAI Functions def generate_topic_prompts(self, response_text): """ Takes a response text and generates the topic prompts. :param response_text: The response text to process. :return: A list of topic prompts. """ topics = self.extract_topics(response_text) base_prompt = self.load_prompt("topic_prompt") topic_prompts = [] for topic in topics: modified_prompt = base_prompt.replace("[REPLACE_ME]", topic) topic_prompts.append(modified_prompt) return topic_prompts def load_prompt(self,prompt_key): """ Takes a prompt key and loads the prompt from the prompts folder. :param prompt_key: The key of the prompt to load. :return: The prompt content. """ prompt_path = self.prompts[prompt_key] print(f"Loading prompt from: {prompt_path}") # Debugging line with open(self.prompts[prompt_key], 'r', encoding='utf-8') as file: return file.read() def send_conversation(self, conversation_history): """ Takes a conversation history and sends it to the OpenAI API to generate a response. :param conversation_history: The conversation history to send. :return: The response from the LLM """ response = openai.chat.completions.create( model="gpt-4-1106-preview", #model="gpt-3.5-turbo-1106", messages=conversation_history, max_tokens=4096, temperature=0.00, ) return response def build_conversation_history(self, system_prompt, user_prompt1, assistant_response=None, user_prompt2=None): """ Takes a system prompt, user prompt, and optional assistant response and user prompt and builds a conversation history. :param system_prompt: The system prompt to use. :param user_prompt1: The first user prompt to use. :param assistant_response: The assistant response to use. :param user_prompt2: The second user prompt to use. :return: The conversation history. """ conversation_history = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt1} ] # Check if both or none of the optional parameters are provided if (assistant_response is not None and user_prompt2 is not None) or (assistant_response is None and user_prompt2 is None): # Append the optional prompts if both are provided if assistant_response is not None: conversation_history.append({"role": "assistant", "content": assistant_response}) conversation_history.append({"role": "user", "content": user_prompt2}) else: raise ValueError("Both 'assistant_response' and 'user_prompt2' must be provided together or not at all.") return conversation_history def generate_article(self, input_file): """ Takes an input file path and generates a article from it. :param input_file_path: The path to the input file to process. :return: The article. """ article_prompt = self.load_prompt("article_prompt") with open(input_file, "r", encoding="utf-8") as file: file_content = file.read() article_convo = self.build_conversation_history(article_prompt, file_content) response = self.send_conversation(article_convo) content = response.choices[0].message.content clean_lines = [line.strip() for line in content.split('\n') if line.strip() != ''] clean_content = '\n\n'.join(clean_lines) return clean_content def process_articles(self, input_path): """ Takes a path to a folder containing input files and generates articles from them. :param input_path: The path to the folder containing input files to process. """ logging.info(f"Processing article inputs in folder: {input_path}") for filename in tqdm(os.listdir(input_path), desc="Processing Files"): if filename.endswith("_summary.txt") or filename.endswith("_troubleshooting_steps.txt"): logging.info(f"Processing article input: {filename}") input_file = os.path.join(input_path, filename) article = self.generate_article(input_file) output_file = os.path.join(input_path, f"{os.path.splitext(filename)[0]}_article.txt") with open(output_file, "w", encoding="utf-8") as f: f.write(article) logging.info(f"Article saved to: {output_file}") # Everything below is added to adjust existing script to run self service. # Until load_dotenv() def get_drive_service(self): SCOPES = ['https://www.googleapis.com/auth/drive'] credentials_json = os.getenv('GOOGLE_APPLICATION_CREDENTIALS_JSON') if credentials_json is None: raise ValueError("Environment variable 'GOOGLE_APPLICATION_CREDENTIALS_JSON' is not set") credentials_info = json.loads(credentials_json) credentials = service_account.Credentials.from_service_account_info(credentials_info, scopes=SCOPES) return build('drive', 'v3', credentials=credentials) def extract_drive_folder_id(self, drive_link): # This can be expanded to handle various Google Drive link formats match = re.search(r'folders/([^/?]+)', drive_link) if match: return match.group(1) else: raise ValueError("Invalid Google Drive folder link.") def list_files_in_folder(self, service, folder_id): try: results = service.files().list( q=f"'{folder_id}' in parents and trashed=false", pageSize=100, fields="nextPageToken, files(id, name)").execute() return results.get('files', []) except Exception as e: logging.error(f"Failed to list files in Google Drive folder {folder_id}: {e}") raise ValueError("Failed to list files in Google Drive folder. Check folder permissions") def download_file(self, service, file_id, file_path): try: # Ensure the directory where the file will be saved exists os.makedirs(os.path.dirname(file_path), exist_ok=True) # Check if the file is an mp4 file before downloading if not file_path.endswith('.mp4'): logging.info(f"Skipping non-mp4 file: {file_path}") return request = service.files().get_media(fileId=file_id) with open(file_path, 'wb') as fh: downloader = MediaIoBaseDownload(fh, request) done = False while done is False: status, done = downloader.next_chunk() except Exception as e: logging.error(f"Failed to download file {file_id} to {file_path}: {e}") raise ValueError("Failed to download file") def find_or_create_drive_folder(self, service, folder_name, parent_folder_id): try: # Check if folder exists query = f"mimeType='application/vnd.google-apps.folder' and name='{folder_name}' and '{parent_folder_id}' in parents and trashed=false" response = service.files().list(q=query, spaces='drive', fields='files(id, name)').execute() files = response.get('files', []) if files: # Folder exists, return its ID return files[0]['id'] else: # Folder doesn't exist, create it folder_metadata = { 'name': folder_name, 'mimeType': 'application/vnd.google-apps.folder', 'parents': [parent_folder_id] } folder = service.files().create(body=folder_metadata, fields='id').execute() return folder.get('id') except Exception as e: logging.error(f"Failed to find or create Google Drive folder '{folder_name}': {e}") raise ValueError("Failed to find or create Google Drive folder. Check permissions") def upload_file(self, service, file_path, drive_folder_id): try: file_metadata = {'name': os.path.basename(file_path), 'parents': [drive_folder_id]} media = MediaFileUpload(file_path, resumable=True) file = service.files().create(body=file_metadata, media_body=media, fields='id').execute() logging.info(f"Uploaded {file_path} to Google Drive with ID {file.get('id')}") except Exception as e: logging.error(f"Failed to upload file {file_path} to Google Drive: {e}") raise ValueError("Failed to upload file to Google Drive. Check folder permissions") def sync_folder_to_drive(self, service, local_folder_path, drive_parent_folder_id, is_root=True): """ Synchronize a local folder structure and its files with Google Drive. :param service: Authenticated Google Drive service instance. :param local_folder_path: Path to the local folder to sync. :param drive_parent_folder_id: The Google Drive folder ID to sync with. :param is_root: Boolean indicating if the current folder is the root of the sync operation. """ # If it's the root directory, upload files directly in it, then handle directories if is_root: for item_name in os.listdir(local_folder_path): item_path = os.path.join(local_folder_path, item_name) if os.path.isfile(item_path): # Uploads 'processing.log' and any other files directly under the root self.upload_file(service, item_path, drive_parent_folder_id) # Process directories and their contents for item_name in os.listdir(local_folder_path): item_path = os.path.join(local_folder_path, item_name) if os.path.isdir(item_path): # It's a directory, find or create a corresponding folder on Drive drive_folder_id = self.find_or_create_drive_folder(service, item_name, drive_parent_folder_id) # Recursively sync the subfolder self.sync_folder_to_drive(service, item_path, drive_folder_id, is_root=False) elif os.path.isfile(item_path) and not is_root: # For files in subdirectories, upload them to their respective folder on Google Drive self.upload_file(service, item_path, drive_parent_folder_id) def cleanup_input_folder(self, folder_path): """ Deletes all files and folders under the specified folder_path. :param folder_path: Path to the folder to clean up. """ try: # Safety check to prevent accidental deletion of unintended directories if "Input-Folder" in folder_path: # List all items in the folder for item_name in os.listdir(folder_path): item_path = os.path.join(folder_path, item_name) try: # Check if it's a file and delete it if os.path.isfile(item_path) or os.path.islink(item_path): os.unlink(item_path) # Else, it's a directory, delete the directory tree elif os.path.isdir(item_path): shutil.rmtree(item_path) logging.info(f"Deleted {item_path}") except Exception as e: logging.error(f"Failed to delete {item_path}. Reason: {e}") else: logging.error("Safety check failed. The folder path does not seem to be correct.") except Exception as e: logging.error(f"Failed to clean up input folder {folder_path}: {e}") raise ValueError("Failed to clean up input folder") def check_folder_accessibility(self, service, folder_id): """ Checks if the specified Google Drive folder is publicly accessible and has write access. :param service: The Google Drive service instance. :param folder_id: The ID of the Google Drive folder to check. :return: True if the folder is publicly accessible and has write access, False otherwise. """ try: permissions = service.permissions().list(fileId=folder_id).execute() for permission in permissions.get('permissions', []): # Check if the permission type is anyone (public) and the role includes writer or owner if permission.get('type') == 'anyone' and permission.get('role') in ['writer', 'owner']: return True return False except Exception as e: logging.error(f"Failed to check folder accessibility for folder {folder_id}: {e}") return False # Above is newly added codes # Load environment variables and API key via .env file load_dotenv() api_key = os.getenv("OPENAI_API_KEY") # Example usage if __name__ == "__main__": input_folder_path = os.path.abspath(args.input_folder) transcribe = args.transcribe extract_topics = args.topic processor = KnowledgeTranscriber(api_key) processor.process_folder(input_folder_path, transcribe)