# Page header captured from the hosting site's file view of app.py (3.22 kB).
# Last commit: 7569e6d (verified) - "Update app.py" by danicafisher.
from typing import List
from aimakerspace.text_utils import CharacterTextSplitter, PDFFileLoader
from aimakerspace.openai_utils.prompts import (
UserRolePrompt,
SystemRolePrompt
)
from aimakerspace.vectordatabase import VectorDatabase
from aimakerspace.openai_utils.chatmodel import ChatOpenAI
from langchain_community.embeddings import OpenAIEmbeddings
import chainlit as cl
import nest_asyncio
nest_asyncio.apply()
# Module-level corpus preparation: runs once at import time, before any chat
# session starts. Tuple elements are evaluated left to right, so the NIST
# document is always handled before the Blueprint document.

# One loader per source PDF.
pdf_loader_NIST, pdf_loader_Blueprint = (
    PDFFileLoader("data/NIST.AI.600-1.pdf"),
    PDFFileLoader("data/Blueprint-for-an-AI-Bill-of-Rights.pdf"),
)

# Load the content of each PDF through its loader.
documents_NIST, documents_Blueprint = (
    pdf_loader_NIST.load_documents(),
    pdf_loader_Blueprint.load_documents(),
)

# A single splitter with default settings is shared by both corpora.
text_splitter = CharacterTextSplitter()

# The resulting chunk collections are indexed per session in start_chat().
split_documents_NIST, split_documents_Blueprint = (
    text_splitter.split_texts(documents_NIST),
    text_splitter.split_texts(documents_Blueprint),
)
# System prompt: restricts the model to answering from the supplied context and
# tells it to reply "I don't know" otherwise.
# The backslash right after the opening quotes is a line continuation inside the
# string literal, so the text starts with a single space followed directly by
# the first sentence (no leading newline).
RAG_PROMPT_TEMPLATE = """ \
Use the provided context to answer the user's query.
You may not answer the user's query unless there is specific context in the following text.
If you do not know the answer, or cannot answer, please respond with "I don't know".
"""
# Wrapped as a system-role prompt; the pipeline calls create_message() on it
# with no arguments.
rag_prompt = SystemRolePrompt(RAG_PROMPT_TEMPLATE)
# User prompt: the {context} and {user_query} placeholders are filled by
# user_prompt.create_message(user_query=..., context=...) in the pipeline.
USER_PROMPT_TEMPLATE = """ \
Context:
{context}
User Query:
{user_query}
"""
# Wrapped as a user-role prompt.
user_prompt = UserRolePrompt(USER_PROMPT_TEMPLATE)
class RetrievalAugmentedQAPipeline:
    """Answer a query from retrieved context and stream the model's reply.

    The pipeline looks up the passages most relevant to the query in a vector
    store, places them in the user prompt, and streams the chat model's
    response to the combined system and user messages.
    """

    def __init__(self, llm: ChatOpenAI, vector_db_retriever: VectorDatabase) -> None:
        """Store the chat model and the retriever used by the pipeline.

        Args:
            llm: Chat model; must provide an async-iterable ``astream(messages)``.
            vector_db_retriever: Store queried via ``search_by_text(query, k=...)``.
        """
        # The annotation above names the ChatOpenAI class. Writing
        # ``ChatOpenAI()`` there would construct a client when this ``def`` is
        # evaluated (at import time), because annotations are evaluated eagerly.
        self.llm = llm
        self.vector_db_retriever = vector_db_retriever

    async def arun_pipeline(self, user_query: str):
        """Retrieve context for ``user_query`` and prepare a streamed answer.

        Args:
            user_query: The question typed by the user.

        Returns:
            A dict with two keys:
            ``"response"`` - an async generator yielding the chunks produced by
            ``llm.astream``; nothing is sent to the model until it is iterated.
            ``"context"`` - the raw results returned by ``search_by_text``.
        """
        context_list = self.vector_db_retriever.search_by_text(user_query, k=4)

        # Only element [0] of each hit is used as prompt text (presumably hits
        # are (text, score) pairs - confirm against VectorDatabase). Every
        # passage is followed by a newline, including the last one.
        context_prompt = "".join(context[0] + "\n" for context in context_list)

        formatted_system_prompt = rag_prompt.create_message()
        formatted_user_prompt = user_prompt.create_message(
            user_query=user_query,
            context=context_prompt,
        )

        async def generate_response():
            # Relay the model's chunks unchanged so the caller can stream them.
            async for chunk in self.llm.astream(
                [formatted_system_prompt, formatted_user_prompt]
            ):
                yield chunk

        return {"response": generate_response(), "context": context_list}
# ------------------------------------------------------------
@cl.on_chat_start
async def start_chat():
    """Build the per-session RAG pipeline when a chat session starts.

    Stores the model settings in the user session, indexes both pre-split
    corpora in a fresh vector database, and saves the resulting pipeline in
    the session under the key ``"chain"`` for the message handler to use.
    """
    settings = {
        "model": "gpt-4o-mini"
    }
    # NOTE(review): these settings are only stored in the session; they are not
    # passed to ChatOpenAI() below, which is created with its own defaults.
    cl.user_session.set("settings", settings)

    # OpenAIEmbeddings is a pydantic model, so its fields must be given by
    # keyword; passing the model name positionally raises TypeError.
    embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

    # Create a vector store.
    # NOTE(review): the embedding model is LangChain's while VectorDatabase is
    # aimakerspace's - confirm the database accepts this embeddings interface.
    vector_db = VectorDatabase(embedding_model=embeddings)
    # Both corpora are added to the same database; this assumes
    # abuild_from_list returns the database it was called on - TODO confirm.
    vector_db = await vector_db.abuild_from_list(split_documents_NIST)
    vector_db = await vector_db.abuild_from_list(split_documents_Blueprint)

    chat_openai = ChatOpenAI()

    # Create a chain
    retrieval_augmented_qa_pipeline = RetrievalAugmentedQAPipeline(
        vector_db_retriever=vector_db,
        llm=chat_openai
    )

    cl.user_session.set("chain", retrieval_augmented_qa_pipeline)
@cl.on_message
async def main(message):
    """Handle one incoming chat message by streaming the pipeline's answer.

    Fetches the pipeline stored in the session under ``"chain"``, runs it on
    the message text, forwards every streamed chunk into an initially empty
    reply, and finally sends that reply.
    """
    pipeline = cl.user_session.get("chain")
    reply = cl.Message(content="")

    outcome = await pipeline.arun_pipeline(message.content)
    token_stream = outcome["response"]

    # Push each chunk to the reply as soon as it arrives.
    async for token in token_stream:
        await reply.stream_token(token)

    await reply.send()