import openai |
import sqlite3 |
import numpy as np |
from sklearn.metrics.pairwise import cosine_similarity |
import os |
import gradio as gr |
from docx import Document |
from PyPDF2 import PdfFileReader |
import re |
from gradio import Interface, components |
# Configure the OpenAI client from the "Secret" environment variable.
# A missing variable raises KeyError here, at import time, before the UI starts.
openai.api_key = os.environ["Secret"]
def find_closest_neighbors(vector1, dictionary_of_vectors, top_k=4):
    """Rank stored embeddings by cosine similarity to a query text.

    The query is embedded with OpenAI's ``text-embedding-ada-002`` model and
    compared against every vector in ``dictionary_of_vectors``.

    Args:
        vector1: The query to embed. Despite the name, this value is sent as
            the ``input`` of the embedding request; it is not a vector.
        dictionary_of_vectors: Mapping of key -> embedding (array-like).
        top_k: Maximum number of matches to return. Defaults to 4, the
            previously hard-coded value.

    Returns:
        A list of at most ``top_k`` ``(key, similarity)`` tuples sorted from
        most to least similar. Empty when there is nothing to compare with.
    """
    if not dictionary_of_vectors or top_k <= 0:
        # Nothing to rank, so skip the embedding request entirely.
        return []

    response = openai.Embedding.create(
        input=vector1,
        engine="text-embedding-ada-002",
    )
    query = np.asarray(response['data'][0]['embedding'], dtype=float).reshape(1, -1)

    # Stack all stored vectors so similarity is computed in one call instead
    # of one scikit-learn call per key.
    keys = list(dictionary_of_vectors)
    matrix = np.vstack([
        np.asarray(dictionary_of_vectors[key], dtype=float).reshape(1, -1)
        for key in keys
    ])
    scores = cosine_similarity(query, matrix)[0]

    ranked = sorted(zip(keys, scores), key=lambda pair: pair[1], reverse=True)
    return ranked[:top_k]
def extract_words_from_docx(filename):
    """Return the words contained in the paragraphs of a .docx file.

    The text of every paragraph is joined with newlines and then split into
    runs of word characters, giving a flat list of words in document order.
    """
    paragraphs = Document(filename).paragraphs
    joined_text = '\n'.join(para.text for para in paragraphs)
    word_pattern = r'\b\w+\b'
    return re.findall(word_pattern, joined_text)
def extract_words_from_pdf(filename):
    """Return the words contained in the extracted text of a PDF file.

    Text is extracted page by page, concatenated, and split into runs of
    word characters.

    The legacy ``PdfFileReader`` / ``getNumPages`` / ``getPage`` /
    ``extractText`` API raises a deprecation error in PyPDF2 3.0 and later,
    so the modern ``PdfReader`` API is preferred when the installed PyPDF2
    provides it; the legacy calls remain as a fallback for old releases.

    Args:
        filename: Path of the PDF file to read.

    Returns:
        A list of word strings in page order.
    """
    try:
        from PyPDF2 import PdfReader
    except ImportError:
        PdfReader = None

    # Pages are read lazily from the open handle, so all text must be
    # extracted before the file is closed.
    with open(filename, "rb") as file:
        if PdfReader is not None:
            page_texts = [page.extract_text() or "" for page in PdfReader(file).pages]
        else:
            pdf = PdfFileReader(file)
            page_texts = [
                pdf.getPage(page_num).extractText()
                for page_num in range(pdf.getNumPages())
            ]

    text = "".join(page_texts)
    return re.findall(r'\b\w+\b', text)
