# Spaces status banner captured with this file: Runtime error
import os
import time

import gradio as gr
import openai
import pandas as pd
from llama_index import StorageContext, load_index_from_storage
from openai.embeddings_utils import get_embedding, cosine_similarity
# Credentials are read from the environment; os.getenv returns None for any
# variable that is not set.
openai.api_key = os.getenv("OPENAI_API_KEY")
passwd = os.getenv("PASSWD_SECRET")

# Page title and the canned queries offered as one-click example buttons.
title = "Confidential forensics tool with ChatGPT"
examples = [
    "Who is Phillip Allen?",
    "What the project in Austin is about?",
    "Give me more details about the real estate project",
]


def file_metadata(x):
    """Return the one-key mapping ``{"filename": x}``."""
    return {"filename": x}


# Load the index persisted under ./storage and build a query engine on top of
# it, configured with similarity_top_k=3.
storage_context = StorageContext.from_defaults(persist_dir="./storage")
index = load_index_from_storage(storage_context)
query_engine = index.as_query_engine(similarity_top_k=3)

# Email metadata table (semicolon-separated); None subjects are replaced by
# the placeholder title below.
default_mail_name = "no title"
df = pd.read_csv("metadata.csv", delimiter=";")
df["subject"] = df["subject"].replace([None], default_mail_name)

# One-column preview holding the first ten subjects, displayed in the UI.
df2 = (
    df["subject"]
    .head(10)
    .to_frame()
    .rename({'subject': 'Emails titles'}, axis=1)
)
def get_email_subject(response):
    """Return the metadata rows of the emails cited by a query response.

    For every entry of ``response.source_nodes`` the file path stored under
    ``node.node.extra_info["filename"]`` is reduced to its last path
    component, cut at the first dot, and the result is matched against the
    ``email_name`` column of the module-level ``df``.

    Args:
        response: Query result exposing a ``source_nodes`` iterable.

    Returns:
        The rows of ``df`` whose ``email_name`` is one of the cited emails,
        in ``df`` order. Empty when nothing matches.
    """
    emails = set()
    for source in response.source_nodes:
        path = source.node.extra_info["filename"]
        # Normalise Windows separators so that both "dir\\name.ext" and
        # "dir/name.ext" reduce to "name.ext".
        base_name = path.replace("\\", "/").rsplit("/", 1)[-1]
        # Keep only the text before the first dot.
        # NOTE(review): presumably email_name holds no extension - confirm.
        emails.add(base_name.split(".")[0])
    # Vectorised membership test instead of a per-row Python lambda.
    return df.loc[df.email_name.isin(emails)]
# def get_email_subject(response): | |
# podcasts = [] | |
# for node in response.source_nodes: | |
# podcast = node.node.extra_info["filename"].split("/")[-1].split(".")[0] | |
# podcasts.append(podcast) | |
def search_emails(opt, message, n=3):
    """Output the top ``n`` emails that best match ``message``.

    Side effect: the similarity scores are written to a ``similarities``
    column of the module-level ``df``.

    Args:
        opt: Unused; present so the function fits the UI wiring.
        message: Free-text query.
        n: Number of best-matching emails to report.

    Returns:
        A ``(text, "")`` pair. ``text`` is the formatted list of matches, a
        hint when the query is blank, or a generic error message when
        anything fails while handling the query.
    """
    if not message.strip():
        return (
            "Oops, it looks like your query was not valid. Please make sure you typed something in your text box and then try again.",
            "",
        )

    try:
        embedding = get_embedding(message)
        df['similarities'] = df.embedding.apply(func=(lambda x: cosine_similarity(x, embedding)))
        best = df.sort_values('similarities', ascending=False).head(n)
        # Collect the raw fields first, then format them one by one.
        hits = [(row.file, row.body, row.similarities) for _, row in best.iterrows()]
        report = "".join(
            f"Mail ID: {mail_id}\nContent: {body.strip()}\nSimilarity score: {score}\n\n"
            for mail_id, body, score in hits
        )
    except Exception as e:
        report = "An error occured when handling your query, please try again."
        print(e)
    return report, ""
def respond_upload(btn_upload, message, chat_history, delay=2):
    """Acknowledge a file upload by appending a confirmation to the chat.

    NOTE(review): nothing in this function reads or indexes the uploaded
    file; it only pauses and posts a confirmation exchange.

    Args:
        btn_upload: Value of the upload button; returned unchanged.
        message: Current text box content; ignored.
        chat_history: List of ``(user, bot)`` pairs; mutated in place.
        delay: Seconds to pause before confirming. Defaults to the former
            hard-coded 2 seconds; pass 0 to skip the pause.

    Returns:
        ``(btn_upload, "", chat_history)``; the empty string clears the text
        box.
    """
    if delay > 0:
        time.sleep(delay)
    chat_history.append(
        (
            "***File uploaded***",
            "Your document has been uploaded and will be accounted for your queries.",
        )
    )
    return btn_upload, "", chat_history
