Spaces:
Running
on
T4
Running
on
T4
import json | |
import os | |
import gradio as gr | |
import numpy as np | |
import torch | |
import torchaudio | |
from seamless_communication.models.inference.translator import Translator | |
DESCRIPTION = "# SeamlessM4T" | |
with open("./mlg_config.json", "r") as f: | |
lang_idx_map = json.loads(f.read()) | |
LANGUAGES = lang_idx_map["multilingual"].keys() | |
TASK_NAMES = [ | |
"S2ST (Speech to Speech translation)", | |
"S2TT (Speech to Text translation)", | |
"T2ST (Text to Speech translation)", | |
"T2TT (Text to Text translation)", | |
"ASR (Automatic Speech Recognition)", | |
] | |
AUDIO_SAMPLE_RATE = 16000.0 | |
MAX_INPUT_AUDIO_LENGTH = 60 # in seconds | |
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") | |
translator = Translator( | |
model_name_or_card="multitask_unity_large", | |
vocoder_name_or_card="vocoder_36langs", | |
device=device, | |
sample_rate=AUDIO_SAMPLE_RATE, | |
) | |
def predict( | |
task_name: str, | |
audio_source: str, | |
input_audio_mic: str, | |
input_audio_file: str, | |
input_text: str, | |
source_language: str, | |
target_language: str, | |
) -> tuple[tuple[int, np.ndarray] | None, str]: | |
task_name = task_name.split()[0] | |
if task_name in ["S2ST", "S2TT", "ASR"]: | |
if audio_source == "microphone": | |
input_data = input_audio_mic | |
else: | |
input_data = input_audio_file | |
arr, org_sr = torchaudio.load(input_data) | |
new_arr = torchaudio.functional.resample(arr, orig_freq=org_sr, new_freq=AUDIO_SAMPLE_RATE) | |
max_length = int(MAX_INPUT_AUDIO_LENGTH * AUDIO_SAMPLE_RATE) | |
if new_arr.shape[1] > max_length: | |
new_arr = new_arr[:, :max_length] | |
gr.Warning(f"Input audio is too long. Only the first {MAX_INPUT_AUDIO_LENGTH} seconds is used.") | |
torchaudio.save(input_data, new_arr, sample_rate=int(AUDIO_SAMPLE_RATE)) | |
else: | |
input_data = input_text | |
text_out, wav, sr = translator.predict( | |
input=input_data, | |
task_str=task_name, | |
tgt_lang=target_language, | |
src_lang=source_language, | |
) | |
if task_name in ["S2ST", "T2ST"]: | |
return (sr, wav.cpu().detach().numpy()), text_out | |
else: | |
return None, text_out | |
def update_audio_ui(audio_source: str) -> tuple[dict, dict]: | |
mic = audio_source == "microphone" | |
return ( | |
gr.update(visible=mic, value=None), # input_audio_mic | |
gr.update(visible=not mic, value=None), # input_audio_file | |
) | |
def update_input_ui(task_name: str) -> tuple[dict, dict, dict, dict]: | |
task_name = task_name.split()[0] | |
if task_name in ["S2ST", "S2TT"]: | |
return ( | |
gr.update(visible=True), # audio_box | |
gr.update(visible=False), # input_text | |
gr.update(visible=False), # source_language | |
gr.update(visible=True), # target_language | |
) | |
elif task_name in ["T2ST", "T2TT"]: | |
return ( | |
gr.update(visible=False), # audio_box | |
gr.update(visible=True), # input_text | |
gr.update(visible=True), # source_language | |
gr.update(visible=True), # target_language | |
) | |
elif task_name == "ASR": | |
return ( | |
gr.update(visible=True), # audio_box | |
gr.update(visible=False), # input_text | |
gr.update(visible=False), # source_language | |
gr.update(visible=True), # target_language | |
) | |
else: | |
raise ValueError(f"Unknown task: {task_name}") | |
def update_output_ui(task_name: str) -> tuple[dict, dict]: | |
task_name = task_name.split()[0] | |
if task_name in ["S2ST", "T2ST"]: | |
return ( | |
gr.update(visible=True, value=None), # output_audio | |
gr.update(value=None), # output_text | |
) | |
elif task_name in ["S2TT", "T2TT", "ASR"]: | |
return ( | |
gr.update(visible=False, value=None), # output_audio | |
gr.update(value=None), # output_text | |
) | |
else: | |
raise ValueError(f"Unknown task: {task_name}") | |
with gr.Blocks(css="style.css") as demo: | |
gr.Markdown(DESCRIPTION) | |
gr.DuplicateButton( | |
value="Duplicate Space for private use", | |
elem_id="duplicate-button", | |
visible=os.getenv("SHOW_DUPLICATE_BUTTON") == "1", | |
) | |
with gr.Group(): | |
task_name = gr.Dropdown( | |
label="Task", | |
choices=TASK_NAMES, | |
value=TASK_NAMES[0], | |
) | |
with gr.Row(): | |
source_language = gr.Dropdown( | |
label="Source language", | |
choices=LANGUAGES, | |
value="eng", | |
visible=False, | |
) | |
target_language = gr.Dropdown( | |
label="Target language", | |
choices=LANGUAGES, | |
value="fra", | |
) | |
with gr.Row() as audio_box: | |
audio_source = gr.Radio( | |
label="Audio source", | |
choices=["file", "microphone"], | |
value="file", | |
) | |
input_audio_mic = gr.Audio( | |
label="Input speech", | |
type="filepath", | |
source="microphone", | |
visible=False, | |
) | |
input_audio_file = gr.Audio( | |
label="Input speech", | |
type="filepath", | |
source="upload", | |
visible=True, | |
) | |
input_text = gr.Textbox(label="Input text", visible=False) | |
btn = gr.Button("Translate") | |
with gr.Column(): | |
output_audio = gr.Audio( | |
label="Translated speech", | |
autoplay=False, | |
streaming=False, | |
type="numpy", | |
) | |
output_text = gr.Textbox(label="Translated text") | |
audio_source.change( | |
fn=update_audio_ui, | |
inputs=audio_source, | |
outputs=[ | |
input_audio_mic, | |
input_audio_file, | |
], | |
queue=False, | |
api_name=False, | |
) | |
task_name.change( | |
fn=update_input_ui, | |
inputs=task_name, | |
outputs=[ | |
audio_box, | |
input_text, | |
source_language, | |
target_language, | |
], | |
queue=False, | |
api_name=False, | |
).then( | |
fn=update_output_ui, | |
inputs=task_name, | |
outputs=[output_audio, output_text], | |
queue=False, | |
api_name=False, | |
) | |
btn.click( | |
fn=predict, | |
inputs=[ | |
task_name, | |
audio_source, | |
input_audio_mic, | |
input_audio_file, | |
input_text, | |
source_language, | |
target_language, | |
], | |
outputs=[output_audio, output_text], | |
api_name="run", | |
) | |
demo.queue(max_size=50).launch() | |