# jeevan
# locally working
# ef80283
import os
import uuid
from dotenv import load_dotenv
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyMuPDFLoader
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams
from langchain.storage import LocalFileStore
from langchain_qdrant import QdrantVectorStore
from langchain.embeddings import CacheBackedEmbeddings
from chainlit.types import AskFileResponse
from operator import itemgetter
from langchain_core.runnables.passthrough import RunnablePassthrough
import chainlit as cl
from langchain_core.runnables.config import RunnableConfig
from langchain_huggingface import HuggingFaceEndpoint
from langchain_huggingface.embeddings import HuggingFaceEndpointEmbeddings
from langchain_core.prompts import PromptTemplate
# Load variables from a .env file (if any) before reading os.environ below.
load_dotenv()

# Endpoint locations are required; a missing variable raises KeyError at import.
YOUR_LLM_ENDPOINT_URL = os.environ["YOUR_LLM_ENDPOINT_URL"]
YOUR_EMBED_MODEL_URL = os.environ["YOUR_EMBED_MODEL_URL"]

# Prompt with two placeholders, {query} and {context}.
# NOTE(review): the <|start_header_id|>/<|eot_id|> markers look like a
# Llama-3-style chat format -- confirm they match the model being served.
RAG_PROMPT_TEMPLATE = """\
<|start_header_id|>system<|end_header_id|>
You are a helpful assistant. You answer user questions based on provided context. If you can't answer the question with the provided context, say you don't know.<|eot_id|>
<|start_header_id|>user<|end_header_id|>
User Query:
{query}
Context:
{context}<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>
"""

# Splitter applied to every uploaded document: 600-character chunks that
# overlap their neighbours by 100 characters.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=600, chunk_overlap=100)

# One token lookup shared by both HuggingFace clients.
_hf_token = os.environ["HF_TOKEN"]

# Text-generation client pointed at the configured LLM endpoint.
hf_llm = HuggingFaceEndpoint(
    endpoint_url=YOUR_LLM_ENDPOINT_URL,
    huggingfacehub_api_token=_hf_token,
    max_new_tokens=300,
    temperature=0.01,
    top_k=10,
    top_p=0.95,
    typical_p=0.95,
    repetition_penalty=1.03,
)

# Embedding client pointed at the configured embedding endpoint.
hf_embeddings = HuggingFaceEndpointEmbeddings(
    model=YOUR_EMBED_MODEL_URL,
    task="feature-extraction",
    huggingfacehub_api_token=_hf_token,
)

rag_prompt = PromptTemplate.from_template(RAG_PROMPT_TEMPLATE)
def process_file(file: AskFileResponse):
    """Load an uploaded PDF and split it into retrievable chunks.

    The upload's bytes (``file.content``) are written to a temporary file so
    that ``PyMuPDFLoader`` can read them from a path. The loaded documents are
    split with the module-level ``text_splitter`` and every chunk's
    ``source`` metadata is overwritten with ``source_<index>``.

    Args:
        file: The uploaded file. Only its ``content`` attribute is read.
            NOTE(review): this relies on the upload object exposing raw bytes
            as ``content`` -- confirm against the installed Chainlit version.

    Returns:
        The list of chunks returned by ``text_splitter.split_documents``.
    """
    import tempfile

    # delete=False lets us close the handle before the loader reopens the
    # file by path; the file is removed explicitly once loading is done.
    with tempfile.NamedTemporaryFile(mode="wb", delete=False) as pdf_file:
        pdf_file.write(file.content)
        pdf_path = pdf_file.name

    try:
        documents = PyMuPDFLoader(pdf_path).load()
    finally:
        # Remove the temporary copy whether or not parsing succeeded, so
        # uploads do not pile up in the temp directory.
        os.remove(pdf_path)

    docs = text_splitter.split_documents(documents)
    for i, doc in enumerate(docs):
        doc.metadata["source"] = f"source_{i}"
    return docs
