File size: 5,898 Bytes
ecc3537
2efd326
ecc3537
 
92cdaa1
2efd326
 
 
ecc3537
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2efd326
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ecc3537
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2efd326
ecc3537
 
 
 
 
 
 
 
 
2efd326
 
 
 
 
 
 
 
92cdaa1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ecc3537
 
 
 
2efd326
 
ecc3537
 
 
 
 
 
 
 
 
 
2efd326
ecc3537
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2efd326
ecc3537
2efd326
ecc3537
 
 
2efd326
 
 
 
 
 
 
 
 
 
 
ecc3537
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import os
import io
import spaces
import tempfile
import asyncio
#import soundfile as sf
import numpy as np
from pydub import AudioSegment
import requests
from markdown import Markdown
from io import StringIO

import gradio as gr
from kokoro_onnx import Kokoro
from markitdown import MarkItDown

# Shared MarkItDown converter; parse() uses it to turn a file path into Markdown.
md = MarkItDown()
# Kokoro ONNX text-to-speech engine, created once at import time.
# Both paths are relative, so the model and voices files are looked up
# relative to the process's working directory.
kokoro = Kokoro("kokoro-v0_19.onnx", "voices.json")
# Voice identifiers offered in the UI, keyed by language code.
# The first entry of each list is used as that language's default voice.
voices = {
    "en-us": ['af', 'af_bella', 'af_nicole', 'af_sarah', 'af_sky', 'am_adam', 'am_michael'],
    "en-gb": ['bf_emma', 'bf_isabella', 'bm_george', 'bm_lewis']
}

def numpy_to_mp3(audio_array, sampling_rate):
    """Encode a NumPy audio buffer as MP3 and return the encoded bytes.

    Floating-point input is peak-normalised to the signed 16-bit range and
    converted to ``int16``. Integer input is passed through unchanged and
    its own item size is used as the sample width.

    Args:
        audio_array: NumPy array of samples, interpreted as a single channel.
        sampling_rate: Sample rate in Hz, used as the segment's frame rate.

    Returns:
        The MP3-encoded audio as ``bytes``.
    """
    if np.issubdtype(audio_array.dtype, np.floating):
        # Guard the normalisation: an empty chunk has no maximum and a silent
        # chunk has a peak of 0, which would otherwise divide by zero and
        # turn the whole buffer into NaN before the int16 cast.
        peak = np.max(np.abs(audio_array)) if audio_array.size else 0.0
        if peak > 0:
            audio_array = (audio_array / peak) * 32767  # scale to 16-bit range
        audio_array = audio_array.astype(np.int16)

    # Wrap the raw PCM bytes in a pydub segment (mono).
    audio_segment = AudioSegment(
        audio_array.tobytes(),
        frame_rate=sampling_rate,
        sample_width=audio_array.dtype.itemsize,
        channels=1,
    )

    # Export to an in-memory MP3; a high bitrate is used to maximise quality.
    with io.BytesIO() as mp3_io:
        audio_segment.export(mp3_io, format="mp3", bitrate="320k")
        return mp3_io.getvalue()

def unmark_element(element, stream=None):
    """Serialise an element tree to plain text, dropping all markup.

    Walks ``element`` depth-first and writes each node's ``text``, then its
    children, then its ``tail`` into ``stream``. A new ``StringIO`` is
    created when no stream is supplied (the top-level call).

    Returns:
        The full contents accumulated in the stream so far.
    """
    out = StringIO() if stream is None else stream

    leading, trailing = element.text, element.tail
    if leading:
        out.write(leading)
    # Children share the same buffer so their text lands in document order.
    for child in element:
        unmark_element(child, out)
    if trailing:
        out.write(trailing)

    return out.getvalue()


# Register a custom "plain" output format on the Markdown class so that a
# converted document is serialised by unmark_element (bare text) rather than
# as HTML, then build one shared converter that uses it.
Markdown.output_formats["plain"] = unmark_element
__md = Markdown(output_format="plain")
# NOTE(review): presumably stops the converter from stripping wrapper tags
# that the plain-text output never contains - confirm against the
# Python-Markdown internals for the installed version.
__md.stripTopLevelTags = False


def markdown2text(text):
    """Convert Markdown source to plain text using the shared converter.

    Args:
        text: Markdown-formatted string.

    Returns:
        The text content with Markdown markup removed.
    """
    # The converter instance is reused for every request. reset() clears
    # per-document parser state (such as collected reference definitions)
    # so one document cannot influence the next.
    return __md.reset().convert(text)


def create_temp_html_from_url(url: str, timeout: float = 30.0) -> str:
    """Download a page and save it as ``output.html`` in a new temp directory.

    Args:
        url: Address of the page to fetch.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled server would block the request indefinitely.

    Returns:
        Path of the written HTML file. The temporary directory is not
        removed here; the caller reads the file afterwards.

    Raises:
        requests.HTTPError: If fetching the URL or writing the file fails
            for any reason; the original exception is chained as the cause.
    """
    # The broad catch is deliberate: callers see a single exception type
    # regardless of whether the network call or the file write failed.
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        html = response.text
        temp_dir = tempfile.mkdtemp()
        temp_path = os.path.join(temp_dir, "output.html")
        # Write as UTF-8 explicitly so pages with non-ASCII characters do not
        # fail on platforms whose default encoding cannot represent them.
        with open(temp_path, "w", encoding="utf-8") as f:
            f.write(html)
    except Exception as e:
        raise requests.HTTPError(f"Error fetching URL: {str(e)}") from e
    return temp_path


