|
import streamlit as st |
|
import os, glob, re, base64, asyncio, requests |
|
from datetime import datetime |
|
from collections import defaultdict |
|
from urllib.parse import quote |
|
from xml.etree import ElementTree as ET |
|
import edge_tts |
|
import streamlit.components.v1 as components |
|
|
|
|
|
# Display names offered in the sidebar user picker.
USER_NAMES = [
    "Aria",
    "Guy",
    "Sonia",
    "Tony",
    "Jenny",
    "Davis",
    "Libby",
    "Clara",
    "Liam",
    "Natasha",
    "William",
]

# Edge TTS voice identifiers, listed in the same order as USER_NAMES.
ENGLISH_VOICES = [
    "en-US-AriaNeural",
    "en-US-GuyNeural",
    "en-GB-SoniaNeural",
    "en-GB-TonyNeural",
    "en-US-JennyNeural",
    "en-US-DavisNeural",
    "en-GB-LibbyNeural",
    "en-CA-ClaraNeural",
    "en-CA-LiamNeural",
    "en-AU-NatashaNeural",
    "en-AU-WilliamNeural",
]

# Each user name is paired with the voice at the same list position.
USER_VOICES = {name: voice for name, voice in zip(USER_NAMES, ENGLISH_VOICES)}

# Folder that receives the saved markdown entries; created on import if absent.
SAVED_INPUTS_DIR = "saved_inputs"
os.makedirs(SAVED_INPUTS_DIR, exist_ok=True)
|
|
|
# Seed per-session defaults on the first run only; values already present in
# the session are left untouched.
for _state_key, _initial_value in (
    ('user_name', USER_NAMES[0]),
    ('old_val', None),
    ('should_rerun', False),
    ('viewing_prefix', None),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
|
|
|
def clean_for_speech(text: str) -> str:
    """Strip markup that should not be read aloud and normalize whitespace.

    Newlines and ``</s>`` markers become spaces, ``#`` characters are dropped,
    parenthesized http(s) URLs are removed, and any run of whitespace is
    collapsed to a single space with the ends trimmed.
    """
    for token, substitute in (("\n", " "), ("</s>", " "), ("#", "")):
        text = text.replace(token, substitute)
    without_urls = re.sub(r"\(https?:\/\/[^\)]+\)", "", text)
    return re.sub(r"\s+", " ", without_urls).strip()
|
|
|
async def edge_tts_generate_audio(text, voice="en-US-AriaNeural"):
    """Synthesize ``text`` to an MP3 file using Edge TTS.

    The text is first passed through ``clean_for_speech``.

    Args:
        text: Raw text to speak.
        voice: Edge TTS voice identifier.

    Returns:
        The name of the MP3 file written to the current working directory, or
        ``None`` when the cleaned text is empty or the service returned no
        audio (in which case an error is also shown in the Streamlit UI).
    """
    text = clean_for_speech(text)
    if not text.strip():
        return None
    communicate = edge_tts.Communicate(text, voice)
    # Microsecond precision keeps calls made within the same second (e.g. a
    # loop over several entries) from overwriting each other's output file.
    out_fn = f"speech_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.mp3"
    try:
        await communicate.save(out_fn)
    except edge_tts.exceptions.NoAudioReceived:
        # save() may already have created the (empty) output file before the
        # failure surfaced; do not leave it behind.
        try:
            os.remove(out_fn)
        except FileNotFoundError:
            pass
        st.error("No audio received from TTS service.")
        return None
    return out_fn
|
|
|
def speak_with_edge_tts(text, voice="en-US-AriaNeural"):
    """Blocking wrapper around ``edge_tts_generate_audio``.

    Runs the coroutine to completion with ``asyncio.run`` and returns whatever
    it returns.
    """
    synthesis = edge_tts_generate_audio(text, voice)
    return asyncio.run(synthesis)
|
|
|
def play_and_download_audio(file_path):
    """Render an audio player and a download link for ``file_path``.

    Does nothing when ``file_path`` is falsy or the file does not exist.
    The download link embeds the whole file as a base64 ``data:`` URI.

    Args:
        file_path: Path to an audio file, or ``None``.
    """
    if not file_path or not os.path.exists(file_path):
        return
    st.audio(file_path)
    # Read through a context manager so the file handle is closed promptly.
    with open(file_path, "rb") as audio_fh:
        encoded = base64.b64encode(audio_fh.read()).decode()
    file_name = os.path.basename(file_path)
    dl_link = (
        f'<a href="data:audio/mpeg;base64,{encoded}" '
        f'download="{file_name}">Download {file_name}</a>'
    )
    st.markdown(dl_link, unsafe_allow_html=True)
|
|
|
def save_input_as_md(user_name, text, prefix="input"):
    """Write ``text`` to a new markdown file under ``SAVED_INPUTS_DIR``.

    The file starts with a ``# User:`` line and a ``**Timestamp:**`` line,
    followed by the text. Its name is built from ``prefix``, the current time
    and a slug derived from the first 50 characters of the text.

    Args:
        user_name: Name recorded in the file header.
        text: Content to store.
        prefix: Leading part of the file name.

    Returns:
        The path of the written file, or ``None`` when ``text`` is blank.
    """
    if not text.strip():
        return None
    # Sample the clock once so the file name and the header always agree.
    now = datetime.now()
    timestamp = now.strftime("%Y%m%d_%H%M%S")
    safe_text = re.sub(r'[^\w\s-]', '', text[:50]).strip().lower()
    safe_text = re.sub(r'[-\s]+', '-', safe_text)
    fn = f"{prefix}_{timestamp}_{safe_text}.md"
    full_path = os.path.join(SAVED_INPUTS_DIR, fn)
    with open(full_path, 'w', encoding='utf-8') as f:
        f.write(f"# User: {user_name}\n")
        f.write(f"**Timestamp:** {now.strftime('%Y-%m-%d %H:%M:%S')}\n\n")
        f.write(text)
    return full_path
|
|
|
def list_saved_inputs():
    """Return the paths of all ``.md`` files in ``SAVED_INPUTS_DIR``, sorted by path."""
    pattern = os.path.join(SAVED_INPUTS_DIR, "*.md")
    return sorted(glob.glob(pattern))
|
|
|
def parse_md_file(fpath):
    """Split a saved markdown entry into ``(user, timestamp, content)``.

    A line starting with ``# User:`` supplies the user, a line starting with
    ``**Timestamp:**`` supplies the timestamp, and every other line (stripped)
    is joined with newlines to form the content. Missing header lines yield
    empty strings.
    """
    user_marker = "# User:"
    stamp_marker = "**Timestamp:**"
    author = ""
    stamp = ""
    body = []
    with open(fpath, 'r', encoding='utf-8') as handle:
        for raw in handle:
            if raw.startswith(user_marker):
                author = raw.replace(user_marker, "").strip()
                continue
            if raw.startswith(stamp_marker):
                stamp = raw.replace(stamp_marker, "").strip()
                continue
            body.append(raw.strip())
    return author, stamp, "\n".join(body).strip()
|
|
|
def arxiv_search(query, max_results=3):
    """Query the arXiv export API and return formatted result strings.

    Args:
        query: Free-text search query.
        max_results: Maximum number of entries to request.

    Returns:
        A list with one string per entry containing the title, the PDF link
        (or a "(No PDF link)" marker) and the first 300 characters of the
        summary. An empty list is returned for any non-200 response.

    Raises:
        Exceptions raised by ``requests.get`` (e.g. on timeout) and by the XML
        parser are not caught here and propagate to the caller.
    """
    base_url = "http://export.arxiv.org/api/query"
    params = {
        # requests URL-encodes parameter values itself (a space becomes '+').
        # Replacing spaces with '+' beforehand would be encoded again as
        # '%2B', sending literal plus signs instead of word separators.
        'search_query': query,
        'start': 0,
        'max_results': max_results,
    }
    response = requests.get(base_url, params=params, timeout=30)
    if response.status_code != 200:
        return []

    root = ET.fromstring(response.text)
    ns = {"a": "http://www.w3.org/2005/Atom"}
    results = []
    for entry in root.findall('a:entry', ns):
        # findtext falls back to "" so an entry lacking a title or summary
        # does not raise AttributeError on ``None.text``.
        title = entry.findtext('a:title', default="", namespaces=ns).strip()
        summary = entry.findtext('a:summary', default="", namespaces=ns).strip()

        pdf_link = None
        for link in entry.findall('a:link', ns):
            if link.get('type') == 'application/pdf':
                pdf_link = link.get('href')

        # Only mark the summary as shortened when it actually was.
        summary_short = summary[:300]
        if len(summary) > 300:
            summary_short += "..."

        if pdf_link:
            formatted = f"Title: {title}\nPDF: {pdf_link}\nSummary: {summary_short}"
        else:
            formatted = f"Title: {title}\n(No PDF link)\nSummary: {summary_short}"
        results.append(formatted)
    return results
|
|
|
def summarize_arxiv_results(results):
    """Join formatted results with blank lines, or report that none were found."""
    return "\n\n".join(results) if results else "No results found."
|
|
|
def concatenate_mp3(files, output_file):
    """Write the raw bytes of each file in ``files``, in order, to ``output_file``.

    The output file is created (or truncated) even when ``files`` is empty.
    """
    with open(output_file, 'wb') as combined:
        for part_path in files:
            with open(part_path, 'rb') as part:
                payload = part.read()
            combined.write(payload)
|
|
|
# NOTE(review): the leading glyphs in the UI labels below look mis-encoded
# (probably emoji originally); they are kept as-is here.
st.title("ποΈ Voice Chat & ArXiv Search")

with st.sidebar:
    # The chosen user is stored in session state for the voice lookup and
    # for the header of saved entries.
    st.session_state['user_name'] = st.selectbox("Current User:", USER_NAMES, index=0)

    saved_files = list_saved_inputs()
    st.write("π Saved Inputs:")
    for fpath in saved_files:
        # Only the user name is shown here; timestamp and content are unused.
        user, _, _ = parse_md_file(fpath)
        fname = os.path.basename(fpath)
        st.write(f"- {fname} (User: {user})")

    if st.button("ποΈ Clear All History"):
        for fpath in saved_files:
            os.remove(fpath)
        st.session_state['viewing_prefix'] = None
        st.success("All history cleared!")
        # st.experimental_rerun() is deprecated in favor of st.rerun(), which
        # this script already uses for its pending-rerun flag.
        st.rerun()
|
|
|
# Custom component whose frontend assets live in the "mycomponent" directory.
# Its return value is treated below as the recognized speech text
# (presumably a speech-to-text widget; confirm against the frontend).
mycomponent = components.declare_component("mycomponent", path="mycomponent")
voice_val = mycomponent(my_input_value="Start speaking...")

tabs = st.tabs(["π€ Voice Chat", "πΎ History", "βοΈ Settings"])

with tabs[0]:
    st.subheader("π€ Voice Chat")
    if voice_val:
        voice_text = voice_val.strip()
        # Only react when the component reports text not yet processed.
        input_changed = (voice_text != st.session_state.get('old_val'))
        if input_changed and voice_text:
            save_input_as_md(st.session_state['user_name'], voice_text, prefix="input")

            with st.spinner("Searching ArXiv..."):
                results = arxiv_search(voice_text)
                summary = summarize_arxiv_results(results)

            save_input_as_md(st.session_state['user_name'], summary, prefix="arxiv")

            voice = USER_VOICES.get(st.session_state['user_name'], "en-US-AriaNeural")
            audio_file = speak_with_edge_tts(summary, voice=voice)

            st.session_state['old_val'] = voice_text
            # Keep the outcome in session state: the rerun below (which lets
            # the sidebar pick up the newly saved files) discards everything
            # rendered in this run, so writing the summary and audio player
            # before it would make them vanish before the user sees them.
            st.session_state['last_summary'] = summary
            st.session_state['last_audio_file'] = audio_file
            st.rerun()

    # Show the most recent search result, which survives reruns.
    last_summary = st.session_state.get('last_summary')
    if last_summary:
        st.write(last_summary)
        play_and_download_audio(st.session_state.get('last_audio_file'))

    st.write("Speak a query to automatically run an ArXiv search and read results aloud.")
|
|
|
|
|
with tabs[1]:
    st.subheader("πΎ History")
    files = list_saved_inputs()
    conversation = []
    for fpath in files:
        user, ts, content = parse_md_file(fpath)
        conversation.append((user, ts, content, fpath))

    # File names start with their "input"/"arxiv" prefix, so the name-sorted
    # listing groups by prefix rather than by time. Order by the timestamp
    # recorded in each file, using the file's mtime to break ties between
    # entries saved within the same second.
    conversation.sort(key=lambda item: (item[1], os.path.getmtime(item[3])))

    # Browse newest entries first.
    for i, (user, ts, content, fpath) in enumerate(reversed(conversation), start=1):
        with st.expander(f"{ts} - {user}", expanded=False):
            st.write(content)
            if st.button(f"π Read Aloud {ts}-{user}", key=f"read_{i}_{fpath}"):
                voice = USER_VOICES.get(user, "en-US-AriaNeural")
                audio_file = speak_with_edge_tts(content, voice=voice)
                if audio_file:
                    play_and_download_audio(audio_file)

    if st.button("π Read Entire Conversation"):
        # Oldest first, so the combined audio follows the conversation order.
        mp3_files = []
        for user, ts, content, fpath in conversation:
            voice = USER_VOICES.get(user, "en-US-AriaNeural")
            audio_file = speak_with_edge_tts(content, voice=voice)
            if audio_file:
                mp3_files.append(audio_file)
                st.write(f"**{user} ({ts}):**")
                play_and_download_audio(audio_file)

        if mp3_files:
            combined_file = f"full_conversation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp3"
            concatenate_mp3(mp3_files, combined_file)
            st.write("**Full Conversation Audio:**")
            play_and_download_audio(combined_file)
|
|
|
|
|
with tabs[2]:
    st.subheader("βοΈ Settings")
    st.write("Currently no additional settings. Use the sidebar to pick a user.")

# Honor a pending rerun request once: clear the flag first, then restart the
# script so the request is not repeated on the next run.
if st.session_state['should_rerun']:
    st.session_state['should_rerun'] = False
    st.rerun()
|
|