|
import os |
|
os.environ['NUMPY_EXPERIMENTAL_ARRAY_FUNCTION'] = '0' |
|
|
|
import gradio as gr |
|
import torch |
|
import torchaudio |
|
from whisperspeech.vq_stoks import RQBottleneckTransformer |
|
from encodec.utils import convert_audio |
|
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, pipeline |
|
from transformers import StoppingCriteria, StoppingCriteriaList, TextIteratorStreamer |
|
from threading import Thread |
|
import logging |
|
from generate_audio import TTSProcessor |
|
import uuid |
|
|
|
# All models in this script are placed on this device.
device = "cpu"

# Load the VQ speech tokenizer checkpoint, then place it on the target device.
vq_model = RQBottleneckTransformer.load_model(
    "whisper-vq-stoks-medium-en+pl-fixed.model"
)
vq_model = vq_model.to(device)
|
|
|
# Set to True to load the LLM with bitsandbytes 8-bit quantization instead of
# plain float32 weights.
use_8bit = False
llm_path = "QuietImpostor/Llama-3.2s-1B-Instruct-v0.1"
tokenizer = AutoTokenizer.from_pretrained(llm_path)

# Keyword arguments forwarded to `from_pretrained`; exactly one of
# `quantization_config` / `torch_dtype` is set depending on `use_8bit`.
model_kwargs = {}
if use_8bit:
    model_kwargs["quantization_config"] = BitsAndBytesConfig(
        load_in_8bit=True,
        llm_int8_enable_fp32_cpu_offload=False,
        llm_int8_has_fp16_weight=False,
    )
else:
    model_kwargs["torch_dtype"] = torch.float32

model = AutoModelForCausalLM.from_pretrained(llm_path, **model_kwargs)
if not use_8bit:
    # transformers rejects `.to()` on bitsandbytes 8-bit models (they are
    # placed on their device while loading), so only the unquantized model is
    # moved explicitly.
    model = model.to(device)
|
|
|
def _encode_audio_as_sound_tokens(audio_path, prefix=""):
    """Encode an audio file into a sound-token prompt string.

    Shared implementation behind both public ``audio_to_sound_tokens_*``
    functions, which differ only in the text placed before
    ``<|sound_start|>``.

    Args:
        audio_path: Path of the audio file, passed to ``torchaudio.load``.
        prefix: Text emitted before the ``<|sound_start|>`` marker.

    Returns:
        ``prefix`` followed by ``<|sound_start|>``, one ``<|sound_NNNN|>``
        token per code (zero-padded to four digits), and ``<|sound_end|>``.
    """
    vq_model.ensure_whisper(device)
    wav, sr = torchaudio.load(audio_path)
    # The audio is resampled to 16 kHz before encoding when needed.
    if sr != 16000:
        wav = torchaudio.functional.resample(wav, sr, 16000)
    with torch.no_grad():
        codes = vq_model.encode_audio(wav.to(device))
        # Only the first entry of the encoder output is used.
        codes = codes[0].cpu().tolist()

    result = ''.join(f'<|sound_{num:04d}|>' for num in codes)
    return f'{prefix}<|sound_start|>{result}<|sound_end|>'


def audio_to_sound_tokens_whisperspeech(audio_path):
    """Return the sound-token string for ``audio_path`` (chat prompt form).

    Args:
        audio_path: Path of the audio file to encode.

    Returns:
        ``<|sound_start|>...<|sound_end|>`` wrapping one ``<|sound_NNNN|>``
        token per code.
    """
    return _encode_audio_as_sound_tokens(audio_path)


def audio_to_sound_tokens_whisperspeech_transcribe(audio_path):
    """Return the sound-token string for ``audio_path`` (transcription form).

    Identical to ``audio_to_sound_tokens_whisperspeech`` except that the
    result is prefixed with ``<|reserved_special_token_69|>``.

    Args:
        audio_path: Path of the audio file to encode.

    Returns:
        The prefixed ``<|sound_start|>...<|sound_end|>`` string.
    """
    return _encode_audio_as_sound_tokens(
        audio_path, prefix='<|reserved_special_token_69|>'
    )
