import gradio as gr
import os
import re
import requests
import tempfile
import time
from pyzotero import zotero
from paperqa import Docs
from lxml import html
from models import Icons, Message
def is_integer(string):
    """Return whether ``int(string)`` succeeds.

    Args:
        string: The value to probe; typically text typed into a form field.

    Returns:
        True when ``int(string)`` does not raise, False otherwise.
    """
    try:
        int(string)
    except (TypeError, ValueError):
        # ValueError: text that is not an integer literal ("abc", "", "4.2").
        # TypeError: values int() cannot accept at all (None, lists, ...);
        # these are "not an integer" as well rather than a reason to crash.
        return False
    return True
def reset_open_ai(openai_api_key):
    """Publish the OpenAI API key to the environment and clear an HTML output.

    Args:
        openai_api_key: The key text; surrounding whitespace is removed
            before it is stored.

    Returns:
        A ``gr.HTML`` update whose value is ``None``.
    """
    cleaned_key = openai_api_key.strip()
    os.environ['OPENAI_API_KEY'] = cleaned_key
    cleared_html = gr.HTML.update(value=None)
    return cleared_html
def _fetch_collections_rejected(messages):
    """Build the output tuple returned when the inputs fail validation."""
    return (
        None,
        [],
        None,
        gr.Button.update(visible=True),
        gr.HTML.update(visible=True),
        messages,
        gr.HTML.update(value=str(messages)),
    )


def fetch_collections(openai_api_key, id, type, key, messages):
    """Validate the credentials and list the top-level Zotero collections.

    Args:
        openai_api_key: OpenAI API key text; must not be blank.
        id: Zotero library ID; must be convertible with ``int()``.
        type: Zotero library type; lower-cased before use.
        key: Zotero API key text; must not be blank.
        messages: Message container; the outcome is appended to it.

    Returns:
        A 7-tuple, by position:
        0. the ``zotero.Zotero`` client, or ``None`` on failure;
        1. the collections returned by ``collections_top()``, or ``[]``;
        2. a ``gr.Radio`` update listing "<name> (<numItems>)" choices, or
           ``None``;
        3. a ``gr.Button`` update (hidden on success, visible on failure);
        4. a ``gr.HTML`` update (hidden on success, visible on a validation
           failure, ``None`` when the Zotero request itself failed);
        5. ``messages``;
        6. a ``gr.HTML`` update carrying ``str(messages)``.
    """
    # Treat None and whitespace-only input the same as an empty field.
    if not (openai_api_key or '').strip():
        messages.append(
            Message(Icons.ERR, "Your OpenAI API key is missing. Check out: https://platform.openai.com/overview."))
        return _fetch_collections_rejected(messages)

    if not (key or '').strip():
        messages.append(
            Message(Icons.ERR, "Your Zotero API key is missing. Click here to create a new one."))
        return _fetch_collections_rejected(messages)

    if id is None or not is_integer(id):
        messages.append(
            Message(Icons.ERR, "Your Zotero ID should be an integer."))
        return _fetch_collections_rejected(messages)

    try:
        # Client construction sits inside the try so that a bad library type
        # is reported through the message list like any other failure.
        zot = zotero.Zotero(int(id), type.lower(), key.strip())
        collections = zot.collections_top()
        collection_names = [
            f"{x['data']['name']} ({x['meta']['numItems']})" for x in collections]
        messages.append(
            Message(Icons.INFO, "Please select a Zotero collection to proceed."))
        return (
            zot,
            collections,
            gr.Radio.update(choices=collection_names,
                            visible=True, interactive=True),
            gr.Button.update(visible=False),
            gr.HTML.update(visible=False),
            messages,
            gr.HTML.update(value=str(messages)),
        )
    except Exception as e:
        messages.append(
            Message(Icons.ERR, f"Error occurred when fetching Zotero collection: {e}"))
        return (
            None,
            [],
            None,
            gr.Button.update(visible=True),
            # NOTE(review): the validation failures return
            # gr.HTML.update(visible=True) in this slot; None is kept here
            # as originally written - confirm the difference is intended.
            None,
            messages,
            gr.HTML.update(value=str(messages)),
        )
