|
import asyncio
import contextlib
import io
import os
import tempfile
from io import StringIO

# Kept ahead of the other third-party imports, as in the original ordering.
import spaces

import gradio as gr
import numpy as np
import requests
from kokoro_onnx import Kokoro
from markdown import Markdown
from markitdown import MarkItDown
from pydub import AudioSegment
|
|
|
# Shared document converter used by parse() for fetched pages and uploads.
md = MarkItDown()

# Kokoro ONNX text-to-speech engine, loaded once at import time from the
# model and voice files given as relative paths.
kokoro = Kokoro("kokoro-v0_19.onnx", "voices.json")

# Voice identifiers offered in the UI, keyed by the language code that is
# passed to Kokoro together with the chosen voice.
voices = {
    "en-us": [
        "af",
        "af_bella",
        "af_nicole",
        "af_sarah",
        "af_sky",
        "am_adam",
        "am_michael",
    ],
    "en-gb": [
        "bf_emma",
        "bf_isabella",
        "bm_george",
        "bm_lewis",
    ],
}
|
|
|
def numpy_to_mp3(audio_array, sampling_rate):
    """Encode a single-channel audio array as MP3 and return the bytes.

    Floating-point input is scaled so its peak maps to the int16 maximum and
    is then converted to int16. Arrays of any other dtype are passed through
    unchanged, with the sample width taken from their dtype.

    Args:
        audio_array: NumPy array holding the samples of one channel.
        sampling_rate: Value passed to ``AudioSegment`` as ``frame_rate``.

    Returns:
        The MP3-encoded audio (320k bitrate) as ``bytes``.
    """
    if np.issubdtype(audio_array.dtype, np.floating):
        # A silent (all-zero) chunk has a peak of 0; dividing by it would turn
        # every sample into NaN and the int16 cast into garbage. An empty
        # chunk has no peak at all. In both cases skip the scaling.
        peak = np.max(np.abs(audio_array)) if audio_array.size else 0
        if peak > 0:
            audio_array = (audio_array / peak) * 32767
        audio_array = audio_array.astype(np.int16)

    audio_segment = AudioSegment(
        audio_array.tobytes(),
        frame_rate=sampling_rate,
        sample_width=audio_array.dtype.itemsize,
        channels=1,
    )

    # The context manager releases the buffer even if the export fails.
    with io.BytesIO() as mp3_io:
        audio_segment.export(mp3_io, format="mp3", bitrate="320k")
        return mp3_io.getvalue()
|
|
|
def unmark_element(element, stream=None):
    """Return the text of *element* and all its descendants, without markup.

    The tree is walked in document order: an element's ``text``, then each
    child (recursively), then the element's ``tail``.

    Args:
        element: Tree node exposing ``text``, ``tail`` and iteration over its
            children (the ElementTree element interface).
        stream: Optional text buffer to append to. A new ``StringIO`` is
            created when omitted.

    Returns:
        The full contents of the buffer after the walk, including anything
        that was already in a caller-supplied ``stream``.
    """
    if stream is None:
        stream = StringIO()

    def _write_text(node):
        # Write into the shared buffer only; the buffer is materialised once
        # at the end instead of at every level of the recursion.
        if node.text:
            stream.write(node.text)
        for child in node:
            _write_text(child)
        if node.tail:
            stream.write(node.tail)

    _write_text(element)
    return stream.getvalue()
|
|
|
|
|
|
|
# Register unmark_element as a "plain" serializer so Python-Markdown can emit
# bare text instead of HTML.
Markdown.output_formats["plain"] = unmark_element

# Module-wide converter that renders Markdown with the "plain" serializer.
__md = Markdown(output_format="plain")
__md.stripTopLevelTags = False
|
|
|
|
|
def markdown2text(text):
    """Convert Markdown *text* to plain text with the shared converter.

    Args:
        text: Markdown source.

    Returns:
        The converted text produced by the module-level ``__md`` instance.
    """
    # The converter instance is reused for every request; reset() clears the
    # state left behind by the previous document before converting this one.
    return __md.reset().convert(text)
|
|
|
|
|
def create_temp_html_from_url(url: str) -> str:
    """Download *url* and save the response body as a temporary HTML file.

    Args:
        url: Address of the page to fetch.

    Returns:
        Path of ``output.html`` inside a newly created temporary directory.
        Nothing here removes the file or directory; that is left to the
        caller.

    Raises:
        requests.HTTPError: If the request fails, the server answers with an
            error status, or the file cannot be written. The underlying
            exception is attached as the cause.
    """
    try:
        # Without a timeout an unresponsive server would block this call
        # indefinitely.
        response = requests.get(url, timeout=30)
        response.raise_for_status()

        temp_dir = tempfile.mkdtemp()
        temp_path = os.path.join(temp_dir, "output.html")
        # Write with an explicit encoding so pages containing characters
        # outside the platform's default encoding do not fail to save.
        with open(temp_path, "w", encoding="utf-8") as f:
            f.write(response.text)
    except (requests.RequestException, OSError) as e:
        raise requests.HTTPError(f"Error fetching URL: {str(e)}") from e
    return temp_path
