"""Streamlit app: index news article URLs into FAISS and answer questions about them."""

import os
import pickle
import time

import streamlit as st
from dotenv import load_dotenv
from langchain import OpenAI
from langchain.chains import LLMChain
from langchain.chains import RetrievalQAWithSourcesChain
from langchain.document_loaders import UnstructuredURLLoader
# from langchain.embeddings import OpenAIEmbeddings
from langchain.embeddings import FakeEmbeddings
from langchain.llms import HuggingFaceHub
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS

# Read secrets (notably HUGGINGFACEHUB_API_TOKEN) from the environment or a
# local .env file. The token must never be hard-coded in source control; any
# token that was previously committed here should be revoked and re-issued.
load_dotenv()

st.title("RockyBot: News Research Tool 📈")
st.sidebar.title("News Article URLs")

if not os.environ.get("HUGGINGFACEHUB_API_TOKEN"):
    st.error(
        "HUGGINGFACEHUB_API_TOKEN is not set. "
        "Add it to your environment or to a .env file."
    )
    st.stop()

# One sidebar text box per URL; blank boxes come back as empty strings.
urls = [st.sidebar.text_input(f"URL {i+1}") for i in range(3)]

process_url_clicked = st.sidebar.button("Process URLs")
file_path = "faiss_store_openai.pkl"

main_placeholder = st.empty()
llm = HuggingFaceHub(
    repo_id="google/flan-t5-xxl",
    model_kwargs={"temperature": 0.5, "max_length": 64},
)


# NOTE: intentionally not wrapped in a Streamlit cache decorator. The function
# exists only for its side effect (writing the index file) and returns None,
# so a cache hit would silently skip rebuilding the index.
def process_urls(urls):
    """Build a FAISS index from the given URLs and pickle it to ``file_path``.

    Args:
        urls: Iterable of article URLs to download and index.

    Raises:
        ValueError: If no documents could be produced from ``urls``.
    """
    # Load the raw article content.
    loader = UnstructuredURLLoader(urls=urls)

    # Split into chunks, preferring paragraph, then line, sentence and
    # clause boundaries.
    text_splitter = RecursiveCharacterTextSplitter(
        separators=['\n\n', '\n', '.', ','],
        chunk_size=1000,
    )
    docs = text_splitter.split_documents(loader.load())
    if not docs:
        raise ValueError("No content could be loaded from the given URLs.")

    # NOTE(review): FakeEmbeddings appears to be a placeholder embedding
    # model, so retrieval is presumably not semantically meaningful --
    # confirm and swap in a real embedding model for production use.
    embeddings = FakeEmbeddings(size=1352)
    vectorstore = FAISS.from_documents(docs, embeddings)

    # Persist the index so later reruns of the script can answer questions.
    with open(file_path, "wb") as f:
        pickle.dump(vectorstore, f)


if process_url_clicked:
    # Ignore sidebar fields that were left blank.
    valid_urls = [u.strip() for u in urls if u and u.strip()]
    if not valid_urls:
        st.sidebar.warning("Please enter at least one URL.")
    else:
        # st.progress accepts an int in 0-100 or a float in 0.0-1.0; update
        # the same bar rather than creating a second one.
        progress_bar = st.progress(0)
        try:
            process_urls(valid_urls)
        except ValueError as e:
            st.error(e)
        else:
            progress_bar.progress(100)

query = main_placeholder.text_input("Question: ")
if query:
    if not os.path.exists(file_path):
        st.error("No index found. Enter article URLs and click 'Process URLs' first.")
    else:
        try:
            with open(file_path, "rb") as f:
                # SECURITY: pickle.load executes arbitrary code from the file.
                # Only load index files this app wrote itself.
                vectorstore = pickle.load(f)
            chain = RetrievalQAWithSourcesChain.from_llm(
                llm=llm,
                retriever=vectorstore.as_retriever(),
            )
            # The code below reads the "answer" and "sources" keys of the result.
            result = chain({"question": query}, return_only_outputs=True)

            st.header("Answer")
            st.write(result["answer"])

            # Display sources, if available (handled as newline-separated text).
            sources = result.get("sources", "")
            if sources:
                st.subheader("Sources:")
                for source in sources.split("\n"):
                    if source.strip():
                        st.write(source)
        except Exception as e:
            # Top-level UI boundary: surface the failure to the user.
            st.error(e)