def parse(input_type, url_input, file_input, text_input):
    """Turn the selected input source into Markdown text.

    Args:
        input_type: ``"URL"``, ``"File"``, or anything else (treated as
            custom text).
        url_input: URL to download and convert when ``input_type`` is "URL".
        file_input: Uploaded file to convert when ``input_type`` is "File".
        text_input: Text returned as-is for the custom-text case.

    Returns:
        The Markdown produced by the converter, or the custom text itself
        (an empty string when no text was provided).

    Raises:
        gr.Error: If the selected source is "URL" or "File" but the
            corresponding input is empty.
    """
    if input_type == "URL":
        url = (url_input or "").strip()
        if not url:
            # Fail with a clear message instead of an opaque request error.
            raise gr.Error("Please enter a URL.")
        filepath = create_temp_html_from_url(url)
        return md.convert(filepath).text_content

    if input_type == "File":
        if not file_input:
            raise gr.Error("Please upload a file.")
        return md.convert(file_input).text_content

    # Custom text (or no selection): pass the text through unchanged, but
    # never hand None to the downstream Markdown cleaner.
    return text_input or ""


def clean(output_markdown):
    """Return the plain-text form of the parsed Markdown.

    Delegates to ``markdown2text``; exists as a separate step so it can be
    wired into the UI event chain between parsing and speech synthesis.
    """
    plain_text = markdown2text(output_markdown)
    return plain_text


@spaces.GPU
def text_to_speech(output_text, voice, speed, lang):
    """Synthesise speech for ``output_text`` and yield it as MP3 chunks.

    This is a synchronous generator that drives Kokoro's asynchronous
    stream on a private event loop, so it can be used as a streaming
    event handler.

    Args:
        output_text: Plain text to speak.
        voice: Kokoro voice identifier.
        speed: Playback speed multiplier; converted to ``float``.
        lang: Language code passed to Kokoro.

    Yields:
        MP3-encoded ``bytes`` for each chunk of audio produced by the stream.
    """
    async def stream_audio():
        stream = kokoro.create_stream(
            output_text,
            voice=voice,
            speed=float(speed),
            lang=lang,
        )
        async for samples, sample_rate in stream:
            yield numpy_to_mp3(samples, sampling_rate=sample_rate)

    # A dedicated loop is created per call because the handler runs outside
    # of any existing event loop.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    chunks = stream_audio()
    try:
        while True:
            try:
                chunk = loop.run_until_complete(chunks.__anext__())
            except StopAsyncIteration:
                break
            yield chunk
    finally:
        # Runs on normal completion, on error, and when the consumer stops
        # early: finalise the async generator and release the loop so that
        # repeated requests do not accumulate open event loops.
        try:
            loop.run_until_complete(chunks.aclose())
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            asyncio.set_event_loop(None)
            loop.close()


# Build the Gradio UI: input selection on the left, the matching input widget
# on the right, then the parsed Markdown, the audio player and the button.
with gr.Blocks() as demo:
    gr.Markdown(
        "# Stream Local TTS with Kokoro-82M 🗣️\n"
        "Provide a URL or upload a file to convert its content into speech using [Markitdown](https://github.com/microsoft/markitdown) and [Kokoro-ONNX](https://github.com/thewh1teagle/kokoro-onnx)."
    )

    with gr.Row():
        with gr.Column():
            # Default to "URL" so the selection matches the widget that is
            # visible on load; with no default the handler receives None and
            # a URL typed into the visible box would be ignored.
            input_type = gr.Radio(["URL", "File", "Custom Text"], label="Input Type", value="URL")
            with gr.Row():
                speed = gr.Slider(minimum=0.5, maximum=2.0, value=1.0, step=0.1, label="Speed")
                lang = gr.Dropdown(choices=list(voices), label="Language", value="en-us")
                voice = gr.Dropdown(choices=voices[lang.value], label="Voice", value=voices[lang.value][0])
        with gr.Column():
            url_input = gr.Textbox(label="Enter URL", lines=1)
            file_input = gr.File(label="Upload File", visible=False)
            text_input = gr.Textbox(label="Text", visible=False, lines=5, placeholder="Enter text here", show_label=False, interactive=True)

    def toggle_file_input(input_type):
        """Show only the input widget that matches the selected input type."""
        return gr.update(visible=(input_type == "File")), gr.update(
            visible=(input_type == "URL"),
        ), gr.update(visible=(input_type == "Custom Text"))

    def update_lang(lang):
        """Replace the voice choices with those of the selected language."""
        return gr.Dropdown(choices=voices[lang], label="Voice", value=voices[lang][0])

    input_type.change(toggle_file_input, input_type, [file_input, url_input, text_input])
    lang.change(update_lang, lang, [voice])

    with gr.Accordion("Markdown output", open=False):
        # Hidden textbox carries the cleaned plain text between event steps.
        output_text = gr.Textbox(visible=False)
        output_markdown = gr.Markdown("Parsed markdown will appear here", label="Parsed Text", show_copy_button=True)
    output_audio = gr.Audio(label="Generated Audio", streaming=True, autoplay=True, loop=False)
    submit_button = gr.Button("Convert")

    # Pipeline: parse input -> strip Markdown -> stream speech. Each step
    # only runs if the previous one succeeded.
    submit_button.click(
        parse,
        inputs=[input_type, url_input, file_input, text_input],
        outputs=[output_markdown],
    ).success(
        clean,
        inputs=[output_markdown],
        outputs=[output_text],
    ).success(
        text_to_speech,
        inputs=[output_text, voice, speed, lang],
        outputs=[output_audio],
    )

demo.launch()