local-tts / app.py
gsarti's picture
Fix GPU exec
92cdaa1
import os
import io
import spaces
import tempfile
import asyncio
#import soundfile as sf
import numpy as np
from pydub import AudioSegment
import requests
from markdown import Markdown
from io import StringIO
import gradio as gr
from kokoro_onnx import Kokoro
from markitdown import MarkItDown
# Shared MarkItDown converter; parse() uses it to turn a local file (uploaded
# or downloaded) into Markdown text.
md = MarkItDown()
# Kokoro ONNX text-to-speech engine, created once at import time from the
# model and voices files named here (paths are relative).
kokoro = Kokoro("kokoro-v0_19.onnx", "voices.json")
# Voice identifiers offered in the UI, keyed by the language code that is
# passed to Kokoro as `lang`.
voices = {
"en-us": ['af', 'af_bella', 'af_nicole', 'af_sarah', 'af_sky', 'am_adam', 'am_michael'],
"en-gb": ['bf_emma', 'bf_isabella', 'bm_george', 'bm_lewis']
}
def numpy_to_mp3(audio_array, sampling_rate):
    """Encode a NumPy audio buffer as MP3 bytes.

    Floating-point input is peak-normalised to the 16-bit range and converted
    to ``int16``; any other dtype is passed to pydub unchanged, with the
    sample width taken from the dtype's item size.

    Args:
        audio_array: Array of audio samples; it is encoded as a single
            (mono) channel.
        sampling_rate: Sample rate of ``audio_array`` in Hz.

    Returns:
        The MP3-encoded audio as ``bytes``.
    """
    if np.issubdtype(audio_array.dtype, np.floating):
        # A silent (all-zero) chunk has a peak of 0: dividing by it would turn
        # every sample into NaN and the int16 cast would produce garbage.
        # An empty chunk would make np.max raise. Only scale when there is a
        # real peak to scale by.
        peak = np.max(np.abs(audio_array)) if audio_array.size else 0.0
        if peak > 0:
            audio_array = (audio_array / peak) * 32767  # Normalize to 16-bit range
        audio_array = audio_array.astype(np.int16)

    # Create an audio segment from the raw sample bytes
    audio_segment = AudioSegment(
        audio_array.tobytes(),
        frame_rate=sampling_rate,
        sample_width=audio_array.dtype.itemsize,
        channels=1,
    )

    # Export to an in-memory MP3 - use a high bitrate to maximise quality.
    # The context manager releases the buffer even if the export fails.
    with io.BytesIO() as mp3_io:
        audio_segment.export(mp3_io, format="mp3", bitrate="320k")
        return mp3_io.getvalue()
def unmark_element(element, stream=None):
    """Flatten an ElementTree-style node into plain text.

    The node's own text is written first, then every child is processed
    recursively in document order, and finally the node's tail text is
    written. When no ``stream`` is supplied a new ``StringIO`` is created.

    Returns:
        Everything accumulated in the stream so far, as a string.
    """
    out = stream if stream is not None else StringIO()

    leading = element.text
    if leading:
        out.write(leading)

    for child in element:
        unmark_element(child, out)

    trailing = element.tail
    if trailing:
        out.write(trailing)

    return out.getvalue()
# Patch Python-Markdown: register unmark_element as the serializer for a new
# "plain" output format, so the parsed tree is emitted as text without markup.
Markdown.output_formats["plain"] = unmark_element
# Module-level converter using that format; markdown2text() reuses it.
__md = Markdown(output_format="plain")
# NOTE(review): presumably required because the plain output has no wrapping
# top-level tag for Markdown to strip - confirm against Python-Markdown.
__md.stripTopLevelTags = False
def markdown2text(text):
    """Convert Markdown ``text`` with the module-level "plain" converter and return the result."""
    plain_text = __md.convert(text)
    return plain_text
def create_temp_html_from_url(url: str, timeout: float = 30.0) -> str:
    """Download ``url`` and save the response body as a temporary HTML file.

    Args:
        url: Address of the page to fetch.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on an unresponsive host.

    Returns:
        Path to ``output.html`` inside a newly created temporary directory.
        The directory is not removed by this function.

    Raises:
        requests.HTTPError: If the request cannot be made, times out, or the
            server answers with an error status.
        OSError: If the temporary file cannot be written.
    """
    # Only the network call is wrapped, so the "Error fetching URL" message is
    # never attached to an unrelated failure such as a full disk.
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.RequestException as e:
        raise requests.HTTPError(f"Error fetching URL: {e}") from e

    temp_dir = tempfile.mkdtemp()
    temp_path = os.path.join(temp_dir, "output.html")
    # Write UTF-8 explicitly: the platform default encoding may be unable to
    # represent characters present in the decoded page.
    with open(temp_path, "w", encoding="utf-8") as f:
        f.write(response.text)
    return temp_path