|
|
|
|
|
def parse(input_type, url_input, file_input, text_input):
    """Return the Markdown text for the selected input source.

    Args:
        input_type: ``"URL"`` downloads and converts a page, ``"File"``
            converts the uploaded file; any other value returns
            ``text_input`` unchanged.
        url_input: Address to download when ``input_type`` is ``"URL"``.
        file_input: Upload handed to ``md.convert`` when ``input_type`` is
            ``"File"``.
        text_input: Text returned as-is for every other ``input_type``.

    Returns:
        The ``text_content`` of the conversion result, or ``text_input``.

    Raises:
        gr.Error: If the selected source (URL or file) is empty.
        requests.HTTPError: If the page cannot be downloaded.
    """
    if input_type == "URL":
        if not url_input or not url_input.strip():
            raise gr.Error("Please enter a URL.")
        filepath = create_temp_html_from_url(url_input.strip())
        try:
            return md.convert(filepath).text_content
        finally:
            # The download lives in its own temporary directory and is only
            # needed for the conversion. Clean-up is best effort: rmdir only
            # succeeds on an empty directory, so nothing else can be lost.
            with contextlib.suppress(OSError):
                os.remove(filepath)
                os.rmdir(os.path.dirname(filepath))

    if input_type == "File":
        if not file_input:
            raise gr.Error("Please upload a file.")
        return md.convert(file_input).text_content

    return text_input
|
|
|
|
|
def clean(output_markdown):
    """Strip the Markdown formatting from the parsed document.

    Args:
        output_markdown: Markdown text to convert.

    Returns:
        Whatever ``markdown2text`` returns for that text.
    """
    plain_text = markdown2text(output_markdown)
    return plain_text
|
|
|
|
|
@spaces.GPU
def text_to_speech(output_text, voice, speed, lang):
    """Synthesize *output_text* and yield the audio as MP3 chunks.

    Kokoro exposes its stream as an async generator, so this synchronous
    generator drives it on a private event loop, one chunk per step.

    Args:
        output_text: Text to speak.
        voice: Voice identifier passed to ``kokoro.create_stream``.
        speed: Speaking speed; converted with ``float()`` before use.
        lang: Language code passed to ``kokoro.create_stream``.

    Yields:
        MP3-encoded ``bytes`` for each ``(samples, sample_rate)`` pair
        produced by the Kokoro stream.
    """

    async def stream_audio():
        stream = kokoro.create_stream(
            output_text,
            voice=voice,
            speed=float(speed),
            lang=lang,
        )
        async for samples, sample_rate in stream:
            yield numpy_to_mp3(samples, sampling_rate=sample_rate)

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    chunks = stream_audio()
    try:
        while True:
            try:
                chunk = loop.run_until_complete(chunks.__anext__())
            except StopAsyncIteration:
                break
            yield chunk
    finally:
        # Runs on normal completion, on errors, and when the consumer stops
        # iterating early, so the loop created above is always released.
        try:
            loop.run_until_complete(chunks.aclose())
            # Cancel anything still scheduled on the loop before closing it.
            leftover = asyncio.all_tasks(loop)
            for task in leftover:
                task.cancel()
            if leftover:
                loop.run_until_complete(
                    asyncio.gather(*leftover, return_exceptions=True)
                )
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            asyncio.set_event_loop(None)
            loop.close()
|
|
|
|
|
# NOTE(review): the nesting of the layout below was reconstructed from source
# whose indentation had been lost — confirm it against the intended layout.
with gr.Blocks() as demo:
    gr.Markdown(
        "# Stream Local TTS with Kokoro-82M 🗣️\n"
        "Provide a URL or upload a file to convert its content into speech using [Markitdown](https://github.com/microsoft/markitdown) and [Kokoro-ONNX](https://github.com/thewh1teagle/kokoro-onnx)."
    )

    with gr.Row():
        with gr.Column():
            # Start on "URL" so the selection matches the only input box that
            # is visible on load; with no selection, parse() would ignore the
            # URL and fall through to the (empty) custom text.
            input_type = gr.Radio(
                ["URL", "File", "Custom Text"],
                label="Input Type",
                value="URL",
            )
            with gr.Row():
                speed = gr.Slider(minimum=0.5, maximum=2.0, value=1.0, step=0.1, label="Speed")
                lang = gr.Dropdown(choices=list(voices), label="Language", value="en-us")
                voice = gr.Dropdown(
                    choices=voices[lang.value],
                    label="Voice",
                    value=voices[lang.value][0],
                )
        with gr.Column():
            url_input = gr.Textbox(label="Enter URL", lines=1)
            file_input = gr.File(label="Upload File", visible=False)
            text_input = gr.Textbox(
                label="Text",
                visible=False,
                lines=5,
                placeholder="Enter text here",
                show_label=False,
                interactive=True,
            )

    def toggle_file_input(input_type):
        """Show only the input widget that matches the selected input type.

        Returns updates for the file, URL and text inputs, in that order.
        """
        return (
            gr.update(visible=(input_type == "File")),
            gr.update(visible=(input_type == "URL")),
            gr.update(visible=(input_type == "Custom Text")),
        )

    def update_lang(lang):
        """Rebuild the voice dropdown for *lang*, selecting its first voice."""
        return gr.Dropdown(choices=voices[lang], label="Voice", value=voices[lang][0])

    input_type.change(toggle_file_input, input_type, [file_input, url_input, text_input])
    lang.change(update_lang, lang, [voice])

    with gr.Accordion("Markdown output", open=False):
        # Hidden box that carries the plain text from clean() to the TTS step.
        output_text = gr.Textbox(visible=False)
        output_markdown = gr.Markdown("Parsed markdown will appear here", label="Parsed Text", show_copy_button=True)
    output_audio = gr.Audio(label="Generated Audio", streaming=True, autoplay=True, loop=False)
    submit_button = gr.Button("Convert")

    # Each stage runs only if the previous one succeeded:
    # parse -> strip Markdown -> synthesize speech.
    submit_button.click(
        parse,
        inputs=[input_type, url_input, file_input, text_input],
        outputs=[output_markdown],
    ).success(
        clean,
        inputs=[output_markdown],
        outputs=[output_text],
    ).success(
        text_to_speech,
        inputs=[output_text, voice, speed, lang],
        outputs=[output_audio],
    )

demo.launch()
|
|