seamless_m4t / app.py
Vaibhav Srivastav
Initial commit
8fb8950
raw
history blame
6.71 kB
import json
import os
import gradio as gr
import numpy as np
import torch
import torchaudio
from seamless_communication.models.inference.translator import Translator
DESCRIPTION = "# SeamlessM4T"
with open("./mlg_config.json", "r") as f:
lang_idx_map = json.loads(f.read())
LANGUAGES = lang_idx_map["multilingual"].keys()
TASK_NAMES = [
"S2ST (Speech to Speech translation)",
"S2TT (Speech to Text translation)",
"T2ST (Text to Speech translation)",
"T2TT (Text to Text translation)",
"ASR (Automatic Speech Recognition)",
]
AUDIO_SAMPLE_RATE = 16000.0
MAX_INPUT_AUDIO_LENGTH = 60 # in seconds
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
translator = Translator(
model_name_or_card="multitask_unity_large",
vocoder_name_or_card="vocoder_36langs",
device=device,
sample_rate=AUDIO_SAMPLE_RATE,
)
def predict(
task_name: str,
audio_source: str,
input_audio_mic: str,
input_audio_file: str,
input_text: str,
source_language: str,
target_language: str,
) -> tuple[tuple[int, np.ndarray] | None, str]:
task_name = task_name.split()[0]
if task_name in ["S2ST", "S2TT", "ASR"]:
if audio_source == "microphone":
input_data = input_audio_mic
else:
input_data = input_audio_file
arr, org_sr = torchaudio.load(input_data)
new_arr = torchaudio.functional.resample(arr, orig_freq=org_sr, new_freq=AUDIO_SAMPLE_RATE)
max_length = int(MAX_INPUT_AUDIO_LENGTH * AUDIO_SAMPLE_RATE)
if new_arr.shape[1] > max_length:
new_arr = new_arr[:, :max_length]
gr.Warning(f"Input audio is too long. Only the first {MAX_INPUT_AUDIO_LENGTH} seconds is used.")
torchaudio.save(input_data, new_arr, sample_rate=int(AUDIO_SAMPLE_RATE))
else:
input_data = input_text
text_out, wav, sr = translator.predict(
input=input_data,
task_str=task_name,
tgt_lang=target_language,
src_lang=source_language,
)
if task_name in ["S2ST", "T2ST"]:
return (sr, wav.cpu().detach().numpy()), text_out
else:
return None, text_out
def update_audio_ui(audio_source: str) -> tuple[dict, dict]:
mic = audio_source == "microphone"
return (
gr.update(visible=mic, value=None), # input_audio_mic
gr.update(visible=not mic, value=None), # input_audio_file
)
def update_input_ui(task_name: str) -> tuple[dict, dict, dict, dict]:
task_name = task_name.split()[0]
if task_name in ["S2ST", "S2TT"]:
return (
gr.update(visible=True), # audio_box
gr.update(visible=False), # input_text
gr.update(visible=False), # source_language
gr.update(visible=True), # target_language
)
elif task_name in ["T2ST", "T2TT"]:
return (
gr.update(visible=False), # audio_box
gr.update(visible=True), # input_text
gr.update(visible=True), # source_language
gr.update(visible=True), # target_language
)
elif task_name == "ASR":
return (
gr.update(visible=True), # audio_box
gr.update(visible=False), # input_text
gr.update(visible=False), # source_language
gr.update(visible=True), # target_language
)
else:
raise ValueError(f"Unknown task: {task_name}")
def update_output_ui(task_name: str) -> tuple[dict, dict]:
task_name = task_name.split()[0]
if task_name in ["S2ST", "T2ST"]:
return (
gr.update(visible=True, value=None), # output_audio
gr.update(value=None), # output_text
)
elif task_name in ["S2TT", "T2TT", "ASR"]:
return (
gr.update(visible=False, value=None), # output_audio
gr.update(value=None), # output_text
)
else:
raise ValueError(f"Unknown task: {task_name}")
with gr.Blocks(css="style.css") as demo:
gr.Markdown(DESCRIPTION)
gr.DuplicateButton(
value="Duplicate Space for private use",
elem_id="duplicate-button",
visible=os.getenv("SHOW_DUPLICATE_BUTTON") == "1",
)
with gr.Group():
task_name = gr.Dropdown(
label="Task",
choices=TASK_NAMES,
value=TASK_NAMES[0],
)
with gr.Row():
source_language = gr.Dropdown(
label="Source language",
choices=LANGUAGES,
value="eng",
visible=False,
)
target_language = gr.Dropdown(
label="Target language",
choices=LANGUAGES,
value="fra",
)
with gr.Row() as audio_box:
audio_source = gr.Radio(
label="Audio source",
choices=["file", "microphone"],
value="file",
)
input_audio_mic = gr.Audio(
label="Input speech",
type="filepath",
source="microphone",
visible=False,
)
input_audio_file = gr.Audio(
label="Input speech",
type="filepath",
source="upload",
visible=True,
)
input_text = gr.Textbox(label="Input text", visible=False)
btn = gr.Button("Translate")
with gr.Column():
output_audio = gr.Audio(
label="Translated speech",
autoplay=False,
streaming=False,
type="numpy",
)
output_text = gr.Textbox(label="Translated text")
audio_source.change(
fn=update_audio_ui,
inputs=audio_source,
outputs=[
input_audio_mic,
input_audio_file,
],
queue=False,
api_name=False,
)
task_name.change(
fn=update_input_ui,
inputs=task_name,
outputs=[
audio_box,
input_text,
source_language,
target_language,
],
queue=False,
api_name=False,
).then(
fn=update_output_ui,
inputs=task_name,
outputs=[output_audio, output_text],
queue=False,
api_name=False,
)
btn.click(
fn=predict,
inputs=[
task_name,
audio_source,
input_audio_mic,
input_audio_file,
input_text,
source_language,
target_language,
],
outputs=[output_audio, output_text],
api_name="run",
)
demo.queue(max_size=50).launch()