Spaces:
Sleeping
Sleeping
File size: 44,033 Bytes
6718307 eb62b94 6718307 eb62b94 6718307 eb62b94 6718307 54e0bdf 6718307 54e0bdf 6718307 54e0bdf 6718307 54e0bdf 6718307 54e0bdf 6718307 54e0bdf 6718307 54e0bdf 6718307 54e0bdf 6718307 54e0bdf 6718307 54e0bdf 6718307 54e0bdf 6718307 54e0bdf 6718307 54e0bdf 6718307 54e0bdf 6718307 54e0bdf 6718307 eb62b94 6718307 54e0bdf 6718307 54e0bdf 6718307 54e0bdf 6718307 54e0bdf 6718307 54e0bdf 6718307 54e0bdf 6718307 54e0bdf 6718307 6f82c92 6718307 6f82c92 6718307 eb62b94 6718307 eb62b94 1773c97 eb62b94 6718307 eb62b94 6718307 eb62b94 6718307 eb62b94 6718307 eb62b94 6718307 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 |
import os
import re
import openai
import logging
import json
import shutil
import argparse
import numpy as np
from moviepy.editor import VideoFileClip
from PIL import Image
from datetime import datetime, timedelta
from math import ceil
from dotenv import load_dotenv
from pydub import AudioSegment
from tqdm import tqdm
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from googleapiclient.http import MediaIoBaseDownload
import shutil
parser = argparse.ArgumentParser(description='Process KT videos in a given folder to generate transcripts and summaries of what was discussed.')
parser.add_argument(
'input_folder',
nargs='?', # Optional
default='.', # Use the current working directory if no folder is specified
help='The folder containing videos to process relative to the current working directory.'
)
parser.add_argument(
'--topic',
nargs='?', # Optional
default=False,
help='If set to True, will generate topic-specific summaries in addition to the high-level summary.'
)
parser.add_argument(
'--transcribe',
nargs='?', # Optional
default=True,
help='If set to False, will skip transcribing and leverage an existing _full_transcript.txt file to generate outputs.'
)
args = parser.parse_args()
log_file_path = os.path.join(os.path.abspath(args.input_folder), "processing.log")
logging.basicConfig(
level=logging.INFO,
handlers=[
logging.StreamHandler(),
logging.FileHandler(log_file_path, mode='a')
],
format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
class KnowledgeTranscriber(object):
MAX_SIZE = 5000000 # 5 MB
MAX_SIZE_MB = MAX_SIZE / (1024 * 1024) # Convert bytes to MB
BITRATE = 128000 # 128 kbps
def __init__(self, api_key):
self.client = openai.OpenAI(api_key=api_key)
script_dir = os.path.dirname(os.path.abspath(__file__)) # Absolute directory of the script
prompts = {
"summary_prompt": os.path.join(script_dir, "prompts", "1-summary_prompt.txt"),
"topic_prompt": os.path.join(script_dir, "prompts", "2-topic_prompt.txt"),
"troubleshooting_prompt": os.path.join(script_dir, "prompts", "3-troubleshooting_prompt.txt"),
"glossary_prompt": os.path.join(script_dir, "prompts", "4-glossary_prompt.txt"),
"tags_prompt": os.path.join(script_dir, "prompts", "5-tags_prompt.txt"),
"article_prompt": os.path.join(script_dir, "prompts", "6-article_prompt.txt")
}
def process_folder(self, folder_path, transcribe_flag, drive_folder_link):
"""
Takes a folder path and processes all videos or transcripts in the folder.
First downloads all videos from the specified Google Drive folder to the local folder path.
:param folder_path: The path to the folder containing videos or transcripts to process.
:param transcribe_flag: Flag to indicate if transcription is needed.
:param drive_folder_id: The ID of the Google Drive folder containing the videos.
"""
###Added from here Initialize Google Drive service
try:
drive_service = self.get_drive_service()
except Exception as e:
logging.error(f"Failed to initialize Google Drive service: {e}")
raise ValueError("Failed to initialize Google Drive service")
# Extract the folder ID from the URL.
folder_id = self.extract_drive_folder_id(drive_folder_link)
# Check if the folder is publicly accessible and has write access
if not self.check_folder_accessibility(drive_service, folder_id):
raise ValueError("The Google Drive folder is not publicly accessible or does not have write access.")
# List all files in the Google Drive folder
drive_files = self.list_files_in_folder(drive_service, folder_id)
logging.info(f"Downloading files from Google Drive folder: {folder_id}")
# Download each file to the local folder_path
for file in tqdm(drive_files, desc="Downloading Files"):
file_name = file['name']
file_id = file['id']
local_file_path = os.path.join(folder_path, file_name)
if not os.path.exists(local_file_path): # Avoid re-downloading files
logging.info(f"Downloading file: {file_name}")
self.download_file(drive_service, file_id, local_file_path)
else:
logging.info(f"File already exists: {file_name}")
###End of Added
### All below under this line is same with the original script. Until Next Added from here
logging.info(f"Processing files in folder: {folder_path}")
for filename in tqdm(os.listdir(folder_path), desc="Processing Files"):
if transcribe_flag == "False":
if filename.endswith("_full_transcript.txt"):
# Processing for transcript files generated by Zoom/Loom/etc. already.
logging.info(f"Processing transcript: {filename}")
base_name = filename.replace("_full_transcript.txt", "")
new_folder_path = os.path.join(folder_path, base_name)
logging.info(f"New folder path: {new_folder_path}")
# Folder handling outside main video processing functions.
if not os.path.exists(new_folder_path):
os.makedirs(new_folder_path)
original_path = os.path.join(folder_path, filename)
new_path = os.path.join(new_folder_path, filename)
logging.info(f"Moving file from {original_path} to {new_path}")
shutil.move(original_path, new_path)
# Generate our ouputs
self.generate_transcript_outputs(new_path)
self.process_articles(new_folder_path)
logging.info(f"Processing complete for: {filename}.")
else:
if filename.endswith(".mp4"):
# Process for video files
logging.info(f"Processing video: {filename}")
video_path = os.path.join(folder_path, filename)
self.process_video(video_path, folder_path)
logging.info(f"Processing complete for: {filename}.")
###Added from here
# After processing files
logging.info(f"Processing complete for all files in folder: {folder_path}. Uploading processed files to Google Drive.")
# Iterate over files in the input folder and upload each to Google Drive
self.sync_folder_to_drive(drive_service, folder_path, folder_id, is_root=True)
logging.info(f"Uploading processed files to Google Drive complete for all files in folder: {folder_path}. Success.")
# Calling the cleanup function
input_folder_path = os.path.abspath(folder_path)
self.cleanup_input_folder(input_folder_path)
###End of Added
def check_and_process(self, file_path, process_func, file_description):
"""
Validates if a file exists and processes it if it doesn't.
:param file_path: The path to the file to check.
:param process_func: The function to call to process the file if it doesn't exist.
:param file_description: A description of the file to use in logging.
"""
if not os.path.exists(file_path):
logging.info(f"Processing {file_description}: {file_path}")
process_func()
else:
logging.info(f"{file_description} already exists.")
def process_video(self, input_video, folder_path):
"""
Takes a video path, processes the video into a transcript, and a collection of knowledge outputs.
:param input_video: The path to the video to process.
:param folder_path: The path to the folder containing the video to process.
"""
base_name = os.path.splitext(os.path.basename(input_video))[0]
output_folder = os.path.join(folder_path, f"{base_name}_output")
processed_folder = os.path.join(folder_path, "Processed")
os.makedirs(output_folder, exist_ok=True)
os.makedirs(processed_folder, exist_ok=True)
output_audio = os.path.join(output_folder, f"{base_name}.mp3")
processed_audio = os.path.join(processed_folder, f"{base_name}.mp3")
processed_video = os.path.join(processed_folder, f"{base_name}.mp4")
transcript_file = os.path.join(output_folder, f"{base_name}_full_transcript.txt")
# Checks to avoid re-processing to save time and calls to GPT.
self.check_and_process(
output_audio,
lambda: self.video_to_audio(input_video, output_audio),
"Audio file"
)
self.check_and_process(
transcript_file,
lambda: self.transcribe_and_combine_audio(output_audio),
"Transcript file"
)
transcript_outputs_exist = os.path.exists(os.path.join(output_folder, f"{base_name}_summary.txt")) or \
os.path.exists(os.path.join(output_folder, f"{base_name}_troubleshooting_steps.txt")) or \
os.path.exists(os.path.join(output_folder, f"{base_name}_glossary.txt")) or \
os.path.exists(os.path.join(output_folder, f"{base_name}_tags_and_symptoms.txt"))
if not transcript_outputs_exist:
logging.info(f"Generating transcript outputs for: {transcript_file}")
self.generate_transcript_outputs(transcript_file)
else:
logging.info("Transcript-related outputs already exist.")
# Handling the screenshot capture and processing
logging.info(f"Checking summary file for timestamps: {transcript_file}")
summary_file = os.path.join(output_folder, f"{base_name}_full_transcript_summary.txt")
troubleshooting_file = os.path.join(output_folder, f"{base_name}_full_transcript_troubleshooting_steps.txt")
timestamp_list = self.find_timestamps(summary_file) + self.find_timestamps(troubleshooting_file)
if timestamp_list:
logging.info(f"Timestamps found in summary file: {summary_file}")
screenshot_folder = os.path.join(output_folder, "Screenshots")
os.makedirs(screenshot_folder, exist_ok=True)
#self.parse_and_extract_frames(input_video, screenshot_folder, timestamp_list)
else:
logging.info(f"No timestamps found in summary file: {summary_file}")
self.check_and_process(
processed_audio,
lambda: shutil.move(output_audio, processed_audio),
"Processed audio"
)
self.check_and_process(
processed_video,
lambda: shutil.move(input_video, processed_video),
"Processed video"
)
# Generate final articles from summary and troubleshooting steps.
self.process_articles(output_folder)
logging.info(f"Files saved to: {output_folder}")
logging.info(f"Processing complete for: {input_video}.")
def transcribe_and_combine_audio(self, audio_file_path):
"""
Takes an audio file path, splits the audio into parts if needed, transcribes the audio parts, and combines the transcriptions into a single transcript file.
:param audio_file_path: The path to the audio file to process.
:return: The path to the transcript file.
"""
base_file_path = os.path.splitext(audio_file_path)[0]
transcript_file_path = f"{base_file_path}_full_transcript.txt"
manifest_file_path = f"{base_file_path}_manifest.txt"
# Load or initialize the manifest for keeping track of processed parts
if os.path.exists(manifest_file_path):
with open(manifest_file_path, "r", encoding="utf-8") as manifest_file:
processed_parts = set(manifest_file.read().splitlines())
else:
processed_parts = set()
# Transcribe each part of the audio file, as needed
parts_to_transcribe = sorted(self.get_or_create_audio_parts(audio_file_path))
for part in parts_to_transcribe:
part_transcript_file = f"{part}_transcript.txt"
if part in processed_parts:
logging.info(f"Transcription part already exists: {part_transcript_file}")
else:
logging.info(f"Transcribing audio part: {part}")
transcription = self.transcribe_audio_part(part)
with open(part_transcript_file, "w", encoding="utf-8") as part_file:
part_file.write(transcription)
processed_parts.add(part)
logging.info(f"Transcription complete for: {part}")
with open(manifest_file_path, "w", encoding="utf-8") as manifest_file:
manifest_file.write("\n".join(sorted(processed_parts)))
# Check if the part is not the main audio file before removing
if part != audio_file_path:
os.remove(part)
logging.info(f"Removed audio part: {part}")
# Once all parts have been transcribed, combine them into the full transcript file
with open(transcript_file_path, "w", encoding="utf-8") as transcript_file:
for part in parts_to_transcribe:
logging.info(f"Combining transcript part: {part}")
part_transcript_file = f"{part}_transcript.txt"
with open(part_transcript_file, "r", encoding="utf-8") as part_file:
transcript_file.write(part_file.read() + "\n")
os.remove(part_transcript_file)
logging.info(f"Removed transcript part: {part_transcript_file}")
# Now, we need to take the transcript file and adjust the timestamps to account for the audio parts
with open(transcript_file_path, 'r', encoding="utf-8") as file:
full_transcript = file.read()
adjusted_content = self.adjust_timestamps(full_transcript)
with open(transcript_file_path, 'w', encoding="utf-8") as file:
file.write(adjusted_content)
logging.info(f"Transcript saved to: {transcript_file_path}")
os.remove(manifest_file_path)
return transcript_file_path
#def parse_time(self, time_str):
# """Convert a timestamp string to seconds."""
# h, m, s = map(float, time_str.split(':'))
# return h * 3600 + m * 60 + s
def parse_time(self, time_str):
"""Convert a timestamp string to seconds, supporting both colon and hyphen separators."""
if '-' in time_str:
h, m, s = map(float, time_str.split('-'))
else:
h, m, s = map(float, time_str.split(':'))
return h * 3600 + m * 60 + s
#def format_time(self, seconds):
# """Convert seconds back to a timestamp string."""
# h = int(seconds // 3600)
# m = int((seconds % 3600) // 60)
# s = seconds % 60
# return f"{h:02}:{m:02}:{s:06.3f}"
def format_time(self, seconds):
"""Convert seconds back to a timestamp string, using hyphens instead of colons."""
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
s = seconds % 60
return f"{h:02}-{m:02}-{s:06.3f}"
def adjust_timestamps(self, vtt_content):
"""
Takes a VTT content string and adjusts the timestamps to account for the audio parts.
:param vtt_content: The VTT content to process.
:return: The adjusted VTT content.
"""
sections = vtt_content.split("WEBVTT")
adjusted_sections = []
time_offset = 0
for section in sections[1:]: # Skip the first section as it's likely the header
lines = section.strip().split("\n")
adjusted_lines = []
for line in lines:
if '-->' in line:
start, end = line.split(' --> ')
start_sec = self.parse_time(start) + time_offset
end_sec = self.parse_time(end) + time_offset
adjusted_line = f"{self.format_time(start_sec)} --> {self.format_time(end_sec)}"
adjusted_lines.append(adjusted_line)
else:
adjusted_lines.append(line)
# Update the time offset using the last timestamp of the current section
if adjusted_lines:
last_time = adjusted_lines[-2] # The second last line contains the last timestamp
time_split = last_time.split(' --> ')
if len(time_split) == 2:
_, end = time_split
time_offset = self.parse_time(end)
adjusted_sections.append('\n'.join(adjusted_lines))
return "WEBVTT\n\n".join(adjusted_sections)
def extract_frames_by_range(self, video_path, target_folder, start_time, end_time, fps=1):
"""
Takes a video path, a start time, and an end time, and extracts frames from the video between the given timestamps.
:param video_path: The path to the video to process.
:param target_folder: The path to the folder to save the extracted frames to.
:param start_time: The start time to extract frames from in HH:MM:SS.mmm format.
:param end_time: The end time to extract frames to in HH:MM:SS.mmm format.
:param fps: The frames per second to extract from the video.
"""
# Convert start_time and end_time from HH:MM:SS.mmm to seconds
start_seconds = sum(x * float(t) for x, t in zip([3600, 60, 1, 0.001], start_time.split(":")))
end_seconds = sum(x * float(t) for x, t in zip([3600, 60, 1, 0.001], end_time.split(":")))
# Create the target folder if it doesn't exist
if not os.path.exists(target_folder):
os.makedirs(target_folder)
with VideoFileClip(video_path) as video:
# Calculate the interval between frames based on the desired fps
interval = 1 / fps
# Adjust the loop to iterate over the desired timestamp range
current_time = start_seconds
while current_time < end_seconds:
frame = video.get_frame(current_time)
timestamp = self.format_time(int(current_time))
frame_path = os.path.join(target_folder, f"{timestamp}.png")
Image.fromarray(np.uint8(frame)).save(frame_path)
current_time += interval
def find_timestamps(self, file_path):
"""
Takes a file path and finds the timestamps within the file.
Searches for timestamps in the format "at 00:00:20.360" and "[00:00:28.559]".
:param file_path: The path to the file to process.
:return: A list of timestamps.
"""
# Updated pattern to match both "at 00:00:20.360" and "[00:00:28.559]"
timestamp_pattern = r'at (\d{2}:\d{2}:\d{2}\.\d{3})|\[(\d{2}:\d{2}:\d{2}\.\d{3})\]'
timestamps = []
with open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
# Find all matches and process them to flatten the list and remove None
raw_matches = re.findall(timestamp_pattern, content)
for match in raw_matches:
# match is a tuple where one group is the timestamp and the other is empty
timestamp = match[0] if match[0] else match[1]
timestamps.append(timestamp)
return timestamps
def parse_and_extract_frames(self, video_path, target_path, timestamps):
"""
Takes a video path and a list of timestamps, and extracts frames from the video around the given timestamps.
:param video_path: The path to the video to process.
:param target_path: The path to the folder to save the extracted frames to.
:param timestamps: A list of timestamps to extract frames around.
"""
# Function to adjust the timestamp by a given number of seconds
def adjust_timestamp(timestamp, seconds):
timestamp_dt = datetime.strptime(timestamp, "%H:%M:%S.%f")
adjusted_timestamp = timestamp_dt + timedelta(seconds=seconds)
return adjusted_timestamp.strftime("%H:%M:%S.%f")[:-3]
for timestamp in timestamps:
start_timestamp = adjust_timestamp(timestamp, -5)
end_timestamp = adjust_timestamp(timestamp, 5)
self.extract_frames_by_range(video_path, target_path, start_timestamp, end_timestamp)
def get_or_create_audio_parts(self, audio_file_path):
"""
Takes an audio file path and splits the audio into parts if needed.
:param audio_file_path: The path to the audio file to process.
:return: A list of paths to the audio parts.
"""
# Check if the audio needs to be split by checking its file size - this is approximate, but close enough for gov work
file_size_mb = os.path.getsize(audio_file_path) / (1024 * 1024)
parts_directory = os.path.join(os.path.dirname(audio_file_path), "parts")
os.makedirs(parts_directory, exist_ok=True)
# If the audio file has already been split, return the existing parts - else, split the audio file
existing_parts = [os.path.join(parts_directory, f) for f in os.listdir(parts_directory) if os.path.isfile(os.path.join(parts_directory, f))]
if existing_parts:
logging.info("Found existing audio parts. Resuming transcription.")
return existing_parts
logging.info(f"Audio file size: {file_size_mb} MB")
if file_size_mb > self.MAX_SIZE_MB:
logging.info(f"Audio file size exceeds maximum size of {self.MAX_SIZE_MB} MB. Splitting audio file into parts.")
return self.split_audio_file_by_size(audio_file_path)
else:
logging.info(f"Audio file size is within maximum size of {self.MAX_SIZE_MB} MB. No need to split the audio file.")
return [audio_file_path]
def split_audio_file_by_size(self, audio_file_path):
"""
Takes an audio file path and splits the audio into parts based on the maximum size.
:param audio_file_path: The path to the audio file to process.
:return: A list of paths to the audio parts.
"""
logging.info(f"Splitting audio file: {audio_file_path}")
audio = AudioSegment.from_file(audio_file_path)
max_chunk_duration_ms = ((self.MAX_SIZE * 8) / self.BITRATE) * 1000
logging.info(f"Max chunk duration: {max_chunk_duration_ms} ms")
num_chunks = ceil(len(audio) / max_chunk_duration_ms)
logging.info(f"Number of chunks: {num_chunks}")
chunk_length = len(audio) // num_chunks
chunks = [audio[i * chunk_length: (i + 1) * chunk_length] for i in range(num_chunks)]
chunk_files = []
for i, chunk in enumerate(chunks):
chunk_name = f"{os.path.splitext(audio_file_path)[0]}_part{i}.mp3"
logging.info(f"Exporting audio chunk: {chunk_name}")
chunk.export(chunk_name, format="mp3")
chunk_files.append(chunk_name)
logging.info(f"Audio file split into {len(chunk_files)} parts.")
return chunk_files
def video_to_audio(self, input_video, output_audio):
"""
Takes a video file path and strips out the audio to save as an MP3 file.
:param input_video: The path to the video file to process.
:param output_audio: The path to the audio file to save the converted audio to.
"""
if not os.path.exists(output_audio):
video = AudioSegment.from_file(input_video, "mp4")
video.export(output_audio, format="mp3", bitrate="128k")
logging.info(f"Audio file exported: {output_audio}")
else:
logging.info("Audio file already exists")
def transcribe_audio_part(self, part):
"""
Takes an audio file part path and transcribes the audio into text via whisper LLM.
:param part: The path to the audio file to process.
:return: The transcribed text.
"""
try:
logging.info(f"Transcribing audio part: {part}")
with open(part, "rb") as audio_file:
transcript = self.client.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
response_format="vtt"
# This prompt can be used to help the LLM understand the context of the audio and certain terms of art that may be used.
#,prompt="UCaaS, CPaaS, STaaS, DRaaS, BLF, CDR, CIM, GCCH, GVBM, HEPIC, SBC, PSTN, SMB, OrecX, Prov"
)
return transcript
except Exception as e:
logging.error(f"Failed to transcribe audio part {part}: {e}")
raise ValueError("Failed to transcribe audio part. Check your OpenAI Key")
def audio_to_transcript(self, input_audio):
"""
Takes an audio file path and transcribes the audio into text via whisper LLM.
:param input_audio: The path to the audio file to process.
:return: The path to the transcript file.
"""
logging.info(f"Transcribing audio: {input_audio}")
with open(input_audio, "rb") as audio_file:
transcript = self.client.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
response_format="vtt"
# This prompt can be used to help the LLM understand the context of the audio and certain terms of art that may be used.
#,prompt="UCaaS, CPaaS, STaaS, DRaaS, BLF, CDR, CIM, GCCH, GVBM, HEPIC, SBC, PSTN, SMB, OrecX, Prov"
)
logging.info("Transcript created")
base_name = os.path.splitext(input_audio)[0]
output_file = f"{base_name}_transcript.txt"
with open(output_file, "w", encoding="utf-8") as f:
json.dump(transcript, f, indent=4)
logging.info(f"Transcript saved to: {output_file}")
return output_file
def process_and_save_output(self, base_name, prompt_key, transcript_content, additional_content=None, file_suffix=""):
"""
Takes a transcript and generates the output for a given prompt.
:param base_name: The base name of the transcript file.
:param prompt_key: The key of the prompt to use.
:param transcript_content: The content of the transcript to process.
:param additional_content: Additional content to use in the prompt.
:param file_suffix: The suffix to use for the output file.
:return: The path to the output file.
"""
file_name = f"{base_name}_{file_suffix}.txt"
# Check if the file already exists
if os.path.exists(file_name):
logging.info(f"{file_suffix.replace('_', ' ').capitalize()} file already exists: {file_name}")
return file_name
# Load and process the prompt
prompt = self.load_prompt(prompt_key)
if additional_content:
conversation_history = self.build_conversation_history(self.load_prompt("summary_prompt"), transcript_content, additional_content['summary'], additional_content['topic'])
else:
conversation_history = self.build_conversation_history(prompt, transcript_content)
response = self.send_conversation(conversation_history)
content = response.choices[0].message.content
clean_lines = [line.strip() for line in content.split('\n') if line.strip() != '']
clean_content = '\n\n'.join(clean_lines)
# Write the processed content to the file
with open(file_name, "w", encoding="utf-8") as f:
f.write(clean_content)
logging.info(f"{file_suffix.replace('_', ' ').capitalize()} saved to: {file_name}")
return file_name
def generate_transcript_outputs(self, transcript_file):
"""
Takes a transcript file and generates the summary outputs.
:param transcript_file: The path to the transcript file to process.
"""
with open(transcript_file, "r", encoding="utf-8") as file:
transcript_content = file.read()
base_name = os.path.splitext(transcript_file)[0]
# Generate the summary
self.process_and_save_output(base_name, "summary_prompt", transcript_content, file_suffix="summary")
with open(f"{base_name}_summary.txt", "r", encoding="utf-8") as file:
summary_file = file.read()
# Generate topic specific summaries
topic_prompts = self.generate_topic_prompts(summary_file)
# if script run with --topic, generate topic specific summaries.
# if extract_topics:
#Topic extraction assumed to be true
for i, topic_prompt in enumerate(topic_prompts):
additional_content = {"summary": summary_file, "topic": topic_prompt}
self.process_and_save_output(base_name, "summary_prompt", transcript_content, additional_content, file_suffix=f"topic{i}_summary")
# Generate the troubleshooting steps
self.process_and_save_output(base_name, "troubleshooting_prompt", transcript_content, file_suffix="troubleshooting_steps")
# Generate the glossary
self.process_and_save_output(base_name, "glossary_prompt", transcript_content, file_suffix="glossary")
# Generate the tags and symptoms
self.process_and_save_output(base_name, "tags_prompt", transcript_content, file_suffix="tags_and_symptoms")
logging.info(f"Transcript outputs saved to: {os.path.splitext(transcript_file)[0]}")
def extract_topics(self, response_text):
"""
Takes a response text and extracts the topics from it.
:param response_text: The response text to process.
:return: A list of topics.
"""
# Regular expression to match the pattern "Topic X: Title"
pattern = r"Topic \d+: .+"
topics = re.findall(pattern, response_text)
return topics
# OpenAI Functions
def generate_topic_prompts(self, response_text):
"""
Takes a response text and generates the topic prompts.
:param response_text: The response text to process.
:return: A list of topic prompts.
"""
topics = self.extract_topics(response_text)
base_prompt = self.load_prompt("topic_prompt")
topic_prompts = []
for topic in topics:
modified_prompt = base_prompt.replace("[REPLACE_ME]", topic)
topic_prompts.append(modified_prompt)
return topic_prompts
def load_prompt(self,prompt_key):
"""
Takes a prompt key and loads the prompt from the prompts folder.
:param prompt_key: The key of the prompt to load.
:return: The prompt content.
"""
prompt_path = self.prompts[prompt_key]
print(f"Loading prompt from: {prompt_path}") # Debugging line
with open(self.prompts[prompt_key], 'r', encoding='utf-8') as file:
return file.read()
def send_conversation(self, conversation_history):
"""
Takes a conversation history and sends it to the OpenAI API to generate a response.
:param conversation_history: The conversation history to send.
:return: The response from the LLM
"""
response = openai.chat.completions.create(
model="gpt-4-1106-preview",
#model="gpt-3.5-turbo-1106",
messages=conversation_history,
max_tokens=4096,
temperature=0.00,
)
return response
def build_conversation_history(self, system_prompt, user_prompt1, assistant_response=None, user_prompt2=None):
"""
Takes a system prompt, user prompt, and optional assistant response and user prompt and builds a conversation history.
:param system_prompt: The system prompt to use.
:param user_prompt1: The first user prompt to use.
:param assistant_response: The assistant response to use.
:param user_prompt2: The second user prompt to use.
:return: The conversation history.
"""
conversation_history = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt1}
]
# Check if both or none of the optional parameters are provided
if (assistant_response is not None and user_prompt2 is not None) or (assistant_response is None and user_prompt2 is None):
# Append the optional prompts if both are provided
if assistant_response is not None:
conversation_history.append({"role": "assistant", "content": assistant_response})
conversation_history.append({"role": "user", "content": user_prompt2})
else:
raise ValueError("Both 'assistant_response' and 'user_prompt2' must be provided together or not at all.")
return conversation_history
def generate_article(self, input_file):
"""
Takes an input file path and generates a article from it.
:param input_file_path: The path to the input file to process.
:return: The article.
"""
article_prompt = self.load_prompt("article_prompt")
with open(input_file, "r", encoding="utf-8") as file:
file_content = file.read()
article_convo = self.build_conversation_history(article_prompt, file_content)
response = self.send_conversation(article_convo)
content = response.choices[0].message.content
clean_lines = [line.strip() for line in content.split('\n') if line.strip() != '']
clean_content = '\n\n'.join(clean_lines)
return clean_content
def process_articles(self, input_path):
"""
Takes a path to a folder containing input files and generates articles from them.
:param input_path: The path to the folder containing input files to process.
"""
logging.info(f"Processing article inputs in folder: {input_path}")
for filename in tqdm(os.listdir(input_path), desc="Processing Files"):
if filename.endswith("_summary.txt") or filename.endswith("_troubleshooting_steps.txt"):
logging.info(f"Processing article input: {filename}")
input_file = os.path.join(input_path, filename)
article = self.generate_article(input_file)
output_file = os.path.join(input_path, f"{os.path.splitext(filename)[0]}_article.txt")
with open(output_file, "w", encoding="utf-8") as f:
f.write(article)
logging.info(f"Article saved to: {output_file}")
# Everything below is added to adjust existing script to run self service.
# Until load_dotenv()
def get_drive_service(self):
SCOPES = ['https://www.googleapis.com/auth/drive']
credentials_json = os.getenv('GOOGLE_APPLICATION_CREDENTIALS_JSON')
if credentials_json is None:
raise ValueError("Environment variable 'GOOGLE_APPLICATION_CREDENTIALS_JSON' is not set")
credentials_info = json.loads(credentials_json)
credentials = service_account.Credentials.from_service_account_info(credentials_info, scopes=SCOPES)
return build('drive', 'v3', credentials=credentials)
def extract_drive_folder_id(self, drive_link):
# This can be expanded to handle various Google Drive link formats
match = re.search(r'folders/([^/?]+)', drive_link)
if match:
return match.group(1)
else:
raise ValueError("Invalid Google Drive folder link.")
def list_files_in_folder(self, service, folder_id):
try:
results = service.files().list(
q=f"'{folder_id}' in parents and trashed=false",
pageSize=100,
fields="nextPageToken, files(id, name)").execute()
return results.get('files', [])
except Exception as e:
logging.error(f"Failed to list files in Google Drive folder {folder_id}: {e}")
raise ValueError("Failed to list files in Google Drive folder. Check folder permissions")
def download_file(self, service, file_id, file_path):
try:
# Ensure the directory where the file will be saved exists
os.makedirs(os.path.dirname(file_path), exist_ok=True)
# Check if the file is an mp4 file before downloading
if not file_path.endswith('.mp4'):
logging.info(f"Skipping non-mp4 file: {file_path}")
return
request = service.files().get_media(fileId=file_id)
with open(file_path, 'wb') as fh:
downloader = MediaIoBaseDownload(fh, request)
done = False
while done is False:
status, done = downloader.next_chunk()
except Exception as e:
logging.error(f"Failed to download file {file_id} to {file_path}: {e}")
raise ValueError("Failed to download file")
def find_or_create_drive_folder(self, service, folder_name, parent_folder_id):
try:
# Check if folder exists
query = f"mimeType='application/vnd.google-apps.folder' and name='{folder_name}' and '{parent_folder_id}' in parents and trashed=false"
response = service.files().list(q=query, spaces='drive', fields='files(id, name)').execute()
files = response.get('files', [])
if files:
# Folder exists, return its ID
return files[0]['id']
else:
# Folder doesn't exist, create it
folder_metadata = {
'name': folder_name,
'mimeType': 'application/vnd.google-apps.folder',
'parents': [parent_folder_id]
}
folder = service.files().create(body=folder_metadata, fields='id').execute()
return folder.get('id')
except Exception as e:
logging.error(f"Failed to find or create Google Drive folder '{folder_name}': {e}")
raise ValueError("Failed to find or create Google Drive folder. Check permissions")
def upload_file(self, service, file_path, drive_folder_id):
try:
file_metadata = {'name': os.path.basename(file_path), 'parents': [drive_folder_id]}
media = MediaFileUpload(file_path, resumable=True)
file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
logging.info(f"Uploaded {file_path} to Google Drive with ID {file.get('id')}")
except Exception as e:
logging.error(f"Failed to upload file {file_path} to Google Drive: {e}")
raise ValueError("Failed to upload file to Google Drive. Check folder permissions")
def sync_folder_to_drive(self, service, local_folder_path, drive_parent_folder_id, is_root=True):
"""
Synchronize a local folder structure and its files with Google Drive.
:param service: Authenticated Google Drive service instance.
:param local_folder_path: Path to the local folder to sync.
:param drive_parent_folder_id: The Google Drive folder ID to sync with.
:param is_root: Boolean indicating if the current folder is the root of the sync operation.
"""
# If it's the root directory, upload files directly in it, then handle directories
if is_root:
for item_name in os.listdir(local_folder_path):
item_path = os.path.join(local_folder_path, item_name)
if os.path.isfile(item_path):
# Uploads 'processing.log' and any other files directly under the root
self.upload_file(service, item_path, drive_parent_folder_id)
# Process directories and their contents
for item_name in os.listdir(local_folder_path):
item_path = os.path.join(local_folder_path, item_name)
if os.path.isdir(item_path):
# It's a directory, find or create a corresponding folder on Drive
drive_folder_id = self.find_or_create_drive_folder(service, item_name, drive_parent_folder_id)
# Recursively sync the subfolder
self.sync_folder_to_drive(service, item_path, drive_folder_id, is_root=False)
elif os.path.isfile(item_path) and not is_root:
# For files in subdirectories, upload them to their respective folder on Google Drive
self.upload_file(service, item_path, drive_parent_folder_id)
def cleanup_input_folder(self, folder_path):
"""
Deletes all files and folders under the specified folder_path.
:param folder_path: Path to the folder to clean up.
"""
try:
# Safety check to prevent accidental deletion of unintended directories
if "Input-Folder" in folder_path:
# List all items in the folder
for item_name in os.listdir(folder_path):
item_path = os.path.join(folder_path, item_name)
try:
# Check if it's a file and delete it
if os.path.isfile(item_path) or os.path.islink(item_path):
os.unlink(item_path)
# Else, it's a directory, delete the directory tree
elif os.path.isdir(item_path):
shutil.rmtree(item_path)
logging.info(f"Deleted {item_path}")
except Exception as e:
logging.error(f"Failed to delete {item_path}. Reason: {e}")
else:
logging.error("Safety check failed. The folder path does not seem to be correct.")
except Exception as e:
logging.error(f"Failed to clean up input folder {folder_path}: {e}")
raise ValueError("Failed to clean up input folder")
def check_folder_accessibility(self, service, folder_id):
"""
Checks if the specified Google Drive folder is publicly accessible and has write access.
:param service: The Google Drive service instance.
:param folder_id: The ID of the Google Drive folder to check.
:return: True if the folder is publicly accessible and has write access, False otherwise.
"""
try:
permissions = service.permissions().list(fileId=folder_id).execute()
for permission in permissions.get('permissions', []):
# Check if the permission type is anyone (public) and the role includes writer or owner
if permission.get('type') == 'anyone' and permission.get('role') in ['writer', 'owner']:
return True
return False
except Exception as e:
logging.error(f"Failed to check folder accessibility for folder {folder_id}: {e}")
return False
# Above is newly added codes
# Load environment variables and API key via .env file
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
# Example usage
if __name__ == "__main__":
input_folder_path = os.path.abspath(args.input_folder)
transcribe = args.transcribe
extract_topics = args.topic
processor = KnowledgeTranscriber(api_key)
processor.process_folder(input_folder_path, transcribe) |