|
|
|
def text_to_audio_file(text):
    """Synthesize ``text`` to a WAV file and return the file's path.

    The file is written to ``./user_audio`` under a random UUID-based name;
    the directory is created if it does not exist yet. A new ``TTSProcessor``
    is constructed on every call.

    Args:
        text: The text to convert to speech.

    Returns:
        Path of the generated ``*_temp_audio.wav`` file.

    Raises:
        ValueError: If ``text`` is empty or ``None``.
    """
    # Fail with a clear message instead of an incidental IndexError /
    # AttributeError further down.
    if not text:
        raise ValueError("No text provided")

    output_dir = "./user_audio"
    os.makedirs(output_dir, exist_ok=True)
    temp_file = f"{output_dir}/{uuid.uuid4()}_temp_audio.wav"

    tts = TTSProcessor(device)
    tts.convert_text_to_audio_file(text, temp_file)
    print(f"Saved audio to {temp_file}")
    return temp_file
|
|
|
def run_on_cpu(func):
    """Decorator that forwards every call to ``func`` unchanged.

    The wrapper adds no behaviour of its own: arguments and the return value
    pass straight through. ``functools.wraps`` keeps the decorated function's
    name, docstring and signature, so several decorated functions do not all
    present themselves as ``wrapper``.

    Args:
        func: The callable to wrap.

    Returns:
        A callable with the same behaviour and metadata as ``func``.
    """
    # Imported locally so the decorator does not rely on a module-level import.
    from functools import wraps

    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    return wrapper
|
|
|
@run_on_cpu
def process_input(audio_file=None):
    """Run the chat pipeline on ``audio_file`` and return the final text.

    ``process_audio`` yields the accumulated message after each new chunk;
    this consumes the whole stream and keeps only the last value.

    Args:
        audio_file: Path of the audio file to process.

    Returns:
        The last message yielded, or ``""`` if nothing was yielded.
    """
    final_text = ""
    # The loop target itself holds the most recent message once the stream ends.
    for final_text in process_audio(audio_file):
        pass
    return final_text
|
|
|
@run_on_cpu
def process_transcribe_input(audio_file=None):
    """Run the transcription pipeline on ``audio_file`` and return the final text.

    Calls ``process_audio`` with ``transcript=True``, consumes the whole
    stream of accumulated messages and keeps only the last value.

    Args:
        audio_file: Path of the audio file to process.

    Returns:
        The last message yielded, or ``""`` if nothing was yielded.
    """
    final_text = ""
    # The loop target itself holds the most recent message once the stream ends.
    for final_text in process_audio(audio_file, transcript=True):
        pass
    return final_text
|
|
|
class StopOnTokens(StoppingCriteria):
    """Stopping criterion that halts generation on a stop-token id."""

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
        """Return True when the newest token of the first sequence is a stop id.

        Only ``input_ids[0]`` is inspected; ``scores`` and ``kwargs`` are unused.
        """
        # 128009 is presumably the Llama 3 end-of-turn token id; confirm
        # against the tokenizer in use.
        newest_token = input_ids[0][-1]
        return any(
            newest_token == stop_id
            for stop_id in (tokenizer.eos_token_id, 128009)
        )
|
|
|
def process_audio(audio_file, transcript=False):
    """Stream the model's text response for an audio file.

    The audio is converted to a sound-token string, wrapped as a single user
    message with the tokenizer's chat template, and fed to ``model.generate``
    on a background thread. Text is read back through a
    ``TextIteratorStreamer`` and yielded incrementally.

    Because this is a generator, the ``ValueError`` below is raised when the
    caller first iterates, not when the function is called.

    Args:
        audio_file: Path of the audio file to process.
        transcript: When true, use the transcription variant of the
            sound-token encoding.

    Yields:
        The message accumulated so far, with every ``"assistant\\n\\n"``
        occurrence removed.

    Raises:
        ValueError: If ``audio_file`` is ``None``.
    """
    if audio_file is None:
        raise ValueError("No audio file provided")

    logging.info(f"Audio file received: {audio_file}")
    logging.info(f"Audio file type: {type(audio_file)}")

    if transcript:
        sound_tokens = audio_to_sound_tokens_whisperspeech_transcribe(audio_file)
    else:
        sound_tokens = audio_to_sound_tokens_whisperspeech(audio_file)
    logging.info("Sound tokens generated successfully")

    chat = [{"role": "user", "content": sound_tokens}]

    stop_criterion = StopOnTokens()
    prompt = tokenizer.apply_chat_template(chat, tokenize=False)
    prompt_ids = tokenizer.encode(prompt, return_tensors="pt").to(model.device)

    streamer = TextIteratorStreamer(tokenizer, timeout=10., skip_prompt=True, skip_special_tokens=True)
    generation_kwargs = {
        "input_ids": prompt_ids,
        "streamer": streamer,
        "max_new_tokens": 1024,
        "do_sample": False,
        "stopping_criteria": StoppingCriteriaList([stop_criterion]),
    }

    # Generation runs on a worker thread so this generator can consume the
    # streamer while tokens are still being produced.
    worker = Thread(target=model.generate, kwargs=generation_kwargs)
    worker.start()

    partial_message = ""
    for new_text in streamer:
        partial_message += new_text
        # Stop without yielding once the EOS text shows up in the output.
        if tokenizer.eos_token in partial_message:
            break
        # Strip the assistant role header if it appears in the generated text.
        partial_message = partial_message.replace("assistant\n\n", "")
        yield partial_message