def parse(input_type, url_input, file_input, text_input):
    """Return the Markdown text for the selected input source.

    "URL" downloads the page to a temporary file and "File" uses the given
    path; in both cases the file is converted with the shared MarkItDown
    instance. Any other ``input_type`` returns ``text_input`` unchanged.
    """
    if input_type == "URL":
        source_path = create_temp_html_from_url(url_input)
    elif input_type == "File":
        source_path = file_input
    else:
        # Custom text (or no selection): nothing to convert.
        return text_input

    print(source_path)
    converted = md.convert(source_path)
    return converted.text_content
def clean(output_markdown):
    """Strip Markdown markup from ``output_markdown`` and return the plain text."""
    plain_text = markdown2text(output_markdown)
    return plain_text
@spaces.GPU
def text_to_speech(output_text, voice, speed, lang):
    """Synthesize ``output_text`` with Kokoro and yield it as MP3 chunks.

    Kokoro exposes an asynchronous stream of ``(samples, sample_rate)`` pairs;
    this synchronous generator drives that stream on a private event loop so
    it can be used as a streaming Gradio handler.

    Args:
        output_text: Plain text to speak.
        voice: Kokoro voice identifier.
        speed: Speaking speed; converted to ``float`` before use.
        lang: Language code passed to Kokoro.

    Yields:
        MP3-encoded ``bytes`` for each chunk produced by Kokoro.
    """
    async def stream_audio():
        stream = kokoro.create_stream(
            output_text,
            voice=voice,
            speed=float(speed),
            lang=lang,
        )
        async for samples, sample_rate in stream:
            yield numpy_to_mp3(samples, sampling_rate=sample_rate)

    # Create a new event loop for this thread
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    chunks = stream_audio()
    try:
        while True:
            try:
                chunk = loop.run_until_complete(chunks.__anext__())
            except StopAsyncIteration:
                break
            yield chunk
    finally:
        # Runs on normal completion, on errors, and when the consumer stops
        # early. Without it every call leaks an open event loop. The steps
        # mirror what asyncio.run() does on exit.
        try:
            loop.run_until_complete(chunks.aclose())
            pending = asyncio.all_tasks(loop)
            for task in pending:
                task.cancel()
            if pending:
                loop.run_until_complete(
                    asyncio.gather(*pending, return_exceptions=True)
                )
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            # Do not leave a closed loop installed as the thread's current loop.
            asyncio.set_event_loop(None)
            loop.close()
# Gradio UI: choose an input source, parse it to Markdown, strip the markup,
# then stream the synthesized speech.
with gr.Blocks() as demo:
    gr.Markdown(
        "# Stream Local TTS with Kokoro-82M 🗣️\n"
        "Provide a URL or upload a file to convert its content into speech using [Markitdown](https://github.com/microsoft/markitdown) and [Kokoro-ONNX](https://github.com/thewh1teagle/kokoro-onnx)."
    )
    with gr.Row():
        with gr.Column():
            # Default to "URL" so the selection matches the input that is
            # visible on load. With no default, input_type is None and parse()
            # falls through to the (hidden, empty) custom-text box.
            input_type = gr.Radio(["URL", "File", "Custom Text"], label="Input Type", value="URL")
            with gr.Row():
                speed = gr.Slider(minimum=0.5, maximum=2.0, value=1.0, step=0.1, label="Speed")
                lang = gr.Dropdown(choices=list(voices), label="Language", value="en-us")
                voice = gr.Dropdown(choices=voices[lang.value], label="Voice", value=voices[lang.value][0])
        with gr.Column():
            url_input = gr.Textbox(label="Enter URL", lines=1)
            file_input = gr.File(label="Upload File", visible=False)
            text_input = gr.Textbox(label="Text", visible=False, lines=5, placeholder="Enter text here", show_label=False, interactive=True)

    def toggle_file_input(input_type):
        """Show only the input widget matching the selected input type.

        Returns updates for the file, URL and text inputs, in that order.
        """
        return gr.update(visible=(input_type == "File")), gr.update(
            visible=(input_type == "URL"),
        ), gr.update(visible=(input_type == "Custom Text"))

    def update_lang(lang):
        """Replace the voice choices with those of ``lang``, selecting the first."""
        return gr.Dropdown(choices=voices[lang], label="Voice", value=voices[lang][0])

    input_type.change(toggle_file_input, input_type, [file_input, url_input, text_input])
    lang.change(update_lang, lang, [voice])

    with gr.Accordion("Markdown output", open=False):
        # Hidden holder for the markup-free text that is sent to the TTS step.
        output_text = gr.Textbox(visible=False)
        output_markdown = gr.Markdown("Parsed markdown will appear here", label="Parsed Text", show_copy_button=True)
    output_audio = gr.Audio(label="Generated Audio", streaming=True, autoplay=True, loop=False)
    submit_button = gr.Button("Convert")
    # Each step runs only if the previous one succeeded:
    # parse -> strip Markdown -> stream speech.
    submit_button.click(
        parse,
        inputs=[input_type, url_input, file_input, text_input],
        outputs=[output_markdown],
    ).success(
        clean,
        inputs=[output_markdown],
        outputs=[output_text],
    ).success(
        text_to_speech,
        inputs=[output_text, voice, speed, lang],
        outputs=[output_audio],
    )
demo.launch()