# demo-dubbing/pipeline.py
# %%
# %load_ext autoreload
# %autoreload 2
from transformers import pipeline
import re
from num2words import num2words
import aiohttp
from aiohttp import ClientSession
from aiohttp_retry import RetryClient, ExponentialRetry
from tqdm import tqdm
import asyncio
import os
from dotenv import load_dotenv
import ffmpeg
# load the Khaya API token from the environment
load_dotenv()
KHAYA_TOKEN = os.getenv("KHAYA_TOKEN")
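# Optional guard (not in the original flow, left commented out): the API calls
# below fail with an auth error if the token is missing.
# assert KHAYA_TOKEN, "KHAYA_TOKEN is not set in the environment"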
translation_url = "https://translation-api.ghananlp.org/v1/translate"
translation_hdr = {
    # Request headers
    "Content-Type": "application/json",
    "Cache-Control": "no-cache",
    "Ocp-Apim-Subscription-Key": KHAYA_TOKEN,
}
LANG = "tw"  # target language code (Twi)
def replace_numbers_with_words(text):
    # spell out digit groups (e.g. "1,200") so the TTS model can pronounce them
    def replace(match):
        return num2words(match.group().replace(",", ""), lang="en")

    return re.sub(r"[\d]+[.,\d]+", replace, text)
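# Quick illustration (example string is mine, not from the source): the regex
# above needs at least two number-ish characters, so a bare digit like "5"
# passes through unchanged.
# replace_numbers_with_words("raised 1,200 cedis")
# -> "raised one thousand, two hundred cedis"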
async def fetch(session, url, headers, data, semaphore, index):
    # the semaphore caps in-flight requests; index lets the caller restore order
    async with semaphore:
        try:
            async with session.post(
                url, headers=headers, json=data, timeout=10
            ) as response:
                response.raise_for_status()
                return index, await response.json()
        except aiohttp.ClientError as e:
            print(f"Request error: {e}")
            return index, str(e)
        except Exception as e:
            print(f"Unexpected error: {e}")
            return index, str(e)
async def translation_main(sentences, url, headers, lang):
    khaya_translations = [None] * len(sentences)
    semaphore = asyncio.Semaphore(2)  # limit the number of concurrent requests
    retry_options = ExponentialRetry(
        attempts=3,
    )
    async with RetryClient(ClientSession(), retry_options=retry_options) as session:
        tasks = []
        for index, sent in enumerate(sentences):
            data = {"in": sent, "lang": f"en-{lang}"}
            tasks.append(fetch(session, url, headers, data, semaphore, index))

        for f in tqdm(
            asyncio.as_completed(tasks), total=len(tasks), desc="Translating Sentences"
        ):
            index, result = await f
            khaya_translations[index] = result

    return khaya_translations
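# Commented-out sketch of running translation_main on its own (the sentence is
# illustrative; assumes KHAYA_TOKEN is valid and the API is reachable). Note
# that on failure fetch() returns the error string in place of a translation,
# so entries of the result list may be error messages rather than Twi text.
# sample = ["Hello, how are you today?"]
# twi = asyncio.run(
#     translation_main(sample, translation_url, translation_hdr, LANG)
# )  # inside a notebook, `await translation_main(...)` instead of asyncio.run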
async def convert_text_to_speech(session, text, speaker, output_file):
    speaker_dict = {"male": "twi_speaker_5", "female": "twi_speaker_7"}
    speaker_id = speaker_dict[speaker]
    try:
        tts_url = "https://tts-backend-nlpghana-staging.azurewebsites.net/v0/tts"
        data = {"text": text, "language": LANG, "speaker_id": speaker_id}
        hdr = {
            # Request headers
            "Content-Type": "application/json",
            "Cache-Control": "no-cache",
            "Ocp-Apim-Subscription-Key": KHAYA_TOKEN,
        }
        async with session.post(tts_url, headers=hdr, json=data) as response:
            response.raise_for_status()
            # stream the returned audio to disk in 1 KiB chunks
            with open(output_file, "wb") as file:
                while True:
                    chunk = await response.content.read(1024)
                    if not chunk:
                        break
                    file.write(chunk)
    except aiohttp.ClientError as e:
        print(f"Request error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
async def tts_main(khaya_translations, speaker, list_of_output_chunks):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for i, sent in enumerate(khaya_translations):
            output_file = list_of_output_chunks[i]
            tasks.append(convert_text_to_speech(session, sent, speaker, output_file))

        for f in tqdm(
            asyncio.as_completed(tasks), total=len(tasks), desc="Converting to Speech"
        ):
            await f
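# Commented-out sketch of calling tts_main directly (text and filename are
# placeholders; assumes the staging TTS endpoint accepts the token above):
# asyncio.run(tts_main(["<translated sentence>"], "female", ["translated_0.wav"]))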
# %%
# filename = "CoolVision-Uzbekistan.mov"
output_path = "/Users/lawrenceadu-gyamfi/Documents/PERSONAL/GHANANLP/PROJECTS/SAINT/Examples/test_pipeline"
input_video = "test_input_video.mov"
input_audio = "input_audio.aac"
output_audio = "output_audio.wav"
output_video = "test_output_video.mp4"
filename_with_path = f"{output_path}/{input_video}"
# %%
# only need to run this once
# !ffmpeg -i {output_path}/{input_video} -vn -acodec copy {output_path}/{input_audio} -y
def extract_audio_from_video(input_video):
    if not input_video:
        raise ValueError("No input video provided")
    output_audio_path = "separated_audio.aac"
    try:
        (
            ffmpeg.input(input_video)
            .output(output_audio_path, acodec="copy", vn=None)
            .run(overwrite_output=True)
        )
        print("Audio extracted successfully")
        return output_audio_path
    except ffmpeg.Error as e:
        print(e.stderr.decode())
        raise e
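# For reference, the ffmpeg-python call above corresponds roughly to the CLI
# form noted near the top of this file:
# ffmpeg -i <input_video> -vn -acodec copy separated_audio.aac -y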
# %%
# ASR pipeline
def transcribe_and_preprocess_audio(input_audio):
    # whisper-large-v3 on GPU (device=0); omit `device` to run on CPU
    asr = pipeline(
        "automatic-speech-recognition", model="openai/whisper-large-v3", device=0
    )
    pipeline_whisper_output = asr(
        input_audio,
        return_timestamps=True,
    )

    # preprocess the output before machine translation
    sentences = pipeline_whisper_output["text"].split(". ")
    sentences = [el.strip() for el in sentences if el]

    # replace numbers with words
    sentences = [replace_numbers_with_words(sent) for sent in sentences]
    return sentences
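# Note: with return_timestamps=True the pipeline output also carries a
# "chunks" key ([{"timestamp": (start, end), "text": ...}, ...]); only the
# joined "text" field is used above.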
# %%
# combine the audio files
def combine_audio_streams(list_of_output_chunks, output_audio):
    input_streams = [ffmpeg.input(chunk) for chunk in list_of_output_chunks]
    concatenated = ffmpeg.concat(*input_streams, v=0, a=1).output(output_audio)
    try:
        concatenated.run(overwrite_output=True)
        return output_audio
    except ffmpeg.Error as e:
        print(e.stderr.decode())
        raise e
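# For reference, ffmpeg.concat(..., v=0, a=1) builds an audio-only concat
# filter; a rough CLI equivalent for two chunks (illustrative) is:
# ffmpeg -i translated_0.wav -i translated_1.wav \
#   -filter_complex "concat=n=2:v=0:a=1" combined_audio.wav -y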
# %%
# combine the audio and video
def create_combined_output(input_video, output_audio, output_video):
    try:
        video = ffmpeg.input(input_video)
        audio = ffmpeg.input(output_audio)  # .filter_('atempo', 1.09580838323)
        (
            ffmpeg.output(
                video["v"],
                audio["a"],
                filename=output_video,
                vcodec="copy",
            ).run(overwrite_output=True)
        )
        print("Video and audio combined successfully")
        return output_video
    except ffmpeg.Error as e:
        print(e.stderr.decode())
        raise e
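# For reference, the mux above copies the original video stream and pairs it
# with the dubbed audio; an approximate CLI equivalent (illustrative) is:
# ffmpeg -i test_input_video.mov -i combined_audio.wav \
#   -map 0:v -map 1:a -c:v copy test_output_video.mp4 -y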
# %%
async def process_video_translation(input_video, output_video, speaker="male"):
    print("Processing video translation")

    # extract audio from the video
    print("Extracting audio from video")
    output_audio_path = extract_audio_from_video(input_video)

    # transcribe audio
    print("Transcribing audio")
    sentences = transcribe_and_preprocess_audio(output_audio_path)

    # translate to twi
    print("Translating to Twi")
    khaya_translations = await translation_main(
        sentences, translation_url, translation_hdr, LANG
    )

    # create output files
    print("Creating output files")
    list_of_output_chunks = [
        f"translated_{i}.wav" for i in range(len(khaya_translations))
    ]

    # convert to speech
    print("Converting to speech")
    await tts_main(khaya_translations, speaker, list_of_output_chunks)

    # combine audio streams
    print("Combining audio streams")
    output_audio = combine_audio_streams(list_of_output_chunks, "combined_audio.wav")

    # combine audio and video
    print("Combining audio and video")
    create_combined_output(input_video, output_audio, output_video)

    print("Video translation completed")
    return output_video
# %%
# test_input_video = "../Examples/test_pipeline/test_input_video.mov"
# test_output_video = "test_output_video.mp4"
# await process_video_translation(test_input_video, test_output_video)
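# speaker defaults to "male"; pass speaker="female" for the other Twi voice:
# await process_video_translation(test_input_video, test_output_video, speaker="female")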
# %%