"""Helpers for a video dubbing pipeline.

The module extracts audio from a video with ffmpeg, transcribes it with a
Whisper ASR pipeline, translates the sentences and synthesizes speech through
the Khaya (GhanaNLP) HTTP APIs, and finally muxes the new audio track back
onto the original video.
"""

import asyncio
import os
import re
import subprocess
import tempfile

import aiofiles
import aiohttp
import ffmpeg
import requests
import torch
from aiohttp import ClientSession
from aiohttp_retry import ExponentialRetry, RetryClient
from dotenv import load_dotenv
from num2words import num2words
from tqdm import tqdm
from transformers import pipeline

# Load the Khaya API token from the environment (a .env file is honoured).
load_dotenv()
KHAYA_TOKEN = os.getenv("KHAYA_TOKEN")

translation_url = "https://translation-api.ghananlp.org/v1/translate"
tts_url = "https://tts-backend-nlpghana-staging.azurewebsites.net/v0/tts"

translation_hdr = {
    # Request headers
    "Content-Type": "application/json",
    "Cache-Control": "no-cache",
    "Ocp-Apim-Subscription-Key": KHAYA_TOKEN,
}

tts_header = {
    # Request headers
    "Content-Type": "application/json",
    "Cache-Control": "no-cache",
    "Ocp-Apim-Subscription-Key": f"{KHAYA_TOKEN}",
}

LANG_DICT = {"Twi": "tw", "Ewe": "ee"}

# Use the first GPU when available, otherwise run the ASR pipeline on CPU.
pipe_device = 0 if torch.cuda.is_available() else -1

# Digits with optional thousands groups ("1,000") and an optional decimal
# part ("3.5"). Trailing punctuation such as "2020." is deliberately left
# outside the match so sentence punctuation survives the substitution.
_NUMBER_RE = re.compile(r"\d+(?:,\d{3})*(?:\.\d+)?")

# TTS speaker ids per language code and voice.
# NOTE(review): no female Ewe speaker is configured, so None is sent as the
# speaker_id for that combination - confirm how the API treats it.
_SPEAKER_IDS = {
    "tw": {"male": "twi_speaker_5", "female": "twi_speaker_7"},
    "ee": {"male": "ewe_speaker_3", "female": None},
}


def _error_output(error):
    """Return printable diagnostics for an ffmpeg or subprocess failure.

    ``stderr`` is only populated when the process output was captured; when
    it was not, the attribute is ``None`` and calling ``.decode()`` on it
    would raise and hide the real failure, so fall back to ``str(error)``.
    """
    stderr = getattr(error, "stderr", None)
    if isinstance(stderr, bytes):
        return stderr.decode(errors="replace")
    return stderr if stderr else str(error)


def replace_numbers_with_words(text):
    """Return ``text`` with every number spelled out in English words.

    Thousands separators are removed before conversion, so "1,000" becomes
    "one thousand". Single digits are converted as well.
    """

    def replace(match):
        return num2words(match.group().replace(",", ""), lang="en")

    return _NUMBER_RE.sub(replace, text)


async def fetch(session, url, headers, data, semaphore, index):
    """POST ``data`` as JSON to ``url`` and return ``(index, result)``.

    ``result`` is the decoded JSON body on success, or the error message as a
    string when the request fails. ``semaphore`` bounds how many requests run
    at once and ``index`` lets the caller restore the original ordering.
    """
    async with semaphore:
        try:
            async with session.post(
                url,
                headers=headers,
                json=data,
                # An explicit ClientTimeout replaces the deprecated bare number.
                timeout=aiohttp.ClientTimeout(total=10),
            ) as response:
                response.raise_for_status()
                return index, await response.json()
        except aiohttp.ClientError as e:
            print(f"Request error: {e}")
            return index, str(e)
        except Exception as e:
            print(f"Unexpected error: {e}")
            return index, str(e)


