import os
import shutil

import requests
import streamlit as st
from bs4 import BeautifulSoup
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_community.llms import Together
from langchain_community.vectorstores import FAISS
from langchain_community.document_loaders import (
    TextLoader,
    UnstructuredExcelLoader,
    UnstructuredPDFLoader,
    UnstructuredWordDocumentLoader,
)
from langchain.text_splitter import CharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings

# The Together client reads the API key from the environment; fail fast if it is missing.
if not os.getenv("TOGETHER_API_KEY"):
    raise EnvironmentError("TOGETHER_API_KEY environment variable is not set.")


def inference(chain, input_query):
    """Invoke the processing chain with the input query."""
    return chain.invoke(input_query)


def create_chain(retriever, prompt, model):
    """Compose the processing chain from retriever, prompt, model, and output parser."""
    chain = (
        {"context": retriever, "question": RunnablePassthrough()}
        | prompt
        | model
        | StrOutputParser()
    )
    return chain


def generate_prompt():
    """Define the prompt template for question answering."""
    template = """[INST] Answer the question in a simple sentence based only on the following context:
{context}
Question: {question} [/INST]
"""
    return ChatPromptTemplate.from_template(template)


def configure_model():
    """Configure the language model with the specified parameters."""
    return Together(
        model="mistralai/Mixtral-8x7B-Instruct-v0.1",
        temperature=0.1,
        max_tokens=3000,
        top_k=50,
        top_p=0.7,
        repetition_penalty=1.1,
    )


def configure_retriever(documents):
    """Configure the retriever with embeddings and a FAISS vector store."""
    embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    vector_db = FAISS.from_documents(documents, embeddings)
    return vector_db.as_retriever()


def load_pdf_documents(path):
    """Load and preprocess PDF documents from the specified path."""
    documents = []
    for file in os.listdir(path):
        if file.endswith('.pdf'):
            loader = UnstructuredPDFLoader(os.path.join(path, file))
            documents.extend(loader.load())
    return documents


def load_word_documents(path):
    """Load and preprocess Word documents from the specified path."""
    documents = []
    for file in os.listdir(path):
        if file.endswith('.docx'):
            loader = UnstructuredWordDocumentLoader(os.path.join(path, file))
            documents.extend(loader.load())
    return documents


def load_excel_documents(path):
    """Load and preprocess Excel documents from the specified path."""
    documents = []
    for file in os.listdir(path):
        if file.endswith('.xlsx'):
            loader = UnstructuredExcelLoader(os.path.join(path, file))
            documents.extend(loader.load())
    return documents


def load_text_documents(path):
    """Load plain-text documents (e.g. scraped URL content) from the specified path."""
    documents = []
    for file in os.listdir(path):
        if file.endswith('.txt'):
            loader = TextLoader(os.path.join(path, file))
            documents.extend(loader.load())
    return documents


def load_documents(path):
    """Load and preprocess documents from PDF, Word, Excel, and text files."""
    return (
        load_pdf_documents(path)
        + load_word_documents(path)
        + load_excel_documents(path)
        + load_text_documents(path)
    )


def scrape_url(url, save_dir):
    """Scrape the text content of a URL and save it as a text file in save_dir."""
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()  # Ensure we notice bad responses
        soup = BeautifulSoup(response.content, 'html.parser')
        text = soup.get_text()
        # Save the scraped text alongside the uploaded documents so that
        # load_documents() picks it up as a .txt file.
        text_file_path = os.path.join(save_dir, "scraped_content.txt")
        with open(text_file_path, "w", encoding="utf-8") as file:
            file.write(text)
        return text_file_path
    except requests.RequestException as e:
st.error(f"Error fetching the URL: {e}") return None def process_document(path, input_query): """Process the document by setting up the chain and invoking it with the input query.""" documents = load_documents(path) if not documents: st.error("No documents found. Please check the uploaded files or scraped content.") return "No documents found." text_splitter = CharacterTextSplitter(chunk_size=18000, chunk_overlap=10) split_docs = text_splitter.split_documents(documents) if not split_docs: st.error("No text could be extracted from the documents.") return "No text could be extracted." llm_model = configure_model() prompt = generate_prompt() retriever = configure_retriever(split_docs) chain = create_chain(retriever, prompt, llm_model) response = inference(chain, input_query) return response def main(): """Main function to run the Streamlit app.""" tmp_folder = '/tmp/1' os.makedirs(tmp_folder, exist_ok=True) st.title("Q&A Document AI RAG Chatbot") uploaded_files = st.sidebar.file_uploader("Choose PDF, Word, or Excel files", accept_multiple_files=True, type=['pdf', 'docx', 'xlsx']) if uploaded_files: for file in uploaded_files: with open(os.path.join(tmp_folder, file.name), 'wb') as f: f.write(file.getbuffer()) st.success('Files successfully uploaded. Start prompting!') if 'chat_history' not in st.session_state: st.session_state.chat_history = [] if uploaded_files: with st.form(key='question_form'): user_query = st.text_input("Ask a question:", key="query_input") if st.form_submit_button("Ask") and user_query: response = process_document(tmp_folder, user_query) if response: # Check if response is not empty st.session_state.chat_history.append({"question": user_query, "answer": response}) if st.button("Clear Chat History"): st.session_state.chat_history = [] for chat in st.session_state.chat_history: st.markdown(f"**Q:** {chat['question']}") st.markdown(f"**A:** {chat['answer']}") st.markdown("---") else: st.success('Upload Documents to Start Processing!') url_input = st.sidebar.text_input("Or enter a URL to scrape content from:") if st.sidebar.button("Scrape URL"): if url_input: file_path = scrape_url(url_input) if file_path: documents = load_documents(tmp_folder) if documents: # Check if documents are loaded after scraping response = process_document(tmp_folder, "What is the content of the URL?") if response: # Check if response is not empty st.session_state.chat_history.append({"question": "What is the content of the URL?", "answer": response}) st.success("URL content processed successfully!") else: st.error("Failed to load any documents from the scraped URL content.") else: st.error("Failed to process URL content.") else: st.warning("Please enter a valid URL.") if st.sidebar.button("REMOVE UPLOADED FILES"): document_count = os.listdir(tmp_folder) if len(document_count) > 0: shutil.rmtree(tmp_folder) st.sidebar.write("FILES DELETED SUCCESSFULLY!") else: st.sidebar.write("NO DOCUMENT FOUND TO DELETE! PLEASE UPLOAD DOCUMENTS TO START PROCESS!") if __name__ == "__main__": main()