import os
import re
import json
import shutil
import logging
import argparse
import numpy as np
import openai
from moviepy.editor import VideoFileClip
from PIL import Image
from datetime import datetime, timedelta
from math import ceil
from dotenv import load_dotenv
from pydub import AudioSegment
from tqdm import tqdm
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
parser = argparse.ArgumentParser(description='Process KT videos in a given folder to generate transcripts and summaries of what was discussed.')
parser.add_argument(
    'input_folder',
    nargs='?',  # Optional
    default='.',  # Use the current working directory if no folder is specified
    help='The folder containing videos to process, relative to the current working directory.'
)
parser.add_argument(
    '--topic',
    nargs='?',  # Optional
    default=False,
    help='If set to True, generate topic-specific summaries in addition to the high-level summary.'
)
parser.add_argument(
    '--transcribe',
    nargs='?',  # Optional
    default=True,
    help='If set to False, skip transcription and use an existing _full_transcript.txt file to generate outputs.'
)
parser.add_argument(
    '--drive-folder-link',
    dest='drive_folder_link',
    required=True,  # process_folder() needs this to download inputs and sync results back
    help='Link to the shared Google Drive folder containing the videos to download, process, and sync back.'
)
args = parser.parse_args()
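# Example invocation (the script name is illustrative; the input path should contain
# "Input-Folder" if the final cleanup step is expected to run — see cleanup_input_folder):
#   python kt_processor.py ./Input-Folder --drive-folder-link "https://drive.google.com/drive/folders/<FOLDER_ID>"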
log_file_path = os.path.join(os.path.abspath(args.input_folder), "processing.log")
logging.basicConfig(
    level=logging.INFO,
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(log_file_path, mode='a')
    ],
    format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
class KnowledgeTranscriber(object):
    MAX_SIZE = 5000000  # 5,000,000 bytes (~4.77 MiB)
    MAX_SIZE_MB = MAX_SIZE / (1024 * 1024)  # Convert bytes to MiB
    BITRATE = 128000  # 128 kbps
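    # Sizing note: 5,000,000 bytes is a deliberately conservative chunk target; the
    # OpenAI transcription endpoint caps uploads at 25 MB, so chunks of this size
    # leave ample headroom for container overhead.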
    def __init__(self, api_key):
        self.client = openai.OpenAI(api_key=api_key)
        script_dir = os.path.dirname(os.path.abspath(__file__))  # Absolute directory of the script
        # Store the prompt paths on the instance so load_prompt() can find them
        self.prompts = {
            "summary_prompt": os.path.join(script_dir, "prompts", "1-summary_prompt.txt"),
            "topic_prompt": os.path.join(script_dir, "prompts", "2-topic_prompt.txt"),
            "troubleshooting_prompt": os.path.join(script_dir, "prompts", "3-troubleshooting_prompt.txt"),
            "glossary_prompt": os.path.join(script_dir, "prompts", "4-glossary_prompt.txt"),
            "tags_prompt": os.path.join(script_dir, "prompts", "5-tags_prompt.txt"),
            "article_prompt": os.path.join(script_dir, "prompts", "6-article_prompt.txt")
        }
    def process_folder(self, folder_path, transcribe_flag, drive_folder_link):
        """
        Takes a folder path and processes all videos or transcripts in the folder.
        First downloads all videos from the specified Google Drive folder to the local folder path.
        :param folder_path: The path to the folder containing videos or transcripts to process.
        :param transcribe_flag: Flag to indicate if transcription is needed.
        :param drive_folder_link: The link to the Google Drive folder containing the videos.
        """
        ### Added: initialize the Google Drive service
        try:
            drive_service = self.get_drive_service()
        except Exception as e:
            logging.error(f"Failed to initialize Google Drive service: {e}")
            raise ValueError("Failed to initialize Google Drive service")
        # Extract the folder ID from the URL.
        folder_id = self.extract_drive_folder_id(drive_folder_link)
        # Check if the folder is publicly accessible and has write access
        if not self.check_folder_accessibility(drive_service, folder_id):
            raise ValueError("The Google Drive folder is not publicly accessible or does not have write access.")
        # List all files in the Google Drive folder
        drive_files = self.list_files_in_folder(drive_service, folder_id)
        logging.info(f"Downloading files from Google Drive folder: {folder_id}")
        # Download each file to the local folder_path
        for file in tqdm(drive_files, desc="Downloading Files"):
            file_name = file['name']
            file_id = file['id']
            local_file_path = os.path.join(folder_path, file_name)
            if not os.path.exists(local_file_path):  # Avoid re-downloading files
                logging.info(f"Downloading file: {file_name}")
                self.download_file(drive_service, file_id, local_file_path)
            else:
                logging.info(f"File already exists: {file_name}")
        ### End of added section
        ### Everything below this line matches the original script, until the next added section
        logging.info(f"Processing files in folder: {folder_path}")
        for filename in tqdm(os.listdir(folder_path), desc="Processing Files"):
            if transcribe_flag == "False":
                if filename.endswith("_full_transcript.txt"):
                    # Processing for transcript files already generated by Zoom/Loom/etc.
                    logging.info(f"Processing transcript: {filename}")
                    base_name = filename.replace("_full_transcript.txt", "")
                    new_folder_path = os.path.join(folder_path, base_name)
                    logging.info(f"New folder path: {new_folder_path}")
                    # Folder handling outside the main video processing functions.
                    if not os.path.exists(new_folder_path):
                        os.makedirs(new_folder_path)
                    original_path = os.path.join(folder_path, filename)
                    new_path = os.path.join(new_folder_path, filename)
                    logging.info(f"Moving file from {original_path} to {new_path}")
                    shutil.move(original_path, new_path)
                    # Generate our outputs
                    self.generate_transcript_outputs(new_path)
                    self.process_articles(new_folder_path)
                    logging.info(f"Processing complete for: {filename}.")
            else:
                if filename.endswith(".mp4"):
                    # Process video files
                    logging.info(f"Processing video: {filename}")
                    video_path = os.path.join(folder_path, filename)
                    self.process_video(video_path, folder_path)
                    logging.info(f"Processing complete for: {filename}.")
        ### Added: upload results and clean up
        logging.info(f"Processing complete for all files in folder: {folder_path}. Uploading processed files to Google Drive.")
        # Iterate over files in the input folder and upload each to Google Drive
        self.sync_folder_to_drive(drive_service, folder_path, folder_id, is_root=True)
        logging.info(f"Uploading processed files to Google Drive complete for all files in folder: {folder_path}. Success.")
        # Call the cleanup function
        input_folder_path = os.path.abspath(folder_path)
        self.cleanup_input_folder(input_folder_path)
        ### End of added section
    def check_and_process(self, file_path, process_func, file_description):
        """
        Checks whether a file exists and runs process_func to create it if it doesn't.
        :param file_path: The path to the file to check.
        :param process_func: The function to call to process the file if it doesn't exist.
        :param file_description: A description of the file to use in logging.
        """
        if not os.path.exists(file_path):
            logging.info(f"Processing {file_description}: {file_path}")
            process_func()
        else:
            logging.info(f"{file_description} already exists.")
    def process_video(self, input_video, folder_path):
        """
        Takes a video path and processes the video into a transcript and a collection of knowledge outputs.
        :param input_video: The path to the video to process.
        :param folder_path: The path to the folder containing the video to process.
        """
        base_name = os.path.splitext(os.path.basename(input_video))[0]
        output_folder = os.path.join(folder_path, f"{base_name}_output")
        processed_folder = os.path.join(folder_path, "Processed")
        os.makedirs(output_folder, exist_ok=True)
        os.makedirs(processed_folder, exist_ok=True)
        output_audio = os.path.join(output_folder, f"{base_name}.mp3")
        processed_audio = os.path.join(processed_folder, f"{base_name}.mp3")
        processed_video = os.path.join(processed_folder, f"{base_name}.mp4")
        transcript_file = os.path.join(output_folder, f"{base_name}_full_transcript.txt")
        # Checks to avoid re-processing, saving time and calls to GPT.
        self.check_and_process(
            output_audio,
            lambda: self.video_to_audio(input_video, output_audio),
            "Audio file"
        )
        self.check_and_process(
            transcript_file,
            lambda: self.transcribe_and_combine_audio(output_audio),
            "Transcript file"
        )
        # Output files are named after the transcript's base name, which ends in _full_transcript
        transcript_outputs_exist = os.path.exists(os.path.join(output_folder, f"{base_name}_full_transcript_summary.txt")) or \
            os.path.exists(os.path.join(output_folder, f"{base_name}_full_transcript_troubleshooting_steps.txt")) or \
            os.path.exists(os.path.join(output_folder, f"{base_name}_full_transcript_glossary.txt")) or \
            os.path.exists(os.path.join(output_folder, f"{base_name}_full_transcript_tags_and_symptoms.txt"))
        if not transcript_outputs_exist:
            logging.info(f"Generating transcript outputs for: {transcript_file}")
            self.generate_transcript_outputs(transcript_file)
        else:
            logging.info("Transcript-related outputs already exist.")
        # Handle the screenshot capture and processing
        logging.info(f"Checking summary file for timestamps: {transcript_file}")
        summary_file = os.path.join(output_folder, f"{base_name}_full_transcript_summary.txt")
        troubleshooting_file = os.path.join(output_folder, f"{base_name}_full_transcript_troubleshooting_steps.txt")
        timestamp_list = self.find_timestamps(summary_file) + self.find_timestamps(troubleshooting_file)
        if timestamp_list:
            logging.info(f"Timestamps found in summary file: {summary_file}")
            screenshot_folder = os.path.join(output_folder, "Screenshots")
            os.makedirs(screenshot_folder, exist_ok=True)
            #self.parse_and_extract_frames(input_video, screenshot_folder, timestamp_list)
        else:
            logging.info(f"No timestamps found in summary file: {summary_file}")
        self.check_and_process(
            processed_audio,
            lambda: shutil.move(output_audio, processed_audio),
            "Processed audio"
        )
        self.check_and_process(
            processed_video,
            lambda: shutil.move(input_video, processed_video),
            "Processed video"
        )
        # Generate final articles from the summary and troubleshooting steps.
        self.process_articles(output_folder)
        logging.info(f"Files saved to: {output_folder}")
        logging.info(f"Processing complete for: {input_video}.")
    def transcribe_and_combine_audio(self, audio_file_path):
        """
        Takes an audio file path, splits the audio into parts if needed, transcribes the audio parts, and combines the transcriptions into a single transcript file.
        :param audio_file_path: The path to the audio file to process.
        :return: The path to the transcript file.
        """
        base_file_path = os.path.splitext(audio_file_path)[0]
        transcript_file_path = f"{base_file_path}_full_transcript.txt"
        manifest_file_path = f"{base_file_path}_manifest.txt"
        # Load or initialize the manifest that keeps track of processed parts
        if os.path.exists(manifest_file_path):
            with open(manifest_file_path, "r", encoding="utf-8") as manifest_file:
                processed_parts = set(manifest_file.read().splitlines())
        else:
            processed_parts = set()
        # Transcribe each part of the audio file, as needed
        parts_to_transcribe = sorted(self.get_or_create_audio_parts(audio_file_path))
        for part in parts_to_transcribe:
            part_transcript_file = f"{part}_transcript.txt"
            if part in processed_parts:
                logging.info(f"Transcription part already exists: {part_transcript_file}")
            else:
                logging.info(f"Transcribing audio part: {part}")
                transcription = self.transcribe_audio_part(part)
                with open(part_transcript_file, "w", encoding="utf-8") as part_file:
                    part_file.write(transcription)
                processed_parts.add(part)
                logging.info(f"Transcription complete for: {part}")
            # Checkpoint the manifest after each part so an interrupted run can resume
            with open(manifest_file_path, "w", encoding="utf-8") as manifest_file:
                manifest_file.write("\n".join(sorted(processed_parts)))
            # Check that the part is not the main audio file before removing it
            if part != audio_file_path:
                os.remove(part)
                logging.info(f"Removed audio part: {part}")
        # Once all parts have been transcribed, combine them into the full transcript file
        with open(transcript_file_path, "w", encoding="utf-8") as transcript_file:
            for part in parts_to_transcribe:
                logging.info(f"Combining transcript part: {part}")
                part_transcript_file = f"{part}_transcript.txt"
                with open(part_transcript_file, "r", encoding="utf-8") as part_file:
                    transcript_file.write(part_file.read() + "\n")
                os.remove(part_transcript_file)
                logging.info(f"Removed transcript part: {part_transcript_file}")
        # Adjust the timestamps so that each part's times are offset into the full recording
        with open(transcript_file_path, 'r', encoding="utf-8") as file:
            full_transcript = file.read()
        adjusted_content = self.adjust_timestamps(full_transcript)
        with open(transcript_file_path, 'w', encoding="utf-8") as file:
            file.write(adjusted_content)
        logging.info(f"Transcript saved to: {transcript_file_path}")
        os.remove(manifest_file_path)
        return transcript_file_path
    def parse_time(self, time_str):
        """Convert a timestamp string to seconds, supporting both colon and hyphen separators."""
        if '-' in time_str:
            h, m, s = map(float, time_str.split('-'))
        else:
            h, m, s = map(float, time_str.split(':'))
        return h * 3600 + m * 60 + s
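    # Worked examples (values are hypothetical):
    #   parse_time("00:01:30.500") -> 90.5
    #   parse_time("01-02-03.000") -> 3723.0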
    def format_time(self, seconds):
        """Convert seconds back to a timestamp string, using hyphens instead of colons."""
        h = int(seconds // 3600)
        m = int((seconds % 3600) // 60)
        s = seconds % 60
        return f"{h:02}-{m:02}-{s:06.3f}"
    def adjust_timestamps(self, vtt_content):
        """
        Takes a VTT content string and adjusts the timestamps to account for the audio parts.
        :param vtt_content: The VTT content to process.
        :return: The adjusted VTT content.
        """
        sections = vtt_content.split("WEBVTT")
        adjusted_sections = []
        time_offset = 0
        for section in sections[1:]:  # Skip the first section as it's likely the header
            lines = section.strip().split("\n")
            adjusted_lines = []
            for line in lines:
                if '-->' in line:
                    start, end = line.split(' --> ')
                    start_sec = self.parse_time(start) + time_offset
                    end_sec = self.parse_time(end) + time_offset
                    adjusted_line = f"{self.format_time(start_sec)} --> {self.format_time(end_sec)}"
                    adjusted_lines.append(adjusted_line)
                else:
                    adjusted_lines.append(line)
            # Update the time offset using the last timestamp of the current section
            # (assumes each cue is a timestamp line followed by one text line, so the
            # second-to-last line holds the section's final timestamp)
            if adjusted_lines:
                last_time = adjusted_lines[-2]
                time_split = last_time.split(' --> ')
                if len(time_split) == 2:
                    _, end = time_split
                    time_offset = self.parse_time(end)
            adjusted_sections.append('\n'.join(adjusted_lines))
        # Reassemble under a single WEBVTT header rather than one per part
        return "WEBVTT\n\n" + "\n\n".join(adjusted_sections)
    def extract_frames_by_range(self, video_path, target_folder, start_time, end_time, fps=1):
        """
        Takes a video path, a start time, and an end time, and extracts frames from the video between the given timestamps.
        :param video_path: The path to the video to process.
        :param target_folder: The path to the folder to save the extracted frames to.
        :param start_time: The start time to extract frames from, in HH:MM:SS.mmm format.
        :param end_time: The end time to extract frames to, in HH:MM:SS.mmm format.
        :param fps: The number of frames per second to extract from the video.
        """
        # Convert start_time and end_time from HH:MM:SS.mmm to seconds
        start_seconds = sum(x * float(t) for x, t in zip([3600, 60, 1, 0.001], start_time.split(":")))
        end_seconds = sum(x * float(t) for x, t in zip([3600, 60, 1, 0.001], end_time.split(":")))
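        # e.g. "00:01:30.500".split(":") -> ["00", "01", "30.500"]; zip pairs only the
        # first three factors, giving 3600*0 + 60*1 + 1*30.5 = 90.5 seconds (the
        # milliseconds ride along in the seconds field, so the 0.001 factor is unused).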
        # Create the target folder if it doesn't exist
        if not os.path.exists(target_folder):
            os.makedirs(target_folder)
        with VideoFileClip(video_path) as video:
            # Calculate the interval between frames based on the desired fps
            interval = 1 / fps
            # Iterate over the desired timestamp range
            current_time = start_seconds
            while current_time < end_seconds:
                frame = video.get_frame(current_time)
                timestamp = self.format_time(int(current_time))
                frame_path = os.path.join(target_folder, f"{timestamp}.png")
                Image.fromarray(np.uint8(frame)).save(frame_path)
                current_time += interval
    def find_timestamps(self, file_path):
        """
        Takes a file path and finds the timestamps within the file.
        Searches for timestamps in the formats "at 00:00:20.360" and "[00:00:28.559]".
        :param file_path: The path to the file to process.
        :return: A list of timestamps.
        """
        # Pattern matching both "at 00:00:20.360" and "[00:00:28.559]"
        timestamp_pattern = r'at (\d{2}:\d{2}:\d{2}\.\d{3})|\[(\d{2}:\d{2}:\d{2}\.\d{3})\]'
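        # Example matches (hypothetical text): "at 00:00:20.360" fills group 1 and
        # "[00:00:28.559]" fills group 2, so re.findall yields tuples like
        # ("00:00:20.360", "") or ("", "00:00:28.559"), which are flattened below.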
        timestamps = []
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
            # Find all matches, then flatten the tuples and drop the empty groups
            raw_matches = re.findall(timestamp_pattern, content)
            for match in raw_matches:
                # match is a tuple where one group is the timestamp and the other is empty
                timestamp = match[0] if match[0] else match[1]
                timestamps.append(timestamp)
        return timestamps
    def parse_and_extract_frames(self, video_path, target_path, timestamps):
        """
        Takes a video path and a list of timestamps, and extracts frames from the video around the given timestamps.
        :param video_path: The path to the video to process.
        :param target_path: The path to the folder to save the extracted frames to.
        :param timestamps: A list of timestamps to extract frames around.
        """
        # Helper to shift a timestamp by a given number of seconds
        def adjust_timestamp(timestamp, seconds):
            timestamp_dt = datetime.strptime(timestamp, "%H:%M:%S.%f")
            adjusted_timestamp = timestamp_dt + timedelta(seconds=seconds)
            return adjusted_timestamp.strftime("%H:%M:%S.%f")[:-3]
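        # e.g. adjust_timestamp("00:01:00.000", -5) -> "00:00:55.000", so each
        # timestamp below yields a ten-second capture window centred on it.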
        for timestamp in timestamps:
            start_timestamp = adjust_timestamp(timestamp, -5)
            end_timestamp = adjust_timestamp(timestamp, 5)
            self.extract_frames_by_range(video_path, target_path, start_timestamp, end_timestamp)
    def get_or_create_audio_parts(self, audio_file_path):
        """
        Takes an audio file path and splits the audio into parts if needed.
        :param audio_file_path: The path to the audio file to process.
        :return: A list of paths to the audio parts.
        """
        # Check whether the audio needs to be split by checking its file size - this is approximate, but close enough for government work
        file_size_mb = os.path.getsize(audio_file_path) / (1024 * 1024)
        parts_directory = os.path.join(os.path.dirname(audio_file_path), "parts")
        os.makedirs(parts_directory, exist_ok=True)
        # If the audio file has already been split, return the existing parts - else, split the audio file
        existing_parts = [os.path.join(parts_directory, f) for f in os.listdir(parts_directory) if os.path.isfile(os.path.join(parts_directory, f))]
        if existing_parts:
            logging.info("Found existing audio parts. Resuming transcription.")
            return existing_parts
        logging.info(f"Audio file size: {file_size_mb:.2f} MB")
        if file_size_mb > self.MAX_SIZE_MB:
            logging.info(f"Audio file size exceeds maximum size of {self.MAX_SIZE_MB} MB. Splitting audio file into parts.")
            return self.split_audio_file_by_size(audio_file_path)
        else:
            logging.info(f"Audio file size is within maximum size of {self.MAX_SIZE_MB} MB. No need to split the audio file.")
            return [audio_file_path]
    def split_audio_file_by_size(self, audio_file_path):
        """
        Takes an audio file path and splits the audio into parts based on the maximum size.
        :param audio_file_path: The path to the audio file to process.
        :return: A list of paths to the audio parts.
        """
        logging.info(f"Splitting audio file: {audio_file_path}")
        audio = AudioSegment.from_file(audio_file_path)
        max_chunk_duration_ms = ((self.MAX_SIZE * 8) / self.BITRATE) * 1000
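        # With the class defaults: ((5,000,000 bytes * 8) / 128,000 bps) * 1000
        # = 312,500 ms, i.e. roughly 5.2 minutes of audio per chunk.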
logging.info(f"Max chunk duration: {max_chunk_duration_ms} ms") | |
num_chunks = ceil(len(audio) / max_chunk_duration_ms) | |
logging.info(f"Number of chunks: {num_chunks}") | |
chunk_length = len(audio) // num_chunks | |
chunks = [audio[i * chunk_length: (i + 1) * chunk_length] for i in range(num_chunks)] | |
chunk_files = [] | |
for i, chunk in enumerate(chunks): | |
chunk_name = f"{os.path.splitext(audio_file_path)[0]}_part{i}.mp3" | |
logging.info(f"Exporting audio chunk: {chunk_name}") | |
chunk.export(chunk_name, format="mp3") | |
chunk_files.append(chunk_name) | |
logging.info(f"Audio file split into {len(chunk_files)} parts.") | |
return chunk_files | |
    def video_to_audio(self, input_video, output_audio):
        """
        Takes a video file path and strips out the audio to save as an MP3 file.
        :param input_video: The path to the video file to process.
        :param output_audio: The path to the audio file to save the converted audio to.
        """
        if not os.path.exists(output_audio):
            video = AudioSegment.from_file(input_video, "mp4")
            video.export(output_audio, format="mp3", bitrate="128k")
            logging.info(f"Audio file exported: {output_audio}")
        else:
            logging.info("Audio file already exists")
    def transcribe_audio_part(self, part):
        """
        Takes an audio file part path and transcribes the audio into text via the Whisper model.
        :param part: The path to the audio file to process.
        :return: The transcribed text.
        """
        try:
            logging.info(f"Transcribing audio part: {part}")
            with open(part, "rb") as audio_file:
                transcript = self.client.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_file,
                    response_format="vtt"
                    # A prompt can help the model with context and domain-specific terms of art, e.g.:
                    #, prompt="UCaaS, CPaaS, STaaS, DRaaS, BLF, CDR, CIM, GCCH, GVBM, HEPIC, SBC, PSTN, SMB, OrecX, Prov"
                )
            return transcript
        except Exception as e:
            logging.error(f"Failed to transcribe audio part {part}: {e}")
            raise ValueError("Failed to transcribe audio part. Check your OpenAI API key.")
    def audio_to_transcript(self, input_audio):
        """
        Takes an audio file path and transcribes the audio into text via the Whisper model.
        :param input_audio: The path to the audio file to process.
        :return: The path to the transcript file.
        """
        logging.info(f"Transcribing audio: {input_audio}")
        with open(input_audio, "rb") as audio_file:
            transcript = self.client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
                response_format="vtt"
                # A prompt can help the model with context and domain-specific terms of art, e.g.:
                #, prompt="UCaaS, CPaaS, STaaS, DRaaS, BLF, CDR, CIM, GCCH, GVBM, HEPIC, SBC, PSTN, SMB, OrecX, Prov"
            )
        logging.info("Transcript created")
        base_name = os.path.splitext(input_audio)[0]
        output_file = f"{base_name}_transcript.txt"
        # The VTT response is plain text, so write it out directly rather than JSON-encoding it
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(transcript)
        logging.info(f"Transcript saved to: {output_file}")
        return output_file
    def process_and_save_output(self, base_name, prompt_key, transcript_content, additional_content=None, file_suffix=""):
        """
        Takes a transcript and generates the output for a given prompt.
        :param base_name: The base name of the transcript file.
        :param prompt_key: The key of the prompt to use.
        :param transcript_content: The content of the transcript to process.
        :param additional_content: Additional content to use in the prompt.
        :param file_suffix: The suffix to use for the output file.
        :return: The path to the output file.
        """
        file_name = f"{base_name}_{file_suffix}.txt"
        # Check if the file already exists
        if os.path.exists(file_name):
            logging.info(f"{file_suffix.replace('_', ' ').capitalize()} file already exists: {file_name}")
            return file_name
        # Load and process the prompt
        prompt = self.load_prompt(prompt_key)
        if additional_content:
            conversation_history = self.build_conversation_history(self.load_prompt("summary_prompt"), transcript_content, additional_content['summary'], additional_content['topic'])
        else:
            conversation_history = self.build_conversation_history(prompt, transcript_content)
        response = self.send_conversation(conversation_history)
        content = response.choices[0].message.content
        clean_lines = [line.strip() for line in content.split('\n') if line.strip() != '']
        clean_content = '\n\n'.join(clean_lines)
        # Write the processed content to the file
        with open(file_name, "w", encoding="utf-8") as f:
            f.write(clean_content)
        logging.info(f"{file_suffix.replace('_', ' ').capitalize()} saved to: {file_name}")
        return file_name
    def generate_transcript_outputs(self, transcript_file):
        """
        Takes a transcript file and generates the summary outputs.
        :param transcript_file: The path to the transcript file to process.
        """
        with open(transcript_file, "r", encoding="utf-8") as file:
            transcript_content = file.read()
        base_name = os.path.splitext(transcript_file)[0]
        # Generate the summary
        self.process_and_save_output(base_name, "summary_prompt", transcript_content, file_suffix="summary")
        with open(f"{base_name}_summary.txt", "r", encoding="utf-8") as file:
            summary_content = file.read()
        # Generate topic-specific summaries
        topic_prompts = self.generate_topic_prompts(summary_content)
        # If the script were run with --topic, this is where the flag would be checked;
        # topic extraction is currently assumed to be on.
        for i, topic_prompt in enumerate(topic_prompts):
            additional_content = {"summary": summary_content, "topic": topic_prompt}
            self.process_and_save_output(base_name, "summary_prompt", transcript_content, additional_content, file_suffix=f"topic{i}_summary")
        # Generate the troubleshooting steps
        self.process_and_save_output(base_name, "troubleshooting_prompt", transcript_content, file_suffix="troubleshooting_steps")
        # Generate the glossary
        self.process_and_save_output(base_name, "glossary_prompt", transcript_content, file_suffix="glossary")
        # Generate the tags and symptoms
        self.process_and_save_output(base_name, "tags_prompt", transcript_content, file_suffix="tags_and_symptoms")
        logging.info(f"Transcript outputs saved to: {base_name}")
    def extract_topics(self, response_text):
        """
        Takes a response text and extracts the topics from it.
        :param response_text: The response text to process.
        :return: A list of topics.
        """
        # Regular expression to match the pattern "Topic X: Title"
        pattern = r"Topic \d+: .+"
        topics = re.findall(pattern, response_text)
        return topics
    # OpenAI / prompt helper functions
    def generate_topic_prompts(self, response_text):
        """
        Takes a response text and generates the topic prompts.
        :param response_text: The response text to process.
        :return: A list of topic prompts.
        """
        topics = self.extract_topics(response_text)
        base_prompt = self.load_prompt("topic_prompt")
        topic_prompts = []
        for topic in topics:
            modified_prompt = base_prompt.replace("[REPLACE_ME]", topic)
            topic_prompts.append(modified_prompt)
        return topic_prompts
    def load_prompt(self, prompt_key):
        """
        Takes a prompt key and loads the prompt from the prompts folder.
        :param prompt_key: The key of the prompt to load.
        :return: The prompt content.
        """
        prompt_path = self.prompts[prompt_key]
        logging.debug(f"Loading prompt from: {prompt_path}")
        with open(prompt_path, 'r', encoding='utf-8') as file:
            return file.read()
    def send_conversation(self, conversation_history):
        """
        Takes a conversation history and sends it to the OpenAI API to generate a response.
        :param conversation_history: The conversation history to send.
        :return: The response from the LLM.
        """
        # Use the authenticated client created in __init__ rather than the module-level API
        response = self.client.chat.completions.create(
            model="gpt-4-1106-preview",
            #model="gpt-3.5-turbo-1106",
            messages=conversation_history,
            max_tokens=4096,
            temperature=0.00,
        )
        return response
    def build_conversation_history(self, system_prompt, user_prompt1, assistant_response=None, user_prompt2=None):
        """
        Takes a system prompt, a user prompt, and an optional assistant response plus follow-up user prompt, and builds a conversation history.
        :param system_prompt: The system prompt to use.
        :param user_prompt1: The first user prompt to use.
        :param assistant_response: The assistant response to use.
        :param user_prompt2: The second user prompt to use.
        :return: The conversation history.
        """
        conversation_history = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt1}
        ]
        # The two optional parameters must be provided together or not at all
        if (assistant_response is not None and user_prompt2 is not None) or (assistant_response is None and user_prompt2 is None):
            # Append the optional turns if both are provided
            if assistant_response is not None:
                conversation_history.append({"role": "assistant", "content": assistant_response})
                conversation_history.append({"role": "user", "content": user_prompt2})
        else:
            raise ValueError("Both 'assistant_response' and 'user_prompt2' must be provided together or not at all.")
        return conversation_history
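    # Example of the four-turn form (contents abbreviated and hypothetical):
    #   [{"role": "system", "content": "<summary prompt>"},
    #    {"role": "user", "content": "<full transcript>"},
    #    {"role": "assistant", "content": "<high-level summary>"},
    #    {"role": "user", "content": "<topic-specific prompt>"}]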
    def generate_article(self, input_file):
        """
        Takes an input file path and generates an article from it.
        :param input_file: The path to the input file to process.
        :return: The article content.
        """
        article_prompt = self.load_prompt("article_prompt")
        with open(input_file, "r", encoding="utf-8") as file:
            file_content = file.read()
        article_convo = self.build_conversation_history(article_prompt, file_content)
        response = self.send_conversation(article_convo)
        content = response.choices[0].message.content
        clean_lines = [line.strip() for line in content.split('\n') if line.strip() != '']
        clean_content = '\n\n'.join(clean_lines)
        return clean_content
    def process_articles(self, input_path):
        """
        Takes a path to a folder containing input files and generates articles from them.
        :param input_path: The path to the folder containing the input files to process.
        """
        logging.info(f"Processing article inputs in folder: {input_path}")
        for filename in tqdm(os.listdir(input_path), desc="Processing Files"):
            if filename.endswith("_summary.txt") or filename.endswith("_troubleshooting_steps.txt"):
                logging.info(f"Processing article input: {filename}")
                input_file = os.path.join(input_path, filename)
                article = self.generate_article(input_file)
                output_file = os.path.join(input_path, f"{os.path.splitext(filename)[0]}_article.txt")
                with open(output_file, "w", encoding="utf-8") as f:
                    f.write(article)
                logging.info(f"Article saved to: {output_file}")
    # Everything below was added to adapt the existing script to run self-service,
    # up to the load_dotenv() call.
    def get_drive_service(self):
        SCOPES = ['https://www.googleapis.com/auth/drive']
        credentials_json = os.getenv('GOOGLE_APPLICATION_CREDENTIALS_JSON')
        if credentials_json is None:
            raise ValueError("Environment variable 'GOOGLE_APPLICATION_CREDENTIALS_JSON' is not set")
        credentials_info = json.loads(credentials_json)
        credentials = service_account.Credentials.from_service_account_info(credentials_info, scopes=SCOPES)
        return build('drive', 'v3', credentials=credentials)
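    # GOOGLE_APPLICATION_CREDENTIALS_JSON is expected to hold a full service account
    # key as a JSON string, e.g. (fields abbreviated, values hypothetical):
    #   {"type": "service_account", "project_id": "...", "private_key": "...",
    #    "client_email": "..."}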
    def extract_drive_folder_id(self, drive_link):
        # This can be expanded to handle other Google Drive link formats
        match = re.search(r'folders/([^/?]+)', drive_link)
        if match:
            return match.group(1)
        else:
            raise ValueError("Invalid Google Drive folder link.")
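    # e.g. "https://drive.google.com/drive/folders/1AbCdEfG?usp=sharing" -> "1AbCdEfG"
    # (the folder ID shown is hypothetical; the capture stops at "/" or "?").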
    def list_files_in_folder(self, service, folder_id):
        try:
            # Page through the results; a single request returns at most pageSize files
            files = []
            page_token = None
            while True:
                results = service.files().list(
                    q=f"'{folder_id}' in parents and trashed=false",
                    pageSize=100,
                    pageToken=page_token,
                    fields="nextPageToken, files(id, name)").execute()
                files.extend(results.get('files', []))
                page_token = results.get('nextPageToken')
                if not page_token:
                    break
            return files
        except Exception as e:
            logging.error(f"Failed to list files in Google Drive folder {folder_id}: {e}")
            raise ValueError("Failed to list files in Google Drive folder. Check folder permissions")
    def download_file(self, service, file_id, file_path):
        try:
            # Ensure the directory where the file will be saved exists
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            # Only download mp4 files
            if not file_path.endswith('.mp4'):
                logging.info(f"Skipping non-mp4 file: {file_path}")
                return
            request = service.files().get_media(fileId=file_id)
            with open(file_path, 'wb') as fh:
                downloader = MediaIoBaseDownload(fh, request)
                done = False
                while not done:
                    status, done = downloader.next_chunk()
        except Exception as e:
            logging.error(f"Failed to download file {file_id} to {file_path}: {e}")
            raise ValueError("Failed to download file")
    def find_or_create_drive_folder(self, service, folder_name, parent_folder_id):
        try:
            # Check if the folder exists
            query = f"mimeType='application/vnd.google-apps.folder' and name='{folder_name}' and '{parent_folder_id}' in parents and trashed=false"
            response = service.files().list(q=query, spaces='drive', fields='files(id, name)').execute()
            files = response.get('files', [])
            if files:
                # Folder exists, return its ID
                return files[0]['id']
            else:
                # Folder doesn't exist, create it
                folder_metadata = {
                    'name': folder_name,
                    'mimeType': 'application/vnd.google-apps.folder',
                    'parents': [parent_folder_id]
                }
                folder = service.files().create(body=folder_metadata, fields='id').execute()
                return folder.get('id')
        except Exception as e:
            logging.error(f"Failed to find or create Google Drive folder '{folder_name}': {e}")
            raise ValueError("Failed to find or create Google Drive folder. Check permissions")
    def upload_file(self, service, file_path, drive_folder_id):
        try:
            file_metadata = {'name': os.path.basename(file_path), 'parents': [drive_folder_id]}
            media = MediaFileUpload(file_path, resumable=True)
            file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
            logging.info(f"Uploaded {file_path} to Google Drive with ID {file.get('id')}")
        except Exception as e:
            logging.error(f"Failed to upload file {file_path} to Google Drive: {e}")
            raise ValueError("Failed to upload file to Google Drive. Check folder permissions")
    def sync_folder_to_drive(self, service, local_folder_path, drive_parent_folder_id, is_root=True):
        """
        Synchronizes a local folder structure and its files with Google Drive.
        :param service: Authenticated Google Drive service instance.
        :param local_folder_path: Path to the local folder to sync.
        :param drive_parent_folder_id: The Google Drive folder ID to sync with.
        :param is_root: Boolean indicating if the current folder is the root of the sync operation.
        """
        # If this is the root directory, upload the files directly in it first, then handle directories
        if is_root:
            for item_name in os.listdir(local_folder_path):
                item_path = os.path.join(local_folder_path, item_name)
                if os.path.isfile(item_path):
                    # Uploads 'processing.log' and any other files directly under the root
                    self.upload_file(service, item_path, drive_parent_folder_id)
        # Process directories and their contents
        for item_name in os.listdir(local_folder_path):
            item_path = os.path.join(local_folder_path, item_name)
            if os.path.isdir(item_path):
                # It's a directory; find or create a corresponding folder on Drive
                drive_folder_id = self.find_or_create_drive_folder(service, item_name, drive_parent_folder_id)
                # Recursively sync the subfolder
                self.sync_folder_to_drive(service, item_path, drive_folder_id, is_root=False)
            elif os.path.isfile(item_path) and not is_root:
                # Files in subdirectories are uploaded to their corresponding folder on Google Drive
                self.upload_file(service, item_path, drive_parent_folder_id)
    def cleanup_input_folder(self, folder_path):
        """
        Deletes all files and folders under the specified folder_path.
        :param folder_path: Path to the folder to clean up.
        """
        try:
            # Safety check to prevent accidental deletion of unintended directories
            if "Input-Folder" in folder_path:
                # List all items in the folder
                for item_name in os.listdir(folder_path):
                    item_path = os.path.join(folder_path, item_name)
                    try:
                        # If it's a file or symlink, delete it
                        if os.path.isfile(item_path) or os.path.islink(item_path):
                            os.unlink(item_path)
                        # Otherwise it's a directory; delete the directory tree
                        elif os.path.isdir(item_path):
                            shutil.rmtree(item_path)
                        logging.info(f"Deleted {item_path}")
                    except Exception as e:
                        logging.error(f"Failed to delete {item_path}. Reason: {e}")
            else:
                logging.error("Safety check failed. The folder path does not appear to be an Input-Folder path.")
        except Exception as e:
            logging.error(f"Failed to clean up input folder {folder_path}: {e}")
            raise ValueError("Failed to clean up input folder")
    def check_folder_accessibility(self, service, folder_id):
        """
        Checks if the specified Google Drive folder is publicly accessible with write access.
        :param service: The Google Drive service instance.
        :param folder_id: The ID of the Google Drive folder to check.
        :return: True if the folder is publicly accessible and writable, False otherwise.
        """
        try:
            permissions = service.permissions().list(fileId=folder_id).execute()
            for permission in permissions.get('permissions', []):
                # Look for an 'anyone' (public) permission whose role is writer or owner
                if permission.get('type') == 'anyone' and permission.get('role') in ['writer', 'owner']:
                    return True
            return False
        except Exception as e:
            logging.error(f"Failed to check folder accessibility for folder {folder_id}: {e}")
            return False
# End of newly added code.
# Load environment variables and the API key via a .env file
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
# Example usage
if __name__ == "__main__":
    input_folder_path = os.path.abspath(args.input_folder)
    transcribe = args.transcribe
    extract_topics = args.topic
    processor = KnowledgeTranscriber(api_key)
    processor.process_folder(input_folder_path, transcribe, args.drive_folder_link)