def respond2(message, chat_history, box, btn=None):
    """Answer the query attached to an example button.

    The example buttons are wired with three inputs (button value, chat
    history, text box), so ``btn`` must be optional: as a required fourth
    parameter it was never supplied and the call could not succeed.

    Args:
        message: Query text (the clicked button's value).
        chat_history: List of ``(user, bot)`` pairs; extended in place.
        box: Current text box content; returned unchanged.
        btn: Unused extra value, forwarded to ``respond_common``.

    Returns:
        ``(message, chat_history, box)``.
    """
    message, chat_history = respond_common(message, chat_history, box, btn)
    return message, chat_history, box
def respond(message, chat_history):
    """Answer a typed query and clear the text box.

    Args:
        message: Query text from the text box.
        chat_history: List of ``(user, bot)`` pairs.

    Returns:
        ``("", chat_history)``: an empty string for the text box and the
        history returned by ``respond_common``.
    """
    _, updated_history = respond_common(message, chat_history)
    return "", updated_history
def respond_common(message, chat_history, box=None, btn=None):
    """Run a query against the index and record the exchange in the chat.

    Args:
        message: Query text.
        chat_history: List of ``(user, bot)`` pairs; mutated in place.
        box: Unused.
        btn: Unused.

    Returns:
        ``(message, chat_history)``. For a blank query ``message`` is
        replaced by ``"***Empty***"`` and the bot reply is a hint. If
        anything fails while querying, the bot reply is a generic error
        message and the exception is printed.
    """
    if not message.strip():
        message = "***Empty***"
        bot_message = "Oops, it looks like your query was not valid. Please make sure you typed something in your text box and then try again."
    else:
        try:
            resp = query_engine.query(message)
            # Look the cited emails up once; the lookup used to run twice
            # with the first result thrown away.
            sources = get_email_subject(resp)
            source_lines = [
                f"Email ID: **{row.email_name}**\n**Subject: {row.subject}**\n"
                for _, row in sources.iterrows()
            ]
            bot_message = (
                str(resp).strip()
                + "\n\n\n\nSource(s):\n\n"
                + "".join(source_lines)
            )
        except Exception as e:
            # UI boundary: report a generic failure instead of crashing.
            bot_message = "An error occured when handling your query, please try again."
            print(e)
    chat_history.append((message, bot_message))
    return message, chat_history
# NOTE(review): the original indentation of this block was lost; the nesting
# below (in particular which components sit inside each gr.Row) is inferred
# and should be confirmed against the intended layout.
with gr.Blocks(title=title) as demo:
    # Page heading.
    gr.Markdown(f"\n# {title}\n")

    # Preview table of email subjects.
    dat = gr.Dataframe(
        value=df2,
        max_cols=1,
        max_rows=4,
        overflow_row_behaviour="paginate",
    )
    btn_upload = gr.UploadButton(
        "Upload a new document...",
        file_types=["text"],
    )

    gr.Markdown("\n## Chatbot\n")
    chatbot = gr.Chatbot().style(height=400)

    # Query input and its send button, side by side.
    with gr.Row():
        with gr.Column(scale=0.85):
            msg = gr.Textbox(
                show_label=False,
                placeholder="Enter text and press enter, or click on Send.",
            ).style(container=False)
        with gr.Column(scale=0.15, min_width=0):
            btn_send = gr.Button("Send your query")

    # One button per example query; the button value is passed as the query.
    with gr.Row():
        gr.Markdown("\nExample of queries\n")
        for ex in examples:
            btn = gr.Button(ex)
            btn.click(respond2, [btn, chatbot, msg], [btn, chatbot, msg])

    # Event wiring: Enter key, send button and file upload.
    msg.submit(respond, [msg, chatbot], [msg, chatbot])
    btn_send.click(respond, [msg, chatbot], [msg, chatbot])
    btn_upload.upload(
        respond_upload,
        [btn_upload, msg, chatbot],
        [btn_upload, msg, chatbot],
    )
# gr.Markdown( | |
# """ | |
# ## Search the matching document | |
# """) | |
# opt = gr.Textbox( | |
# show_label=False, | |
# placeholder="The document matching with your query will be shown here.", | |
# interactive=False, | |
# lines=8 | |
# ) | |
# with gr.Row(): | |
# with gr.Column(scale=0.85): | |
# msg2 = gr.Textbox( | |
# show_label=False, | |
# placeholder="Enter text and press enter, or click on Send.", | |
# ).style(container=False) | |
# with gr.Column(scale=0.15, min_width=0): | |
# btn_send2 = gr.Button("Send your query") | |
# btn_send2.click(search_emails, [opt, msg2], [opt, msg2]) | |
if __name__ == "__main__":
    # Fixed user name; the password is the value read from PASSWD_SECRET.
    credentials = ("mithril", passwd)
    demo.launch(auth=credentials)