# %%
# %load_ext autoreload
# %autoreload 2
from transformers import pipeline
import re
from num2words import num2words
import aiohttp
from aiohttp import ClientSession
from aiohttp_retry import RetryClient, ExponentialRetry
from tqdm import tqdm
import asyncio
import os
from dotenv import load_dotenv
import ffmpeg

# load the Khaya API token from the environment
load_dotenv()
KHAYA_TOKEN = os.getenv("KHAYA_TOKEN")

translation_url = "https://translation-api.ghananlp.org/v1/translate"
translation_hdr = {
    # Request headers
    "Content-Type": "application/json",
    "Cache-Control": "no-cache",
    "Ocp-Apim-Subscription-Key": KHAYA_TOKEN,
}

LANG = "tw"


def replace_numbers_with_words(text):
    """Spell out multi-digit numbers so the TTS engine can pronounce them."""

    def replace(match):
        return num2words(match.group().replace(",", ""), lang="en")

    return re.sub(r"[\d]+[.,\d]+", replace, text)


async def fetch(session, url, headers, data, semaphore, index):
    """POST one translation request; on failure, the error string is returned
    in place of a translation so the failed sentence is easy to spot."""
    async with semaphore:
        try:
            async with session.post(
                url,
                headers=headers,
                json=data,
                timeout=aiohttp.ClientTimeout(total=10),
            ) as response:
                response.raise_for_status()
                return index, await response.json()
        except aiohttp.ClientError as e:
            print(f"Request error: {e}")
            return index, str(e)
        except Exception as e:
            print(f"Unexpected error: {e}")
            return index, str(e)


async def translation_main(sentences, url, headers, lang):
    """Translate all sentences concurrently, preserving their original order."""
    khaya_translations = [None] * len(sentences)
    semaphore = asyncio.Semaphore(2)  # limit the number of concurrent requests

    retry_options = ExponentialRetry(attempts=3)
    async with RetryClient(ClientSession(), retry_options=retry_options) as session:
        tasks = []
        for index, sent in enumerate(sentences):
            data = {"in": sent, "lang": f"en-{lang}"}
            tasks.append(fetch(session, url, headers, data, semaphore, index))

        for f in tqdm(
            asyncio.as_completed(tasks), total=len(tasks), desc="Translating Sentences"
        ):
            index, result = await f
            khaya_translations[index] = result

    return khaya_translations


async def convert_text_to_speech(session, text, speaker, output_file):
    """Send one sentence to the TTS endpoint and stream the audio to disk."""
    speaker_dict = {"male": "twi_speaker_5", "female": "twi_speaker_7"}
    speaker_id = speaker_dict[speaker]
    try:
        tts_url = "https://tts-backend-nlpghana-staging.azurewebsites.net/v0/tts"  # Replace with your TTS API URL
        data = {"text": text, "language": LANG, "speaker_id": speaker_id}
        hdr = {
            # Request headers
            "Content-Type": "application/json",
            "Cache-Control": "no-cache",
            "Ocp-Apim-Subscription-Key": KHAYA_TOKEN,
        }
        async with session.post(tts_url, headers=hdr, json=data) as response:
            response.raise_for_status()
            with open(output_file, "wb") as file:
                while True:
                    chunk = await response.content.read(1024)
                    if not chunk:
                        break
                    file.write(chunk)
    except aiohttp.ClientError as e:
        print(f"Request error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")


async def tts_main(khaya_translations, speaker, list_of_output_chunks):
    """Convert every translated sentence to speech, one audio chunk per sentence."""
    async with aiohttp.ClientSession() as session:
        tasks = []
        for i, sent in enumerate(khaya_translations):
            output_file = list_of_output_chunks[i]
            tasks.append(convert_text_to_speech(session, sent, speaker, output_file))

        for f in tqdm(
            asyncio.as_completed(tasks), total=len(tasks), desc="Converting to Speech"
        ):
            await f
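# %%
# A quick, illustrative sanity check of the helpers above before running the
# full pipeline; the sample sentences are assumptions, not project data.
print(replace_numbers_with_words("The budget grew to 1,250 dollars."))
# in a Jupyter cell, the async helper can be awaited directly:
# await translation_main(["Good morning."], translation_url, translation_hdr, LANG)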
# %%
# filename = "CoolVision-Uzbekistan.mov"
output_path = "/Users/lawrenceadu-gyamfi/Documents/PERSONAL/GHANANLP/PROJECTS/SAINT/Examples/test_pipeline"
input_video = "test_input_video.mov"
input_audio = "input_audio.aac"
output_audio = "output_audio.wav"
output_video = "test_output_video.mp4"
filename_with_path = f"{output_path}/{input_video}"

# %%
# only need to run this once
# !ffmpeg -i {output_path}/{input_video} -vn -acodec copy {output_path}/{input_audio} -y


def extract_audio_from_video(input_video):
    """Extract the audio track from a video file without re-encoding it."""
    if input_video:
        output_audio_path = "separated_audio.aac"
        try:
            (
                ffmpeg.input(input_video)
                .output(output_audio_path, acodec="copy", vn=None)
                .run(overwrite_output=True)
            )
            print("Audio extracted successfully")
            return output_audio_path
        except ffmpeg.Error as e:
            print(e.stderr.decode())
            raise e


# %%
# ASR pipeline
def transcribe_and_preprocess_audio(input_audio):
    """Transcribe the audio with Whisper, then split and clean the text for MT."""
    asr = pipeline(
        "automatic-speech-recognition", model="openai/whisper-large-v3", device=0
    )
    pipeline_whisper_output = asr(input_audio, return_timestamps=True)

    # preprocess the output before machine translation
    sentences = pipeline_whisper_output["text"].split(". ")
    sentences = [el.strip() for el in sentences if el]

    # replace numbers with words
    sentences = [replace_numbers_with_words(sent) for sent in sentences]
    return sentences


# %%
# combine the audio files
def combine_audio_streams(list_of_output_chunks, output_audio):
    """Concatenate the per-sentence TTS chunks into a single audio file."""
    input_streams = [ffmpeg.input(chunk) for chunk in list_of_output_chunks]
    concatenated = ffmpeg.concat(*input_streams, v=0, a=1).output(output_audio)

    try:
        concatenated.run(overwrite_output=True)
        return output_audio
    except ffmpeg.Error as e:
        print(e.stderr.decode())


# %%
# combine the audio and video
def create_combined_output(input_video, output_audio, output_video):
    """Mux the original video stream with the translated audio track."""
    try:
        video = ffmpeg.input(input_video)
        audio = ffmpeg.input(output_audio)  # .filter_('atempo', 1.09580838323)
        ffmpeg.output(
            video["v"],
            audio["a"],
            filename=output_video,
            vcodec="copy",
        ).run(overwrite_output=True)
        print("Video and audio combined successfully")
        return output_video
    except ffmpeg.Error as e:
        print(e.stderr.decode())


# %%
async def process_video_translation(input_video, output_video, speaker="male"):
    """Run the full extract -> transcribe -> translate -> TTS -> mux pipeline."""
    print("Processing video translation")
    print("Extracting audio from video")
    output_audio_path = extract_audio_from_video(input_video)

    # transcribe audio
    print("Transcribing audio")
    sentences = transcribe_and_preprocess_audio(output_audio_path)

    # translate to twi
    print("Translating to Twi")
    khaya_translations = await translation_main(
        sentences, translation_url, translation_hdr, LANG
    )

    # create output files
    print("Creating output files")
    list_of_output_chunks = [
        f"translated_{i}.wav" for i in range(len(khaya_translations))
    ]

    # convert to speech (pass the speaker through; "female" is also available)
    print("Converting to speech")
    await tts_main(khaya_translations, speaker, list_of_output_chunks)

    # combine audio streams
    print("Combining audio streams")
    output_audio = combine_audio_streams(list_of_output_chunks, "combined_audio.wav")

    print("Combining audio and video")
    create_combined_output(input_video, output_audio, output_video)

    print("Video translation completed")
    return output_video


# %%
# test_input_video = "../Examples/test_pipeline/test_input_video.mov"
# test_output_video = "test_output_video.mp4"
# await process_video_translation(test_input_video, test_output_video)

# %%
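# A minimal sketch of invoking the pipeline as a plain script, where the
# top-level `await` used in the cell above is unavailable; the guard keeps the
# run from firing on import, and the file names are this notebook's test
# values, not fixed inputs.
if __name__ == "__main__":
    asyncio.run(
        process_video_translation("test_input_video.mov", "test_output_video.mp4")
    )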