def select_collection(collection, messages):
    """React to a collection being picked and unlock the question input.

    Args:
        collection: The selected choice in the form "<name> (<count>)", or
            ``None`` when nothing is selected.
        messages: Message container; replaced with a single confirmation
            message when a collection is selected.

    Returns:
        A 4-tuple. With no selection: ``(None, messages, gr.HTML.update(),
        None)``. Otherwise: a ``gr.Text`` update making the input
        interactive, ``messages``, a ``gr.HTML`` update carrying
        ``str(messages)``, and a ``gr.HTML`` update whose value is ``None``.
    """
    if collection is None:
        return None, messages, gr.HTML.update(), None

    # Raw string: "\s", "\(" and "\d" are not valid str escapes and trigger
    # a warning in a plain literal. The pattern strips the trailing
    # " (<count>)" suffix from the choice label.
    collection_name = re.sub(r'\s\(\d+\)$', '', collection)
    messages.set([Message(
        Icons.OK, f"Selected collection: {collection_name}. Please type your question and hit \"Enter\".")])
    return (
        gr.Text.update(
            placeholder="Please type your question and hit \"Enter\".", interactive=True),
        messages,
        gr.HTML.update(value=str(messages)),
        gr.HTML.update(value=None)
    )
# def search_attachments(id, type, key, collection, queries=[], limit=10):
# try:
# zot = zotero.Zotero(int(id), type.lower(), key)
# searches = [zot.collection_items(
# collection['key'],
# q=q,
# limit=limit,
# itemType='attachment',
# qmode='everything'
# ) for q in queries]
# attachments = [x for x in {item['key']: item for search in searches for item in search if item['data']
# ['contentType'] == 'application/pdf'}.values()][:limit]
# parents = set([a['data']['parentItem'] for a in attachments])
# message = f"✅ Found {len(attachments)} PDF {'attachments' if len(attachments) > 1 else 'attachment'} from {len(parents)} {'articles' if len(parents) > 1 else 'article'}." if len(
# attachments) else "❔ No results. Make sure to index your PDF attachments in Zotero."
# return parents, attachments, message
# except Exception as e:
# message = f"⚠️ Error occurred when searching in Zotero: {e}"
# return [], [], message
def download_attachment(id, type, key, attachment, timeout=30):
    """Return the raw bytes of a Zotero attachment.

    Args:
        id: Zotero library ID; converted with ``int()``.
        type: Zotero library type; lower-cased before use.
        key: Zotero API key.
        attachment: Attachment item; ``attachment['data']['linkMode']``
            selects the download path, ``attachment['key']`` identifies the
            stored file and ``attachment['data']['url']`` the remote one.
        timeout: Seconds passed to ``requests.get`` for URL attachments.

    Returns:
        The attachment content as returned by ``zot.file`` (stored files) or
        the HTTP response body (URL attachments).

    Raises:
        ValueError: If the link mode is neither ``imported_file`` nor
            ``imported_url``.
        requests.HTTPError: If the URL download answers with an error status.
    """
    link_mode = attachment['data']['linkMode']

    if link_mode == 'imported_file':
        # Only stored files need an API client, so build it here.
        zot = zotero.Zotero(int(id), type.lower(), key)
        return zot.file(attachment['key'])

    if link_mode == 'imported_url':
        res = requests.get(attachment['data']['url'], timeout=timeout)
        # Without this an HTTP error page would be returned as if it were
        # the attachment content.
        res.raise_for_status()
        return res.content

    raise ValueError(
        f'Unsupported link mode: {link_mode} for {attachment["key"]}.')
def reset_collection(messages):
    """Return the outputs that put the collection picker back to its initial state.

    Args:
        messages: Message container; replaced with a single prompt asking
            for the OpenAI and Zotero details.

    Returns:
        A 7-tuple, by position: a ``gr.Radio`` update (no choices, hidden),
        a ``gr.HTML`` update (visible), a ``gr.Text`` update (placeholder
        set, not interactive), a ``gr.HTML`` update with value ``None``,
        ``messages``, a ``gr.HTML`` update carrying ``str(messages)``, and
        ``None``.
    """
    prompt = Message(
        Icons.INFO, "Please provide all the required OpenAI and Zotero information in the left panel.")
    messages.set([prompt])

    hidden_radio = gr.Radio.update(choices=[], visible=False)
    shown_html = gr.HTML.update(visible=True)
    locked_question = gr.Text.update(
        placeholder="You have to select a Zotero collection to proceed", interactive=False)
    cleared_html = gr.HTML.update(value=None)
    rendered_messages = gr.HTML.update(value=str(messages))

    return (
        hidden_radio,
        shown_html,
        locked_question,
        cleared_html,
        messages,
        rendered_messages,
        None,
    )
# Seconds allowed for downloading a PDF that Zotero references by URL.
_ATTACHMENT_TIMEOUT_SECONDS = 30

# Used both as the per-query result limit and as the cap on PDFs processed.
_MAX_ATTACHMENTS = 10

# Minimum seconds between "still in progress" messages while answering.
_PROGRESS_INTERVAL_SECONDS = 15


