import base64
import datetime
import os
import pickle
import timeit
from pathlib import Path

import streamlit as st
from dotenv import load_dotenv
from PIL import Image
from pypdf import PdfReader
# import replicate
from langchain.agents.agent_toolkits import (
    create_conversational_retrieval_agent,
    create_retriever_tool,
)
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chains import ConversationalRetrievalChain
from langchain.chat_models import ChatOpenAI
from langchain.document_loaders import (
    DirectoryLoader,
    PyPDFDirectoryLoader,
    PyPDFLoader,
)
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.llms import LlamaCpp
from langchain.memory import ConversationBufferMemory
from langchain.prompts.prompt import PromptTemplate
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.utilities import SerpAPIWrapper
from langchain.vectorstores import FAISS, Chroma

from utils import build_embedding_model, build_llm
from utils import load_retriver, load_vectorstore, load_conversational_retrievel_chain

# Load environment variables from a local .env file, if one is present.
load_dotenv()

# Current timestamp, formatted for labelling historical conversations.
current_timestamp = datetime.datetime.now()
timestamp_string = current_timestamp.strftime("%Y-%m-%d %H:%M:%S")

# Directory paths
persist_directory = "vector_db_gsa"
all_docs_pkl_directory = 'Database/text_chunks_html_pdf.pkl'

# Initialize Streamlit session state to cache expensive objects (such as model
# initialization) and thereby avoid re-running already-initialized work on
# every script rerun.
# App title. Streamlit requires set_page_config to be the first Streamlit
# command of the script, so it runs before the builders below (which may or
# may not issue Streamlit calls themselves).
st.set_page_config(
    page_title="OMP Search Bot",
    layout="wide",
    initial_sidebar_state="expanded",
)

# Build each heavy object only once per session; later reruns reuse the value
# already stored in st.session_state.
if "llm" not in st.session_state:
    st.session_state["llm"] = build_llm()
if "embeddings" not in st.session_state:
    st.session_state["embeddings"] = build_embedding_model()
if "vector_db" not in st.session_state:
    st.session_state["vector_db"] = load_vectorstore(
        persist_directory=persist_directory,
        embeddings=st.session_state["embeddings"],
    )
if "load_retriver" not in st.session_state:
    st.session_state["load_retriver"] = load_retriver(
        chroma_vectorstore=st.session_state["vector_db"]
    )
if "conversation_chain" not in st.session_state:
    st.session_state["conversation_chain"] = load_conversational_retrievel_chain(
        retriever=st.session_state["load_retriver"],
        llm=st.session_state["llm"],
    )

st.markdown(""" """, unsafe_allow_html=True)

# To get header in the App
col1, col2 = st.columns(2)
title1 = """

GSA Procurement Services Assistant

"""

# Read the logo through a context manager so the handle is closed even if
# reading fails.
# NOTE(review): data_url is not referenced by the header markup below as it
# appears here; presumably an <img> tag embedding it was lost -- confirm.
with open("logo.png", "rb") as logo_file:
    data_url = base64.b64encode(logo_file.read()).decode("utf-8")

st.markdown(
    f"""
OPM Logo
{title1} """,
    unsafe_allow_html=True
)
st.write("")
st.write(
    """

The Procurement Services Digital AI Assistant is a quantum leap in GSA’s strategic goal of delivering better services to the public using modern technology. This AI enabled assistant makes it easy for citizens to get the information they need from the government by answering questions and providing assistance 24/7. It's designed to be user-friendly, making government services more accessible and reliable for all citizens. Just ask away.

""",
    unsafe_allow_html=True,
)
st.markdown("""---""")

text_html = """

Type your question in conversational style

Example: what is Electronic Protest Docketing System?

"""
st.write(text_html, unsafe_allow_html=True)

with st.sidebar:
    st.subheader("")


def _initial_messages():
    """Return a fresh transcript containing only the assistant greeting."""
    return [{"role": "assistant", "content": "How may I assist you today?", "Source": ""}]


def _render_sources(sources):
    """Render source entries inside a collapsible "source" expander.

    Args:
        sources: Iterable of dicts with the keys "Page", "Source" and
            "page_content", as stored under a message's "Source" key.
    """
    with st.expander("source"):
        for entry in sources:
            st.markdown(entry["Page"])
            st.markdown(entry["Source"])
            st.markdown(entry["page_content"])
            st.write("---")  # Separator between entries


# Store LLM generated responses. Initialised unconditionally because the
# prompt handling further down always reads st.session_state.messages.
if "messages" not in st.session_state.keys():
    st.session_state.messages = _initial_messages()

