# %%
# %load_ext autoreload
# %autoreload 2
import asyncio
import os
import re

import aiohttp
import ffmpeg
import requests
from aiohttp import ClientSession
from aiohttp_retry import RetryClient, ExponentialRetry
from dotenv import load_dotenv
from num2words import num2words
from tqdm import tqdm
from transformers import pipeline

# load the Khaya API token from the environment
load_dotenv()
KHAYA_TOKEN = os.getenv("KHAYA_TOKEN")

translation_url = "https://translation-api.ghananlp.org/v1/translate"
translation_hdr = {
    # Request headers
    "Content-Type": "application/json",
    "Cache-Control": "no-cache",
    "Ocp-Apim-Subscription-Key": KHAYA_TOKEN,
}

LANG = "tw"
def replace_numbers_with_words(text):
    # spell out numerals so the TTS model can pronounce them
    def replace(match):
        return num2words(match.group().replace(",", ""), lang="en")

    # matches numbers of two or more characters, e.g. "1,000" or "2023";
    # single digits are left unchanged
    return re.sub(r"[\d]+[.,\d]+", replace, text)
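# %%
# Quick illustrative check of the helper above (hypothetical sentence):
# replace_numbers_with_words("The project raised 1,200 dollars in 2023")
# should spell out "1,200" and "2023" as English words and leave the rest untouched.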
async def fetch(session, url, headers, data, semaphore, index):
    # the semaphore caps the number of in-flight requests against the API
    async with semaphore:
        try:
            async with session.post(
                url, headers=headers, json=data, timeout=10
            ) as response:
                response.raise_for_status()
                return index, await response.json()
        except aiohttp.ClientError as e:
            print(f"Request error: {e}")
            return index, str(e)
        except Exception as e:
            print(f"Unexpected error: {e}")
            return index, str(e)
async def translation_main(sentences, url, headers, lang):
    khaya_translations = [None] * len(sentences)
    semaphore = asyncio.Semaphore(2)  # limit the number of concurrent requests

    retry_options = ExponentialRetry(
        attempts=3,
    )
    async with RetryClient(ClientSession(), retry_options=retry_options) as session:
        tasks = []
        for index, sent in enumerate(sentences):
            data = {"in": sent, "lang": f"en-{lang}"}
            tasks.append(fetch(session, url, headers, data, semaphore, index))

        for f in tqdm(
            asyncio.as_completed(tasks), total=len(tasks), desc="Translating Sentences"
        ):
            index, result = await f
            khaya_translations[index] = result

    return khaya_translations
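# %%
# Illustrative usage of translation_main (hypothetical sentences; run inside an
# async context such as this notebook's event loop):
# sample_sentences = ["Hello, how are you?", "The weather is nice today."]
# twi_sentences = await translation_main(
#     sample_sentences, translation_url, translation_hdr, LANG
# )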
async def convert_text_to_speech(session, text, speaker, output_file):
    speaker_dict = {"male": "twi_speaker_5", "female": "twi_speaker_7"}
    speaker_id = speaker_dict[speaker]
    try:
        tts_url = "https://tts-backend-nlpghana-staging.azurewebsites.net/v0/tts"  # Replace with your TTS API URL
        data = {"text": text, "language": LANG, "speaker_id": speaker_id}
        hdr = {
            # Request headers
            "Content-Type": "application/json",
            "Cache-Control": "no-cache",
            "Ocp-Apim-Subscription-Key": KHAYA_TOKEN,
        }
        async with session.post(tts_url, headers=hdr, json=data) as response:
            response.raise_for_status()
            # stream the audio response to disk in 1 KB chunks
            with open(output_file, "wb") as file:
                while True:
                    chunk = await response.content.read(1024)
                    if not chunk:
                        break
                    file.write(chunk)
    except aiohttp.ClientError as e:
        print(f"Request error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
async def tts_main(khaya_translations, speaker, list_of_output_chunks):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for i, sent in enumerate(khaya_translations):
            output_file = list_of_output_chunks[i]
            tasks.append(convert_text_to_speech(session, sent, speaker, output_file))

        for f in tqdm(
            asyncio.as_completed(tasks), total=len(tasks), desc="Converting to Speech"
        ):
            await f
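# %%
# Illustrative usage of tts_main (assumes twi_sentences from the cell above;
# speaker is "male" or "female" per speaker_dict):
# chunk_files = [f"translated_{i}.wav" for i in range(len(twi_sentences))]
# await tts_main(twi_sentences, "female", chunk_files)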
# %%
# filename = "CoolVision-Uzbekistan.mov"
output_path = "/Users/lawrenceadu-gyamfi/Documents/PERSONAL/GHANANLP/PROJECTS/SAINT/Examples/test_pipeline"
input_video = "test_input_video.mov"
input_audio = "input_audio.aac"
output_audio = "output_audio.wav"
output_video = "test_output_video.mp4"
filename_with_path = f"{output_path}/{input_video}"

# %%
# only need to run this once
# !ffmpeg -i {output_path}/{input_video} -vn -acodec copy {output_path}/{input_audio} -y
def extract_audio_from_video(input_video):
    if input_video:
        output_audio_path = "separated_audio.aac"
        try:
            (
                ffmpeg.input(input_video)
                .output(output_audio_path, acodec="copy", vn=None)
                .run(overwrite_output=True)
            )
            print("Audio extracted successfully")
            return output_audio_path
        except ffmpeg.Error as e:
            print(e.stderr.decode())
            raise e
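# %%
# Illustrative usage: pull the audio track out of the test video defined above.
# extracted_audio = extract_audio_from_video(filename_with_path)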
# %%
# ASR pipeline
def transcribe_and_preprocess_audio(input_audio):
    asr = pipeline(
        "automatic-speech-recognition", model="openai/whisper-large-v3", device=0
    )
    pipeline_whisper_output = asr(
        input_audio,
        return_timestamps=True,
    )

    # preprocess the output before machine translation
    sentences = pipeline_whisper_output["text"].split(". ")
    sentences = [el.strip() for el in sentences if el]

    # replace numbers with words
    sentences = [replace_numbers_with_words(sent) for sent in sentences]
    return sentences
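# %%
# Illustrative usage (device=0 assumes a GPU; pass device=-1 to run Whisper on CPU):
# english_sentences = transcribe_and_preprocess_audio(extracted_audio)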
# %%
# combine the audio files
def combine_audio_streams(list_of_output_chunks, output_audio):
    input_streams = [ffmpeg.input(chunk) for chunk in list_of_output_chunks]
    concatenated = ffmpeg.concat(*input_streams, v=0, a=1).output(output_audio)

    try:
        concatenated.run(overwrite_output=True)
        return output_audio
    except ffmpeg.Error as e:
        print(e.stderr.decode())
# %%
# combine the audio and video
def create_combined_output(input_video, output_audio, output_video):
    try:
        video = ffmpeg.input(input_video)
        audio = ffmpeg.input(output_audio)  # .filter_('atempo', 1.09580838323)
        (
            ffmpeg.output(
                video["v"],
                audio["a"],
                filename=output_video,
                vcodec="copy",
            ).run(overwrite_output=True)
        )
        print("Video and audio combined successfully")
        return output_video
    except ffmpeg.Error as e:
        print(e.stderr.decode())
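# %%
# Illustrative usage: mux the combined Twi audio back onto the original video track.
# create_combined_output(filename_with_path, "combined_audio.wav", output_video)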
# %%
# speaker defaults to "female"; pass "male" to use the other voice in speaker_dict
async def process_video_translation(input_video, output_video, speaker="female"):
    print("Processing video translation")

    # extract audio from video
    print("Extracting audio from video")
    output_audio_path = extract_audio_from_video(input_video)

    # transcribe audio
    print("Transcribing audio")
    sentences = transcribe_and_preprocess_audio(output_audio_path)

    # translate to twi
    print("Translating to Twi")
    khaya_translations = await translation_main(
        sentences, translation_url, translation_hdr, LANG
    )

    # create output files
    print("Creating output files")
    list_of_output_chunks = [
        f"translated_{i}.wav" for i in range(len(khaya_translations))
    ]

    # convert to speech
    print("Converting to speech")
    await tts_main(khaya_translations, speaker, list_of_output_chunks)

    # combine audio streams
    print("Combining audio streams")
    output_audio = combine_audio_streams(list_of_output_chunks, "combined_audio.wav")

    print("Combining audio and video")
    create_combined_output(input_video, output_audio, output_video)

    print("Video translation completed")
    return output_video
# %%
# test_input_video = "../Examples/test_pipeline/test_input_video.mov"
# test_output_video = "test_output_video.mp4"
# await process_video_translation(test_input_video, test_output_video)

# %%