def _status(messages, answer=None):
    """Build the 3-tuple handle_submit yields: messages, rendered messages, answer."""
    return messages, gr.HTML.update(value=str(messages)), answer


def handle_submit(zot, collection_name, collections, style, question, messages):
    """Answer ``question`` from the PDFs of a Zotero collection, streaming progress.

    This is a generator. After every step it yields
    ``(messages, gr.HTML.update(value=str(messages)), answer)`` where
    ``answer`` is ``None`` until the final step, which yields a ``gr.HTML``
    update holding the formatted answer. On any fatal error an error message
    is appended, yielded, and the generator stops.

    Steps: generate search queries with Paper QA, search the collection for
    PDF attachments, fetch a bibliography entry per parent item, download and
    add each PDF to the ``Docs`` instance, build the vector index, and query.

    Args:
        zot: Zotero client offering ``collection_items``, ``item`` and ``file``.
        collection_name: Selected choice label, "<name> (<numItems>)".
        collections: Collection items; each is read via ``['data']['name']``,
            ``['meta']['numItems']`` and ``['key']``.
        style: Citation style passed to ``zot.item(..., content="bib")``.
        question: The user's question.
        messages: Message container supporting ``set`` and ``append``.

    Yields:
        The 3-tuple described above.
    """
    # Raw string: "\s", "\(" and "\d" are not valid str escapes.
    collection_name_only = re.sub(r'\s\(\d+\)$', '', collection_name)
    messages.set([Message(
        Icons.OK, f"Selected collection: {collection_name_only}.")])
    yield _status(messages)

    docs = Docs()

    # Generate search queries from the question by Paper QA
    try:
        question_prompt = 'A "keyword search" is a list of no more than 3 words, which separated by whitespace only and with no boolean operators (e.g. "dog canine puppy"). Avoid adding any new words not in the question unless they are synonyms to the existing words.'
        queries = [x.strip('"').lower() for x in
                   docs.generate_search_query(question + '\n' + question_prompt)]
        query_str = ", ".join(queries)
        messages.append(
            Message(Icons.WAIT, f"Searching your Zotero collection for {query_str}."))
        yield _status(messages)
    except Exception as e:
        messages.append(
            Message(Icons.ERR, f"Error occurred when generating search queries: {e}"))
        yield _status(messages)
        return

    # Search for attachments in Zotero
    try:
        collection = next(
            (x for x in collections
             if f"{x['data']['name']} ({x['meta']['numItems']})" == collection_name),
            None)
        if collection is None:
            # Surfaces through the handler below with a readable reason
            # instead of "list index out of range".
            raise LookupError(
                f"collection {collection_name_only} is not in the fetched collection list")
        searches = [zot.collection_items(
            collection['key'],
            q=q,
            limit=_MAX_ATTACHMENTS,
            itemType='attachment',
            qmode='everything'
        ) for q in queries]
        # Keyed by item key so a PDF matched by several queries counts once.
        unique_pdfs = {
            item['key']: item
            for search in searches
            for item in search
            if item['data'].get('contentType') == 'application/pdf'
        }
        attachments = list(unique_pdfs.values())[:_MAX_ATTACHMENTS]
        # A standalone attachment has no parentItem and stands for itself.
        parent_keys = {a['data'].get('parentItem', a['key'])
                       for a in attachments}
    except Exception as e:
        messages.append(
            Message(Icons.ERR, f"Error occurred when searching in Zotero: {e}"))
        yield _status(messages)
        return

    if not attachments:
        messages.append(Message(
            Icons.ERR, "No results. Make sure to index your PDF attachments in Zotero and try rephrasing your question."))
        yield _status(messages)
        return
    messages.append(Message(
        Icons.SUCCESS, f"Found {len(attachments)} PDF {'attachments' if len(attachments) > 1 else 'attachment'} from {len(parent_keys)} {'articles' if len(parent_keys) > 1 else 'article'}."))
    yield _status(messages)

    # Compile citation metadata
    citation_dict = {}
    bib_by_parent = {}  # one bibliography request per parent item
    messages.append(
        Message(Icons.WAIT, "Fetching attachment bibliography information."))
    yield _status(messages)
    for attachment in attachments:
        parent_id = attachment["data"].get("parentItem", attachment["key"])
        title = attachment['data'].get('title', attachment['key'])
        try:
            if parent_id not in bib_by_parent:
                parent = zot.item(
                    parent_id, content="bib", style=style)[0]
                bib_text = html.fragment_fromstring(
                    parent).xpath("normalize-space(//*)")
                bib_by_parent[parent_id] = f"\n{bib_text}\nOpen in Zotero\n"
            citation_dict[attachment["key"]] = bib_by_parent[parent_id]
        except Exception as e:
            messages.append(Message(
                Icons.WARN, f"Failed to retrieve bibliography for PDF attachment {title}: {e}"))
            yield _status(messages)

    # Load attachments
    available_attachments = 0
    for attachment in attachments:
        title = attachment['data'].get('title', attachment['key'])
        try:
            link_mode = attachment['data']['linkMode']
            if link_mode == 'imported_file':
                attachment_content = zot.file(attachment['key'])
            elif link_mode == 'imported_url':
                response = requests.get(
                    attachment['data']['url'], timeout=_ATTACHMENT_TIMEOUT_SECONDS)
                # Reject HTTP error pages instead of indexing them as PDFs.
                response.raise_for_status()
                attachment_content = response.content
            else:
                attachment_content = None

            if attachment_content is None:
                messages.append(Message(
                    Icons.WARN, f"Unable to access linked PDF attachment {title}: The file is not in Zotero online storage."))
            else:
                # The context manager closes (and so removes) the temp file
                # as soon as it has been handed to Paper QA.
                # NOTE(review): assumes docs.add reads the file before
                # returning - confirm against the paperqa version in use.
                with tempfile.NamedTemporaryFile(suffix=".pdf") as temp_file:
                    temp_file.write(attachment_content)
                    temp_file.flush()
                    # Raises KeyError (reported below) when the bibliography
                    # lookup for this attachment failed earlier.
                    docs.add(temp_file.name,
                             citation_dict[attachment["key"]])
                messages.append(Message(
                    Icons.INDEX, f"Loaded PDF attachment: {title}."))
                available_attachments += 1
            yield _status(messages)
        except Exception as e:
            messages.append(Message(
                Icons.WARN, f"Failed to retrieve PDF attachment {title}: {e}"))
            yield _status(messages)

    # Build vector index
    if available_attachments == 0:
        messages.append(Message(
            Icons.ERR, "No answer. Unable to access any PDF attachments from your Zotero online storage or public URLs."))
        yield _status(messages)
        return

    if docs._faiss_index is None:
        try:
            messages.append(Message(
                Icons.WAIT, f"Building vector index based on {available_attachments} available PDF {'attachment' if available_attachments==1 else 'attachments'}."))
            yield _status(messages)
            docs._build_faiss_index()
        except Exception as e:
            messages.append(Message(
                Icons.ERR, f"Unable to build vector index: {e}"))
            yield _status(messages)
            return

    # Synthesize response
    # Both alternatives already end with a period, so none is appended.
    messages.append(Message(
        Icons.WAIT, f"""Creating answer. {"This should be done within a minute." if available_attachments==1 else "This will loop through all available PDF attachments and may take a couple of minutes."}"""))
    yield _status(messages)

    try:
        start_time = time.monotonic()
        total_time = 0
        answer = None
        # Only the last object produced by query_gen is formatted below; the
        # earlier ones just drive the periodic progress message.
        for answer in docs.query_gen(question):
            end_time = time.monotonic()
            time_dif = end_time - start_time
            if time_dif > _PROGRESS_INTERVAL_SECONDS:
                start_time = end_time
                total_time += time_dif
                messages.append(Message(
                    Icons.INFO, f"Still in progress: {total_time:.1f} seconds"))
                yield _status(messages)
        if answer is None:
            raise RuntimeError("Paper QA did not produce an answer")

        # NOTE(review): in the reviewed text this literal spans a physical
        # line break; it is reproduced as an explicit newline - confirm
        # against the intended markup.
        answer_text = "\n".join(
            f"{x}\n" for x in answer.answer.split("\n"))
        # Drop everything up to the first "." of each entry; [-1] keeps an
        # entry that contains no "." intact instead of raising IndexError.
        reference_list = "" if answer.references == "" else "\n".join(
            x.split('.', 1)[-1] for x in answer.references.split('\n\n'))
        references = "" if reference_list == "" else f"\nReferences:\n{reference_list}\n"
        # NOTE(review): 0.002 per 1000 tokens is hard-coded - presumably the
        # model price in USD; verify it still applies.
        formatted_answer = (
            f"\n{answer_text}\n{references}\n"
            f"Tokens Used: {answer.tokens} Cost: ${answer.tokens/1000 * 0.002:.2f}\n"
        ).strip()
        messages.append(Message(
            Icons.OK, "Answer created."))
        yield _status(messages, gr.HTML.update(value=formatted_answer))
    except Exception as e:
        messages.append(Message(
            Icons.ERR, f"Error occurred when creating answer: {e}"))
        yield _status(messages)
        return