import asyncio
import io
import os
import shutil
import tempfile
from io import StringIO

import spaces
#import soundfile as sf
import numpy as np
from pydub import AudioSegment
import requests
from markdown import Markdown
import gradio as gr
from kokoro_onnx import Kokoro
from markitdown import MarkItDown

# Shared converter (file/HTML -> markdown) and TTS engine, created once at import.
md = MarkItDown()
kokoro = Kokoro("kokoro-v0_19.onnx", "voices.json")

# Voice identifiers offered in the UI, grouped by language code.
voices = {
    "en-us": ['af', 'af_bella', 'af_nicole', 'af_sarah', 'af_sky', 'am_adam', 'am_michael'],
    "en-gb": ['bf_emma', 'bf_isabella', 'bm_george', 'bm_lewis'],
}

# Upper bound (seconds) on a URL fetch so a stalled server cannot hang a request.
_REQUEST_TIMEOUT_SECONDS = 30


def numpy_to_mp3(audio_array, sampling_rate):
    """Encode a single-channel numpy audio buffer as MP3 bytes.

    Floating-point input is peak-normalised and converted to 16-bit integers;
    any other dtype is passed to the encoder unchanged.

    Args:
        audio_array: numpy array of samples, treated as one channel.
        sampling_rate: frame rate handed to the encoder.

    Returns:
        The MP3-encoded audio as ``bytes``.
    """
    if np.issubdtype(audio_array.dtype, np.floating):
        # An empty chunk has no maximum and a silent chunk has peak 0; dividing
        # by that peak would raise or produce NaNs, so scale only when peak > 0.
        peak = np.max(np.abs(audio_array)) if audio_array.size else 0.0
        if peak > 0:
            audio_array = (audio_array / peak) * 32767  # scale to 16-bit range
        audio_array = audio_array.astype(np.int16)

    audio_segment = AudioSegment(
        audio_array.tobytes(),
        frame_rate=sampling_rate,
        sample_width=audio_array.dtype.itemsize,
        channels=1,
    )

    # Use a high bitrate to maximise quality.
    with io.BytesIO() as mp3_io:
        audio_segment.export(mp3_io, format="mp3", bitrate="320k")
        return mp3_io.getvalue()


def unmark_element(element, stream=None):
    """Serialise an element tree to plain text, discarding the tags.

    Writes each element's ``text``, then its children recursively, then its
    ``tail`` into *stream* (a fresh ``StringIO`` when omitted) and returns the
    accumulated contents of the stream.
    """
    if stream is None:
        stream = StringIO()
    if element.text:
        stream.write(element.text)
    for sub in element:
        unmark_element(sub, stream)
    if element.tail:
        stream.write(element.tail)
    return stream.getvalue()


# Register a "plain" output format on Markdown that uses unmark_element as its
# serializer, and build one converter instance configured to use it.
Markdown.output_formats["plain"] = unmark_element
__md = Markdown(output_format="plain")
__md.stripTopLevelTags = False


def markdown2text(text):
    """Convert markdown *text* with the module's "plain" Markdown converter."""
    return __md.convert(text)


def create_temp_html_from_url(url: str) -> str:
    """Download *url* and save the response body as ``output.html`` in a new temp dir.

    The caller owns the returned file and its parent directory and is
    responsible for deleting them.

    Args:
        url: address to fetch.

    Returns:
        Path of the written HTML file.

    Raises:
        requests.HTTPError: if fetching or writing fails for any reason; the
            original exception is chained as the cause.
    """
    temp_dir = None
    try:
        response = requests.get(url, timeout=_REQUEST_TIMEOUT_SECONDS)
        response.raise_for_status()
        html = response.text
        temp_dir = tempfile.mkdtemp()
        temp_path = os.path.join(temp_dir, "output.html")
        # Explicit UTF-8: the platform default encoding may be unable to
        # represent every character of the page.
        with open(temp_path, "w", encoding="utf-8") as f:
            f.write(html)
    except Exception as e:
        # Do not leave a half-written temp directory behind on failure.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
        raise requests.HTTPError(f"Error fetching URL: {e}") from e
    return temp_path


def parse(input_type, url_input, file_input, text_input):
    """Return the markdown for the selected input.

    Args:
        input_type: ``"URL"``, ``"File"``, or anything else (treated as custom text).
        url_input: URL to download when *input_type* is ``"URL"``.
        file_input: path handed to the converter when *input_type* is ``"File"``.
        text_input: returned unchanged for any other *input_type*.

    Returns:
        The converter's ``text_content`` for URL/File input, else *text_input*.
    """
    if input_type == "URL":
        filepath = create_temp_html_from_url(url_input)
        try:
            print(filepath)
            return md.convert(filepath).text_content
        finally:
            # The download lives in its own temp directory; remove it once the
            # conversion is done so repeated requests do not pile up on disk.
            shutil.rmtree(os.path.dirname(filepath), ignore_errors=True)
    if input_type == "File":
        print(file_input)
        return md.convert(file_input).text_content
    return text_input


