# Spaces:
# Sleeping
# Sleeping
import streamlit as st | |
import speech_recognition as sr | |
from pydub import AudioSegment | |
from pydub.playback import play | |
from io import BytesIO | |
from time import sleep | |
# Client whose `text_generation` method is used below to stream completions
# from the Mixtral instruct model.
# NOTE(review): `InferenceClient` is not imported in the visible part of this
# file; presumably `from huggingface_hub import InferenceClient` — confirm.
client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1")

# Persona instruction that format_prompt() prepends to the conversation.
system_prompt = "Tu nombre es Chaman 3.0 una IA conductual"

# Set to True by format_prompt() whenever it injects the system prompt.
system_prompt_sent = False
def format_prompt(message, history):
    """Build an ``[INST]``-tagged prompt from the chat history and a new message.

    The prompt starts with ``<s>``, followed by the module-level
    ``system_prompt`` wrapped in ``[INST] ... [/INST]`` (unless a user turn in
    ``history`` already contains that exact block), then every past
    ``(user_prompt, bot_response)`` pair, and finally ``message``.

    Args:
        message: The new user message to append as the last ``[INST]`` block.
        history: List of ``(user_prompt, bot_response)`` pairs. ``None`` or any
            non-list value is treated as an empty history, so the system
            prompt is still included on a first turn.

    Returns:
        The assembled prompt string.

    Side effects:
        Sets the module-level ``system_prompt_sent`` flag to ``True`` when the
        system prompt is injected.
    """
    global system_prompt_sent

    # Previously a None / non-list history skipped the system prompt entirely;
    # normalising to an empty list keeps the persona on every request.
    turns = history if isinstance(history, list) else []
    system_block = f"[INST] {system_prompt} [/INST]"

    parts = ["<s>"]
    if not any(system_block in user_prompt for user_prompt, _ in turns):
        parts.append(system_block)
        system_prompt_sent = True

    for user_prompt, bot_response in turns:
        parts.append(f"[INST] {user_prompt} [/INST]")
        parts.append(f" {bot_response}</s> ")

    parts.append(f"[INST] {message} [/INST]")
    return "".join(parts)
def text_to_speech(text, speed=2.0):
    """Synthesize ``text`` as Spanish speech into an in-memory buffer.

    Args:
        text: The text to speak.
        speed: Currently unused; kept so existing callers that pass it keep
            working.

    Returns:
        A ``BytesIO`` containing the audio written by gTTS, rewound to
        position 0 so it can be read immediately.
    """
    # NOTE(review): `gTTS` is not imported in the visible part of this file;
    # presumably `from gtts import gTTS` — confirm.
    tts = gTTS(text=text, lang='es')
    audio_file_path = BytesIO()
    tts.write_to_fp(audio_file_path)
    # Writing leaves the cursor at the end of the buffer; without rewinding,
    # any consumer that calls read() would get zero bytes.
    audio_file_path.seek(0)
    return audio_file_path
def generate_with_progress(
    user_input, history, temperature=None, max_new_tokens=2048, top_p=0.95, repetition_penalty=1.0,
):
    """Stream a model reply for ``user_input`` while updating a progress bar.

    Args:
        user_input: The new user message.
        history: Past ``(user_prompt, bot_response)`` pairs, passed through to
            ``format_prompt``.
        temperature: Sampling temperature; defaults to 0.9 when ``None`` and
            is floored at 0.01.
        max_new_tokens: Generation limit, also used as the denominator of the
            progress fraction.
        top_p: Nucleus-sampling parameter, coerced to ``float``.
        repetition_penalty: Forwarded unchanged to the client.

    Returns:
        The generated text with ``</s>`` markers removed and all runs of
        whitespace collapsed to single spaces.
    """
    temperature = float(temperature) if temperature is not None else 0.9
    temperature = max(temperature, 1e-2)
    top_p = float(top_p)

    generate_kwargs = dict(
        temperature=temperature,
        max_new_tokens=max_new_tokens,
        top_p=top_p,
        repetition_penalty=repetition_penalty,
        do_sample=True,
        seed=42,
    )

    formatted_prompt = format_prompt(user_input, history)
    stream = client.text_generation(
        formatted_prompt,
        **generate_kwargs,
        stream=True,
        details=True,
        return_full_text=True,
    )

    # Create the heading and bar once and update the bar in place; emitting a
    # new widget per token would flood the page with duplicates.
    st.subheader("Generando respuesta...")
    progress_bar = st.progress(0.0)

    pieces = []
    for total_tokens, response_token in enumerate(stream, start=1):
        pieces.append(response_token.token.text)
        # Clamp to 1.0 (st.progress rejects larger floats) and avoid dividing
        # by a zero token budget.
        fraction = total_tokens / max_new_tokens if max_new_tokens > 0 else 1.0
        progress_bar.progress(min(fraction, 1.0))

    # Strip the end-of-sequence marker BEFORE normalising whitespace so that
    # removing it cannot leave a trailing or doubled space behind.
    response = "".join(pieces).replace('</s>', '')
    return ' '.join(response.split())
# Initialise the conversation history in Streamlit's session state on first use.
if "history" not in st.session_state:
    st.session_state.history = []

recognizer = sr.Recognizer()

# Listen -> transcribe -> generate -> speak, repeated until the speech
# recognition service itself fails.
# NOTE(review): the source's indentation was lost; the final `break` is
# assumed to belong to the RequestError handler — confirm.
while True:
    with st.spinner("Escuchando..."):
        try:
            with sr.Microphone() as source:
                audio_data = recognizer.listen(source, timeout=5)
            st.success("Audio capturado con éxito.")

            text = recognizer.recognize_google(audio_data, language="es-ES")
            st.success(f"Texto reconocido: {text}")

            st.subheader("Generando respuesta...")
            st.progress(0.0)
            output = generate_with_progress(text, history=st.session_state.history)
            st.session_state.history.append((text, output))
            st.success("Respuesta generada con éxito.")

            st.subheader("Reproduciendo respuesta...")
            audio_file_path = text_to_speech(output)
            # pydub's play() needs an AudioSegment, not a raw byte buffer:
            # rewind the buffer and decode it first.
            # Assumes the buffer holds MP3 data (gTTS output) — TODO confirm.
            audio_file_path.seek(0)
            play(AudioSegment.from_file(audio_file_path, format="mp3"))

            # Reuse a single bar instead of stacking a new one per step.
            playback_bar = st.progress(0.0)
            for progress_value in range(0, 101, 10):
                playback_bar.progress(progress_value / 100)
                sleep(0.5)
        except sr.WaitTimeoutError:
            # listen(timeout=5) raises this when no speech starts in time;
            # treat it like unrecognised speech and keep listening.
            st.warning("No se detectó voz a tiempo.")
        except sr.UnknownValueError:
            st.warning("No se pudo reconocer el habla.")
        except sr.RequestError as e:
            st.error(f"Error en la solicitud al servicio de reconocimiento de voz: {e}")
            break