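# Multi URL and Doc Chatbot Q&A: a Gradio app that builds a Chroma vector
# store from uploaded files or downloaded URLs and answers questions over it
# with a LangChain ConversationalRetrievalChain backed by OpenAI models.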
import mimetypes
import os
import tempfile

import gradio as gr
import openai
import pandas as pd
import requests
import validators
from langchain.chains import ConversationalRetrievalChain
from langchain.document_loaders import UnstructuredFileLoader
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.llms import OpenAI
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import Chroma
class Chatbot:
    def __init__(self):
        # Read the OpenAI API key from the environment
        openai.api_key = os.getenv("OPENAI_API_KEY")

    def get_empty_state(self):
        """Create an empty knowledge base state."""
        return {"knowledge_base": None}
    def create_knowledge_base(self, docs):
        """Create a knowledge base from the given documents.

        Args:
            docs (List[Document]): List of loaded documents.

        Returns:
            Chroma: Vector store built from the documents.
        """
        # Split the documents into chunks of at most 1000 characters,
        # with a 200-character overlap between consecutive chunks
        text_splitter = CharacterTextSplitter(
            separator="\n", chunk_size=1000, chunk_overlap=200, length_function=len
        )
        chunks = text_splitter.split_documents(docs)
        # Compute OpenAI embeddings for the chunks
        embeddings = OpenAIEmbeddings()
        # Build a Chroma vector store from the chunks and their embeddings
        knowledge_base = Chroma.from_documents(chunks, embeddings)
        return knowledge_base
    def upload_file(self, file_paths):
        """Upload files and create a knowledge base from their contents.

        Args:
            file_paths: The files to upload.

        Returns:
            tuple: The file paths and the new state holding the knowledge base.
        """
        file_paths = [file.name for file in file_paths]
        loaders = [UnstructuredFileLoader(path, strategy="fast") for path in file_paths]
        # Load the contents of each file
        docs = []
        for loader in loaders:
            docs.extend(loader.load())
        # Build the knowledge base from the loaded documents
        knowledge_base = self.create_knowledge_base(docs)
        return file_paths, {"knowledge_base": knowledge_base}
    def add_text(self, history, text):
        """Append the user's message to the chat history and lock the textbox."""
        # Use a [user, assistant] list, not a tuple, so answer_question can
        # fill in the assistant slot later
        history = history + [[text, None]]
        return history, gr.update(value="", interactive=False)
    def upload_multiple_urls(self, urls):
        """Download the given comma-separated URLs and build a knowledge base."""
        url_list = [url.strip() for url in urls.split(",")]
        file_paths = []
        for url in url_list:
            if validators.url(url):
                headers = {
                    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
                }
                r = requests.get(url, headers=headers)
                if r.status_code != 200:
                    raise ValueError(
                        "Check the url of your file; returned status code %s" % r.status_code
                    )
                # Infer a file extension from the Content-Type header and
                # save the response body to a temporary file
                content_type = r.headers.get("content-type")
                file_extension = mimetypes.guess_extension(content_type)
                temp_file = tempfile.NamedTemporaryFile(suffix=file_extension, delete=False)
                temp_file.write(r.content)
                temp_file.close()
                file_paths.append(temp_file.name)
        loaders = [UnstructuredFileLoader(path, strategy="fast") for path in file_paths]
        # Load the contents of each downloaded file
        docs = []
        for loader in loaders:
            docs.extend(loader.load())
        # Build the knowledge base from the loaded documents
        knowledge_base = self.create_knowledge_base(docs)
        return file_paths, {"knowledge_base": knowledge_base}
    def answer_question(self, question, history, state):
        """Answer the latest question in the chat history using the knowledge base.

        Args:
            question (str): The raw textbox value (unused; the question is
                taken from the last history entry instead).
            history (list): Chat history as [user, assistant] pairs.
            state (dict): The current state containing the knowledge base.

        Returns:
            list: The updated chat history with the answer filled in.
        """
        # Retrieve the knowledge base from the state dictionary
        knowledge_base = state["knowledge_base"]
        retriever = knowledge_base.as_retriever()
        qa = ConversationalRetrievalChain.from_llm(
            llm=OpenAI(temperature=0.5),
            retriever=retriever,
            return_source_documents=False,
        )
        # The last history entry holds the pending question; the earlier
        # entries form the chat history passed to the chain
        question = history[-1][0]
        chat_history = [(human, ai) for human, ai in history[:-1]]
        result = qa({"question": question, "chat_history": chat_history})
        # Fill the answer into the pending history entry
        history[-1][1] = result["answer"]
        return history
    def extract_excel_data(self, file_path):
        """Read an Excel file and flatten its cells into a single list."""
        df = pd.read_excel(file_path)
        data_list = []
        for _, row in df.iterrows():
            data_list.extend(row.tolist())
        return data_list
    def comparing_chemicals(self, excel_file_path, chemicals):
        """List the chemicals common to the given text and the Excel capability sheet.

        Note: this helper is not wired into the Gradio interface below.
        """
        chemistry_capability = self.extract_excel_data(excel_file_path.name)
        response = openai.Completion.create(
            engine="text-davinci-003",
            prompt=f"""Analyse the following text delimited by triple backticks to return the common chemicals.
            text : ```{chemicals} {chemistry_capability}```.
            result should be in bullet points format.
            """,
            max_tokens=100,
            n=1,
            stop=None,
            temperature=0,
            top_p=1.0,
            frequency_penalty=0.0,
            presence_penalty=0.0,
        )
        result = response.choices[0].text.strip()
        return result
    def clear_function(self, state):
        """Reset the session state to an empty knowledge base."""
        # Keep the "knowledge_base" key so answer_question does not KeyError
        state.update(self.get_empty_state())
    def gradio_interface(self):
        """Create the Gradio interface for the chatbot."""
        with gr.Blocks(css="style.css", theme=gr.themes.Soft()) as demo:
            state = gr.State(self.get_empty_state())
            with gr.Column(elem_id="col-container"):
                gr.HTML("""<hr style="border-top: 5px solid white;">""")
                gr.HTML(
                    """<br>
                    <h1 style="text-align:center;">
                        Multi URL and Doc Chatbot Q&A
                    </h1>"""
                )
                gr.HTML("""<hr style="border-top: 5px solid white;">""")
                gr.Markdown("**Upload your file**")
                with gr.Row(elem_id="row-flex"):
                    with gr.Column(scale=1, min_width=0):
                        file_url = gr.Textbox(label="file url :", show_label=True, placeholder="")
                with gr.Row(elem_id="row-flex"):
                    with gr.Accordion("Upload Files", open=False):
                        with gr.Column(scale=0.90, min_width=160):
                            file_output = gr.File(elem_classes="heightfit")
                        with gr.Column(scale=0.10, min_width=160):
                            upload_button = gr.UploadButton(
                                "Browse File",
                                file_types=[".txt", ".pdf", ".doc", ".docx"],
                                file_count="multiple",
                            )
                with gr.Row():
                    chatbot = gr.Chatbot([], elem_id="chatbot")
                with gr.Row():
                    txt = gr.Textbox(
                        label="Question",
                        show_label=True,
                        placeholder="Enter text and press enter, or upload a file",
                    )
                with gr.Row():
                    clear_btn = gr.Button(value="Clear")

            # Submitting a question appends it to the chat, answers it,
            # then re-enables the textbox
            txt_msg = txt.submit(
                self.add_text, [chatbot, txt], [chatbot, txt], queue=False
            ).then(self.answer_question, [txt, chatbot, state], chatbot)
            txt_msg.then(lambda: gr.update(interactive=True), None, [txt], queue=False)
            # Submitting URLs or uploading files rebuilds the knowledge base
            file_url.submit(self.upload_multiple_urls, file_url, [file_output, state])
            clear_btn.click(self.clear_function, [state], [])
            clear_btn.click(lambda: None, None, chatbot, queue=False)
            upload_button.upload(self.upload_file, upload_button, [file_output, state])

        demo.queue().launch(debug=True)
if __name__ == "__main__":
    chatbot = Chatbot()
    chatbot.gradio_interface()
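# A minimal sketch of running the app, assuming the script is saved as app.py
# and the pinned versions below match the legacy langchain/openai APIs this
# script was written against (package list is approximate, based on the
# imports above):
#
#   pip install "langchain<0.1" "openai<1" gradio chromadb unstructured \
#       validators requests pandas openpyxl tiktoken
#   export OPENAI_API_KEY="sk-..."   # hypothetical key
#   python app.py
#
# The retrieval pipeline can also be exercised without the UI, e.g.:
#
#   bot = Chatbot()
#   paths, state = bot.upload_multiple_urls("https://example.com/doc.pdf")
#   history = bot.add_text([], "What is this document about?")[0]
#   print(bot.answer_question("", history, state)[-1][1])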