async def translation_main(sentences, url, headers, lang):
    """Translate ``sentences`` from English into ``lang`` concurrently.

    Returns a list aligned with ``sentences``; each entry is whatever
    ``fetch`` produced for that sentence (the API response, or an error
    message string when the request failed).
    """
    khaya_translations = [None] * len(sentences)
    semaphore = asyncio.Semaphore(2)  # limit the number of concurrent requests
    retry_options = ExponentialRetry(
        attempts=3,
    )
    async with RetryClient(ClientSession(), retry_options=retry_options) as session:
        tasks = []
        for index, sent in enumerate(sentences):
            data = {"in": sent, "lang": f"en-{lang}"}
            tasks.append(fetch(session, url, headers, data, semaphore, index))

        for f in tqdm(
            asyncio.as_completed(tasks), total=len(tasks), desc="Translating Sentences"
        ):
            index, result = await f
            # TODO: handle error response
            khaya_translations[index] = result

    return khaya_translations


async def convert_text_to_speech(
    session,
    tts_url,
    tts_header,
    text,
    text_index,
    language,
    speaker,
    semaphore,
    output_dir,
):
    """Synthesize ``text`` and stream the audio to ``<text_index>_tts.wav``.

    Returns the path of the written file inside ``output_dir``, or ``None``
    when the request or the file write fails. Raises ``KeyError`` for a
    language or speaker that has no entry in the speaker table.
    """
    speaker_id = _SPEAKER_IDS[language][speaker]
    data = {"text": text, "language": language, "speaker_id": speaker_id}

    try:
        async with semaphore:
            async with session.post(tts_url, headers=tts_header, json=data) as response:
                response.raise_for_status()
                output_path = os.path.join(output_dir, f"{text_index}_tts.wav")
                async with aiofiles.open(output_path, "wb") as file:
                    # Stream the body in chunks instead of buffering it whole.
                    while True:
                        chunk = await response.content.read(16384)
                        if not chunk:
                            break
                        await file.write(chunk)
                return output_path
    except aiohttp.ClientError as e:
        print(f"Request error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    return None


async def tts_main(khaya_translations, speaker, language):
    """Synthesize every translation and concatenate the clips in order.

    Returns the path of the combined audio file ("combined_audio.wav"), or
    ``None`` when no clip could be produced or the concatenation failed.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        async with aiohttp.ClientSession() as session:
            semaphore = asyncio.Semaphore(3)
            tasks = [
                convert_text_to_speech(
                    session,
                    tts_url,
                    tts_header,
                    sent,
                    text_index,
                    language,
                    speaker,
                    semaphore,
                    temp_dir,
                )
                for text_index, sent in enumerate(khaya_translations)
            ]

            output_files = []
            for task in tqdm(
                asyncio.as_completed(tasks),
                total=len(tasks),
                desc="Converting to Speech",
            ):
                result = await task
                if result:
                    output_files.append(result)

        # Concatenate while the temporary directory still exists: the chunk
        # files are deleted as soon as the enclosing ``with`` block exits.
        output_audio = combine_audio_streams(output_files, "combined_audio.wav")
    return output_audio


def extract_audio_from_video(input_video):
    """Copy the audio track of ``input_video`` into "separated_audio.aac".

    Returns the output path, or ``None`` when ``input_video`` is falsy.
    Re-raises ``ffmpeg.Error`` after printing the available diagnostics.
    """
    if not input_video:
        return None

    output_audio_path = "separated_audio.aac"
    try:
        (
            ffmpeg.input(f"{input_video}")
            .output(f"{output_audio_path}", acodec="copy", vn=None)
            .run(overwrite_output=True)
        )
        print("Audio extracted successfully")
        return output_audio_path
    except ffmpeg.Error as e:
        print(_error_output(e))
        raise e


def transcribe_and_preprocess_audio(input_audio):
    """Transcribe ``input_audio`` with Whisper and split it into sentences.

    Returns a list of non-empty sentences with numbers spelled out, ready for
    machine translation.
    """
    asr = pipeline(
        "automatic-speech-recognition",
        model="openai/whisper-large-v3",
        device=pipe_device,
    )
    pipeline_whisper_output = asr(
        f"{input_audio}",
        return_timestamps=True,
    )

    # preprocess the output before machine translation
    # Strip first and filter second, so whitespace-only fragments are dropped
    # instead of surviving as empty strings.
    fragments = (el.strip() for el in pipeline_whisper_output["text"].split(". "))
    sentences = [el for el in fragments if el]

    # replace numbers with words
    return [replace_numbers_with_words(sent) for sent in sentences]


def combine_audio_streams(list_of_output_chunks, output_audio):
    """Concatenate audio chunks, ordered by their numeric filename prefix.

    Returns ``output_audio`` on success, or ``None`` when there is nothing to
    concatenate or ffmpeg fails.
    """
    if not list_of_output_chunks:
        print("No audio chunks to combine")
        return None

    list_of_output_chunks = sorted(
        list_of_output_chunks, key=lambda x: int(os.path.basename(x).split("_")[0])
    )

    input_streams = [ffmpeg.input(chunk) for chunk in list_of_output_chunks]
    concatenated = ffmpeg.concat(*input_streams, v=0, a=1).output(f"{output_audio}")

    try:
        concatenated.run(overwrite_output=True)
        return output_audio
    except ffmpeg.Error as e:
        print(_error_output(e))
        return None


def create_combined_output(input_video, output_audio, output_video):
    """Mux the video stream of ``input_video`` with ``output_audio``.

    The video stream is copied without re-encoding. Returns ``output_video``;
    re-raises ``ffmpeg.Error`` after printing the available diagnostics.
    """
    try:
        video = ffmpeg.input(f"{input_video}")
        audio = ffmpeg.input(f"{output_audio}")
        (
            ffmpeg.output(
                video["v"],
                audio["a"],
                filename=f"{output_video}",
                vcodec="copy",
            ).run(overwrite_output=True)
        )
        print("Video and audio combined successfully")
        return output_video
    except ffmpeg.Error as e:
        print(_error_output(e))
        raise e


def create_combined_output_subprocess(input_video, output_audio, output_video):
    """Mux video and audio, changing the audio tempo to fit the video length.

    The tempo factor is ``audio_duration / video_duration`` clamped to the
    range 0.5-100 before it is handed to the ``atempo`` filter. Returns
    ``output_video``; re-raises ``subprocess.CalledProcessError`` on failure.
    """
    video_duration = get_media_duration(input_video)
    audio_duration = get_media_duration(output_audio)
    speed_factor = calculate_speed_factor(video_duration, audio_duration)
    speed_factor = max(0.5, min(speed_factor, 100))
    print(f"Speed factor: {speed_factor}")

    try:
        command = [
            "ffmpeg",
            # Overwrite an existing output instead of prompting, matching the
            # overwrite_output=True used by the ffmpeg-python based helpers.
            "-y",
            "-i",
            f"{input_video}",
            "-i",
            f"{output_audio}",
            "-filter:a",
            f"atempo={speed_factor}",
            "-c:v",
            "copy",
            "-map",
            "0:v:0",
            "-map",
            "1:a:0",
            f"{output_video}",
        ]
        subprocess.run(command, check=True)
        print("Video and audio combined successfully")
        return output_video
    except subprocess.CalledProcessError as e:
        print(_error_output(e))
        raise e


def get_media_duration(media_file):
    """
    Get the duration of a media file in seconds.
    """
    probe = ffmpeg.probe(media_file)
    return float(probe["format"]["duration"])


def calculate_speed_factor(video_duration, audio_duration):
    """
    Calculate the speed factor to align audio with video.

    Raises ZeroDivisionError when ``video_duration`` is zero.
    """
    return audio_duration / video_duration