@cl.on_chat_start
async def on_chat_start():
    """Set up a per-session RAG chain from a PDF the user uploads.

    Steps: ask for one PDF, chunk it with ``process_file``, index the chunks
    in a fresh in-memory Qdrant collection, build the
    retriever -> prompt -> LLM chain, and store it in the user session under
    the key ``"chain"``.
    """
    files = None
    # Re-prompt for as long as the ask comes back as None
    # (presumably when the 180 s timeout elapses -- confirm).
    while files is None:
        files = await cl.AskFileMessage(
            content="Please upload a PDF file to begin!",
            accept=["application/pdf"],
            max_size_mb=20,
            timeout=180,
            max_files=1
        ).send()

    file = files[0]

    msg = cl.Message(
        content=f"Processing `{file.name}`...",
    )
    await msg.send()

    docs = process_file(file)

    # QDrant Client Set-up
    # A unique collection name per chat session, held in process memory only.
    collection_name = f"pdf_to_parse_{uuid.uuid4()}"
    client = QdrantClient(":memory:")
    client.create_collection(
        collection_name=collection_name,
        # NOTE(review): 768 presumably equals the embedding model's output
        # dimension -- confirm against the deployed embedding endpoint.
        vectors_config=VectorParams(size=768, distance=Distance.COSINE),
    )

    # Adding cache!
    # NOTE: the embedding cache is currently disabled; kept for reference.
    # store = LocalFileStore("./cache/")
    # cached_embedder = CacheBackedEmbeddings.from_bytes_store(
    #     hf_embeddings, store, namespace=hf_embeddings.model
    # )

    # Typical QDrant Vector Store Set-up
    vectorstore = QdrantVectorStore(
        client=client,
        collection_name=collection_name,
        embedding=hf_embeddings)
    retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": 3})

    # Index the chunks in fixed-size batches rather than in a single call.
    batch_size = 32
    for start in range(0, len(docs), batch_size):
        retriever.add_documents(docs[start:start + batch_size])

    # The incoming dict's "query" feeds both the retriever (to produce the
    # context) and the prompt's {query} slot.
    retrieval_augmented_qa_chain = (
        {"context": itemgetter("query") | retriever, "query": itemgetter("query")}
        | rag_prompt
        | hf_llm
    )

    # Let the user know that the system is ready
    msg.content = f"Processing `{file.name}` done. You can now ask questions!"
    await msg.update()

    cl.user_session.set("chain", retrieval_augmented_qa_chain)
### Rename Chains ###
@cl.author_rename
def rename(orig_author: str):
    """Translate an internal author name into a friendlier display name.

    Args:
        orig_author: The author name to translate.

    Returns:
        The mapped display name, or ``orig_author`` unchanged when it has no
        entry in the mapping.
    """
    # NOTE(review): this file builds a HuggingFaceEndpoint rather than a
    # ChatOpenAI model, so the "ChatOpenAI" entry may never match -- confirm.
    display_names = {
        "ChatOpenAI": "the Generator...",
        "VectorStoreRetriever": "the Retriever...",
    }
    if orig_author in display_names:
        return display_names[orig_author]
    return orig_author
### On Message Section ###
@cl.on_message
async def main(message: cl.Message):
    """Answer an incoming message by streaming the session chain's output.

    The chain stored in the user session under ``"chain"`` is invoked with
    the message text as ``query``; each streamed chunk is appended to a
    single reply message, which is sent once the stream is exhausted.

    Args:
        message: The incoming message; its ``content`` is used as the query.
    """
    rag_chain = cl.user_session.get("chain")
    reply = cl.Message(content="")

    # Attach Chainlit's LangChain callback handler to this run.
    run_config = RunnableConfig(callbacks=[cl.LangchainCallbackHandler()])
    token_stream = rag_chain.astream({"query": message.content}, config=run_config)

    async for token in token_stream:
        await reply.stream_token(token)

    await reply.send()
if __name__ == "__main__":
    # When executed directly as a script, hand this file to Chainlit's
    # CLI runner. The import is kept local so it only happens on this path.
    from chainlit.cli import run_chainlit

    run_chainlit(__file__)