|
|
|
def _collect_wav_examples(directory):
    """Return one ``[path]`` row per ``.wav`` file directly inside ``directory``.

    File names are sorted so the example order is the same on every run
    (``os.listdir`` returns entries in arbitrary order). A missing directory
    is logged and treated as empty instead of aborting start-up.

    Args:
        directory: Directory to scan, e.g. ``"./examples"``.

    Returns:
        A list of single-element lists, each holding ``"<directory>/<name>"``.
    """
    try:
        file_names = sorted(os.listdir(directory))
    except FileNotFoundError:
        logging.warning("Example directory %s not found; skipping", directory)
        return []
    return [[f"{directory}/{name}"] for name in file_names if name.endswith(".wav")]


# Example rows offered in the UI: entries from ./examples first, then
# entries from ./bad_examples.
good_examples = _collect_wav_examples("./examples")
bad_examples = _collect_wav_examples("./bad_examples")
examples = good_examples + bad_examples
|
|
|
# Build the Gradio UI: an audio (or text-to-audio) input, two action buttons
# and a text box showing the model's output.
with gr.Blocks() as iface:
    gr.Markdown("# Llama3.2s Mini: checkpoint September 26, 2024")
    gr.Markdown("Enter text to convert to audio, then submit the audio to generate text or Upload Audio")
    gr.Markdown("Inspired by [Homebrew Ltd](https://homebrew.ltd/) | [Read our blog post](https://homebrew.ltd/blog/llama3-just-got-ears)")

    with gr.Row():
        input_type = gr.Radio(["text", "audio"], label="Input Type", value="audio")
        text_input = gr.Textbox(label="Text Input", visible=False)
        audio_input = gr.Audio(label="Audio", type="filepath", visible=True)

    # The convert button starts hidden; it is shown together with the text box.
    convert_button = gr.Button("Make synthetic audio", visible=False)
    submit_button = gr.Button("Chat with AI using audio")
    transcrip_button = gr.Button("Make Model transcribe the audio")

    text_output = gr.Textbox(label="Generated Text")

    def update_visibility(input_type):
        """Show the text box and convert button only when "text" is selected."""
        show_text_controls = input_type == "text"
        return gr.update(visible=show_text_controls), gr.update(visible=show_text_controls)

    def convert_and_display(text):
        """Synthesize ``text`` and return the audio file path for the audio widget."""
        return text_to_audio_file(text)

    # Toggle the text-entry controls when the input type changes.
    input_type.change(
        update_visibility,
        inputs=[input_type],
        outputs=[text_input, convert_button],
    )

    # Synthesized speech is loaded into the audio input so it can be submitted.
    convert_button.click(
        convert_and_display,
        inputs=[text_input],
        outputs=[audio_input],
    )

    # Both actions read the same audio input and write to the same text output.
    submit_button.click(
        process_input,
        inputs=[audio_input],
        outputs=[text_output],
    )
    transcrip_button.click(
        process_transcribe_input,
        inputs=[audio_input],
        outputs=[text_output],
    )

    gr.Examples(examples, inputs=[audio_input])
|
|
|
# Enable the request queue, then start serving the app.
iface.queue().launch()