if st.session_state["vector_db"] and st.session_state["llm"]:
    # Display the chat messages accumulated so far
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])
            # An empty string marks a message with no attached sources.
            if message.get("Source", "") == "":
                st.write("")
            else:
                _render_sources(message["Source"])

# Initialize the session state to store chat history
if "stored_session" not in st.session_state:
    st.session_state["stored_session"] = []

# Create a list to store expanders
if "expanders" not in st.session_state:
    st.session_state["expanders"] = []


def add_chat_expander(chat_history):
    """Archive a finished conversation, stamped with the current local time.

    Args:
        chat_history: List of transcript strings to keep for the sidebar.
    """
    ended_at = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    st.session_state["expanders"].append(
        {"timestamp": ended_at, "chat_history": chat_history}
    )


def clear_chat_history():
    """Archive the current conversation and start a new one.

    Each message is flattened to a "User: ..." / "Assistant: ..." string,
    appended to st.session_state["stored_session"], and the whole transcript
    is stored as a new sidebar expander. The live message list is then reset
    to the greeting.
    """
    stored_session = []
    for dict_message in st.session_state.messages:
        speaker = "User: " if dict_message["role"] == "user" else "Assistant: "
        string_dialogue = speaker + dict_message["content"] + "\n\n"
        st.session_state["stored_session"].append(string_dialogue)
        stored_session.append(string_dialogue)
    # Add a new chat expander
    add_chat_expander(stored_session)
    st.session_state.messages = _initial_messages()


st.sidebar.button('New chat', on_click=clear_chat_history, use_container_width=True)
st.sidebar.text("")
st.sidebar.write(
    """

Chat history

""",
    unsafe_allow_html=True,
)

# Display existing chat expanders
for expander_info in st.session_state["expanders"]:
    with st.sidebar.expander("Conversation ended at:" + "\n\n" + expander_info["timestamp"]):
        for message in expander_info["chat_history"]:
            # NOTE(review): archived text includes what the user typed and is
            # written with unsafe_allow_html=True; confirm it is escaped or
            # otherwise trusted before it reaches this point.
            if message.startswith(("User:", "Assistant:")):
                st.write(f'{message}', unsafe_allow_html=True)
            else:
                st.write(message)


def generate_llm_response(conversation_chain, prompt_input):
    """Run the conversation chain on a prompt and return its answer.

    Args:
        conversation_chain: Callable chain; it is invoked with the prompt and
            must return a mapping that has a 'result' key.
        prompt_input: The user's question.

    Returns:
        The value stored under 'result' in the chain output.
    """
    res = conversation_chain(prompt_input)
    return res['result']


# User-provided prompt
if prompt := st.chat_input(disabled=not st.session_state["vector_db"]):
    st.session_state.messages.append({"role": "user", "content": prompt, "Source": ""})
    with st.chat_message("user"):
        st.write(prompt)

# Generate a new response if last message is not from assistant. The extra
# `prompt` check stops a rerun without new input (prompt is None) from sending
# None to the chain when the transcript still ends with a user message.
if prompt and st.session_state.messages[-1]["role"] != "assistant":
    with st.chat_message("assistant"):
        with st.spinner("Searching..."):
            start = timeit.default_timer()
            response = generate_llm_response(
                conversation_chain=st.session_state["conversation_chain"],
                prompt_input=prompt,
            )
            placeholder = st.empty()
            # Assemble the answer once and render it once, instead of
            # re-rendering the growing text for every piece of the response.
            full_response = "".join(response)
            if full_response:
                placeholder.markdown(full_response)

            # When an answer came back, query the retriever with the same
            # prompt and show the matching documents as sources.
            if response:
                st.text("-------------------------------------")
                docs = st.session_state["load_retriver"].get_relevant_documents(prompt)
                source_doc_list = [doc.dict() for doc in docs]
                merged_source_doc = [
                    {
                        "Page": f"Source {idx + 1}",
                        "Source": f"**Source:** {item['metadata']['source'].split('/')[-1]}",
                        "page_content": item["page_content"],
                    }
                    for idx, item in enumerate(source_doc_list)
                ]
                _render_sources(merged_source_doc)
                message = {"role": "assistant", "content": full_response, "Source": merged_source_doc}
                st.session_state.messages.append(message)
                st.markdown("👍 👎 Create Ticket")

            end = timeit.default_timer()
            print(f"Time to retrieve response: {end - start}")