# Gradio chat app that answers aquarium grant-application questions using
# OpenAI embeddings over text chunks stored in a local SQLite database.
import openai
import sqlite3
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import os
import gradio as gr
from docx import Document
from PyPDF2 import PdfReader
import re

# Uses the pre-1.0 openai SDK; the API key is read from the "Secret" environment variable.
openai.api_key = os.environ["Secret"]


def find_closest_neighbors(query_text, dictionary_of_vectors):
    """Embed the query text and return the four most similar stored chunks."""
    vector = openai.Embedding.create(
        input=query_text,
        engine="text-embedding-ada-002"
    )['data'][0]['embedding']
    vector = np.array(vector)

    # Cosine similarity between the query embedding and each stored chunk embedding.
    cosine_similarities = {}
    for key, value in dictionary_of_vectors.items():
        cosine_similarities[key] = cosine_similarity(vector.reshape(1, -1), value.reshape(1, -1))[0][0]

    sorted_cosine_similarities = sorted(cosine_similarities.items(), key=lambda x: x[1], reverse=True)
    return sorted_cosine_similarities[0:4]


def extract_words_from_docx(filename):
    doc = Document(filename)
    full_text = []
    for paragraph in doc.paragraphs:
        full_text.append(paragraph.text)
    text = '\n'.join(full_text)
    return re.findall(r'\b\w+\b', text)


def extract_words_from_pdf(filename):
    # Uses the current PyPDF2/pypdf API (PdfReader, .pages, .extract_text()).
    with open(filename, "rb") as file:
        pdf = PdfReader(file)
        text = ""
        for page in pdf.pages:
            text += page.extract_text() or ""
    return re.findall(r'\b\w+\b', text)


def process_file(file_obj):
    """Extract text from an uploaded .docx or .pdf, embed it in 200-word chunks,
    and store each chunk with its embedding in the SQLite database."""
    if file_obj is not None:
        # Gradio may hand over either a filepath string or a tempfile-like object.
        filename = file_obj if isinstance(file_obj, str) else file_obj.name
        if filename.endswith('.docx'):
            words = extract_words_from_docx(filename)
        elif filename.endswith('.pdf'):
            words = extract_words_from_pdf(filename)
        else:
            return "Unsupported file type."
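
        # NOTE (assumption): the database file is expected to already contain a
        # "chunks" table with (text, embedding) columns, e.g. created elsewhere
        # with something like: CREATE TABLE IF NOT EXISTS chunks (text TEXT, embedding TEXT)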
        conn = sqlite3.connect('text_chunks_with_embeddings (1).db')
        cursor = conn.cursor()

        # Split the document into 200-word chunks, embed each chunk, and store
        # the chunk text alongside its embedding (as a space-separated string).
        chunks = [" ".join(words[i:i+200]) for i in range(0, len(words), 200)]
        for chunk in chunks:
            embedding = openai.Embedding.create(input=chunk, engine="text-embedding-ada-002")['data'][0]['embedding']
            embedding_str = " ".join(map(str, embedding))
            cursor.execute("INSERT INTO chunks (text, embedding) VALUES (?, ?)", (chunk, embedding_str))

        conn.commit()
        conn.close()
        return "File processed and added to database."

    return "No file uploaded."


def predict(message, history, file_obj=None):
    # If a file was uploaded alongside the message, ingest it into the database first.
    if file_obj:
        process_file(file_obj)
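
    # Load every stored chunk and its embedding back into memory for the similarity search.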
    conn = sqlite3.connect('text_chunks_with_embeddings (1).db')
    cursor = conn.cursor()
    cursor.execute("SELECT text, embedding FROM chunks")
    rows = cursor.fetchall()

    dictionary_of_vectors = {}
    for row in rows:
        text = row[0]
        embedding_str = row[1]
        embedding = np.fromstring(embedding_str, sep=' ')
        dictionary_of_vectors[text] = embedding
    conn.close()

    # Retrieve the closest chunks and cap the combined context at 1,500 characters.
    match_list = find_closest_neighbors(message, dictionary_of_vectors)
    context = ''
    for match in match_list:
        context += str(match[0])
    context = context[:1500]

    prep = f"This is an OpenAI model designed to answer questions specific to grant-making applications for an aquarium. Here is some question-specific context: {context}. Q: {message} A: "
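
    # Convert Gradio's (user, assistant) history pairs into the OpenAI chat message format.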
    history_openai_format = []
    for human, assistant in history:
        history_openai_format.append({"role": "user", "content": human})
        history_openai_format.append({"role": "assistant", "content": assistant})
    history_openai_format.append({"role": "user", "content": prep})

    response = openai.ChatCompletion.create(
        model='gpt-4',
        messages=history_openai_format,
        temperature=1.0,
        stream=True
    )
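
    # Stream the reply incrementally so the Gradio UI can update as tokens arrive.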
    partial_message = ""
    for chunk in response:
        if 'content' in chunk['choices'][0]['delta']:
            partial_message += chunk['choices'][0]['delta']['content']
            yield partial_message
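

# Chat UI. Assumes a Gradio version that provides gr.ChatInterface; the file upload
# is passed to predict() as an additional input after (message, history).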
gr.ChatInterface(
    fn=predict,
    additional_inputs=[gr.File(label="Upload PDF or DOCX file")]
).launch()