def process_file(file_obj):
    """Chunk an uploaded .docx/.pdf file, embed each chunk, and store it.

    The file's words are grouped into chunks of 200 words. Each chunk is
    embedded with OpenAI's ``text-embedding-ada-002`` model and inserted into
    the ``chunks`` table of ``text_chunks_with_embeddings (1).db`` together
    with its embedding, serialised as space-separated numbers.

    Args:
        file_obj: Uploaded file object exposing the file path as ``.name``,
            or ``None`` when nothing was uploaded.

    Returns:
        A human-readable status message.
    """
    if file_obj is None:
        return "No file uploaded."

    # Compare case-insensitively so names such as "REPORT.PDF" are accepted.
    lowered_name = file_obj.name.lower()
    if lowered_name.endswith('.docx'):
        words = extract_words_from_docx(file_obj.name)
    elif lowered_name.endswith('.pdf'):
        words = extract_words_from_pdf(file_obj.name)
    else:
        return "Unsupported file type."

    chunks = [" ".join(words[i:i + 200]) for i in range(0, len(words), 200)]

    conn = sqlite3.connect('text_chunks_with_embeddings (1).db')
    try:
        # `with conn` commits once if every insert succeeds and rolls back if
        # an embedding request or insert fails, so a file is never left
        # half-ingested.
        with conn:
            for chunk in chunks:
                embedding = openai.Embedding.create(
                    input=chunk,
                    engine="text-embedding-ada-002",
                )['data'][0]['embedding']
                embedding_str = " ".join(map(str, embedding))
                conn.execute(
                    "INSERT INTO chunks (text, embedding) VALUES (?, ?)",
                    (chunk, embedding_str),
                )
    finally:
        # Always release the connection, even when an API call raised.
        conn.close()

    return "File processed and added to database."
def predict(message, history, file_obj=None):
    """Stream a GPT-4 answer to ``message`` grounded in stored text chunks.

    Steps:
      1. If a file was uploaded, ingest it via ``process_file``.
      2. Load every ``(text, embedding)`` row from the ``chunks`` table.
      3. Pick the stored chunks closest to ``message`` and use up to 1500
         characters of them as context in the prompt.
      4. Send the prior conversation plus the prompt to ``gpt-4`` and stream
         the reply.

    Args:
        message: The user's question.
        history: Iterable of ``(user_text, assistant_text)`` pairs from
            earlier turns; ``None`` is treated as no history.
        file_obj: Optional uploaded file to ingest before answering.

    Yields:
        The answer accumulated so far, once per streamed piece of content.
    """
    if file_obj:
        process_file(file_obj)

    conn = sqlite3.connect('text_chunks_with_embeddings (1).db')
    try:
        rows = conn.execute("SELECT text, embedding FROM chunks").fetchall()
    finally:
        # Release the connection even if the query fails.
        conn.close()

    # Embeddings are stored as space-separated numbers (see process_file).
    dictionary_of_vectors = {
        text: np.fromstring(embedding_str, sep=' ')
        for text, embedding_str in rows
    }

    match_list = find_closest_neighbors(message, dictionary_of_vectors)

    # Separate chunks with a newline so the last word of one chunk does not
    # fuse with the first word of the next, then cap the context length.
    context = "\n".join(str(match[0]) for match in match_list)[:1500]
    prep = f"This is an OpenAI model designed to answer questions specific to grant-making applications for an aquarium. Here is some question-specific context: {context}. Q: {message} A: "

    history_openai_format = []
    for human, assistant in history or []:
        history_openai_format.append({"role": "user", "content": human})
        history_openai_format.append({"role": "assistant", "content": assistant})
    history_openai_format.append({"role": "user", "content": prep})

    response = openai.ChatCompletion.create(
        model='gpt-4',
        messages=history_openai_format,
        temperature=1.0,
        stream=True,
    )

    partial_message = ""
    for chunk in response:
        # Streamed deltas may carry only a role (or be empty on the final
        # chunk); indexing 'content' directly raised KeyError on those.
        content = chunk['choices'][0]['delta'].get('content')
        if content:
            partial_message += content
            yield partial_message
# Build and start the Gradio UI: a text box for the question, a list input
# for the conversation history, and a file upload for PDF/DOCX ingestion.
#
# `predict` is a generator (it yields partial answers). Gradio 3.x only runs
# generator functions when the queue is enabled, so `.queue()` is called
# before launching; on releases where the queue is already on by default the
# call is harmless.
#
# NOTE(review): live=True makes Gradio re-run `predict` whenever an input
# changes, which means an embedding request, a GPT-4 call and (if a file is
# attached) a re-ingestion of that file on every change. Confirm this is
# intended; a submit-button flow (live=False) would avoid it.
Interface(
    fn=predict,
    inputs=["text", "list", components.File(label="Upload PDF or DOCX file")],
    outputs="textbox",
    live=True,
).queue().launch()