def clean(output_markdown):
    """Convert the parsed markdown with ``markdown2text`` before synthesis."""
    return markdown2text(output_markdown)


def _close_event_loop(loop):
    """Cancel tasks still pending on *loop*, finalise async generators, close it."""
    try:
        pending = asyncio.all_tasks(loop)
        for task in pending:
            task.cancel()
        if pending:
            # Let the cancellations be delivered before the loop goes away.
            loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
        loop.run_until_complete(loop.shutdown_asyncgens())
    finally:
        # Never leave a closed loop registered as this thread's current loop.
        asyncio.set_event_loop(None)
        loop.close()


@spaces.GPU
def text_to_speech(output_text, voice, speed, lang):
    """Synthesise *output_text* and yield it as a sequence of MP3 byte chunks.

    This is a synchronous generator that drives kokoro's asynchronous stream
    on a private event loop, so each chunk is yielded as soon as it is ready.

    Args:
        output_text: text to speak.
        voice: voice identifier passed to kokoro.
        speed: speaking speed; converted with ``float()``.
        lang: language code passed to kokoro.

    Yields:
        MP3-encoded ``bytes`` for each chunk of samples produced by the stream.
    """
    async def mp3_chunks():
        stream = kokoro.create_stream(
            output_text,
            voice=voice,
            speed=float(speed),
            lang=lang,
        )
        async for samples, sample_rate in stream:
            yield numpy_to_mp3(samples, sampling_rate=sample_rate)

    # Create a new event loop for this thread.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    chunks = mp3_chunks()
    try:
        while True:
            try:
                chunk = loop.run_until_complete(chunks.__anext__())
            except StopAsyncIteration:
                break
            yield chunk
    finally:
        # Runs on normal completion, on error, and when the consumer stops
        # early, so the generator and the loop are always released.
        try:
            loop.run_until_complete(chunks.aclose())
        finally:
            _close_event_loop(loop)


with gr.Blocks() as demo:
    gr.Markdown(
        "# Stream Local TTS with Kokoro-82M 🗣️\n"
        "Provide a URL or upload a file to convert its content into speech using [Markitdown](https://github.com/microsoft/markitdown) and [Kokoro-ONNX](https://github.com/thewh1teagle/kokoro-onnx)."
    )

    with gr.Row():
        with gr.Column():
            # Default to "URL" so the selection matches the input box that is
            # visible when the page loads.
            input_type = gr.Radio(["URL", "File", "Custom Text"], label="Input Type", value="URL")
            with gr.Row():
                speed = gr.Slider(minimum=0.5, maximum=2.0, value=1.0, step=0.1, label="Speed")
                lang = gr.Dropdown(choices=list(voices), label="Language", value="en-us")
                voice = gr.Dropdown(choices=voices[lang.value], label="Voice", value=voices[lang.value][0])
        with gr.Column():
            url_input = gr.Textbox(label="Enter URL", lines=1)
            file_input = gr.File(label="Upload File", visible=False)
            text_input = gr.Textbox(label="Text", visible=False, lines=5, placeholder="Enter text here", show_label=False, interactive=True)

    def toggle_file_input(input_type):
        """Show only the input widget matching the selected input type.

        Returns visibility updates in the order file, URL, custom text.
        """
        return gr.update(visible=(input_type == "File")), gr.update(
            visible=(input_type == "URL"),
        ), gr.update(visible=(input_type == "Custom Text"))

    def update_lang(lang):
        """Rebuild the voice dropdown for *lang*, selecting its first voice."""
        return gr.Dropdown(choices=voices[lang], label="Voice", value=voices[lang][0])

    input_type.change(toggle_file_input, input_type, [file_input, url_input, text_input])
    lang.change(update_lang, lang, [voice])

    with gr.Accordion("Markdown output", open=False):
        output_text = gr.Textbox(visible=False)
        output_markdown = gr.Markdown("Parsed markdown will appear here", label="Parsed Text", show_copy_button=True)
    output_audio = gr.Audio(label="Generated Audio", streaming=True, autoplay=True, loop=False)

    submit_button = gr.Button("Convert")

    # parse -> clean -> text_to_speech; each step runs only if the previous succeeded.
    submit_button.click(
        parse,
        inputs=[input_type, url_input, file_input, text_input],
        outputs=[output_markdown],
    ).success(
        clean,
        inputs=[output_markdown],
        outputs=[output_text],
    ).success(
        text_to_speech,
        inputs=[output_text, voice, speed, lang],
        outputs=[output_audio],
    )

demo.launch()