import os
import re
import sys
import copy
import time
import torch
import numpy as np
import streamlit as st
import bitsandbytes  # noqa: F401 -- fail fast if the 8-bit loading backend is missing
import flash_attn    # noqa: F401 -- fail fast if flash attention is missing
from typing import Optional, Iterable
from stqdm import stqdm
from numpy import ndarray
from huggingface_hub import hf_hub_download
from qdrant_client import QdrantClient, models
from fastembed.sparse.splade_pp import supported_splade_models
from fastembed import SparseTextEmbedding, SparseEmbedding
from langchain_community.chat_models.ollama import ChatOllama
from langchain_huggingface import HuggingFacePipeline
from langchain_core.callbacks import CallbackManager, StreamingStdOutCallbackHandler
from fastembed_ext import FastEmbedEmbeddingsLc
from langchain_community.document_loaders.wikipedia import WikipediaLoader
from langchain_community.document_loaders.unstructured import UnstructuredFileLoader
from langchain_experimental.text_splitter import SemanticChunker
from langchain_core.documents import Document
from qdrant_client.models import (
    NamedSparseVector,
    NamedVector,
    SparseVector,
    PointStruct,
    SearchRequest,
    ScoredPoint,
)
from langchain_core.prompts import PromptTemplate
from langchain.chains.summarize import load_summarize_chain
from transformers import AutoTokenizer, LlamaForCausalLM, pipeline

MAP_PROMPT = """
You will be given a single passage of a book. This section will be enclosed in triple backticks (```)
Your goal is to give a summary of this section so that a reader will have a full understanding of what happened.
Your response should be at least three paragraphs and fully encompass what was said in the passage.

```{text}```
FULL SUMMARY:
"""

COMBINE_PROMPT = """
You will be given a series of summaries from a book. The summaries will be enclosed in triple backticks (```)
Your goal is to give a verbose summary of what happened in the story.
The reader should be able to grasp what happened in the book.

```{text}```
VERBOSE SUMMARY:
"""

# Register the SPLADE++ v2 ONNX export in place of fastembed's default SPLADE entry.
supported_splade_models[0] = {
    "model": "prithivida/Splade_PP_en_v2",
    "vocab_size": 30522,
    "description": "Implementation of SPLADE++ Model for English v2",
    "size_in_GB": 0.532,
    "sources": {
        "hf": "devve1/Splade_PP_en_v2_onnx"
    },
    "model_file": "model.onnx"
}


def make_points(chunks: list[str], dense: list[ndarray], sparse: list[SparseEmbedding]) -> Iterable[PointStruct]:
    """Yield Qdrant points carrying both the dense and the sparse vector for each chunk."""
    for idx, (chunk, sparse_vector, dense_vector) in stqdm(
        enumerate(zip(chunks, sparse, dense)),
        desc='Save embeddings on disk...',
        backend=True
    ):
        sparse_vector = SparseVector(
            indices=sparse_vector.indices.tolist(),
            values=sparse_vector.values.tolist()
        )
        yield PointStruct(
            id=idx,
            vector={
                "text-sparse": sparse_vector,
                "text-dense": dense_vector,
            },
            payload={"text": chunk}
        )


def search(client: QdrantClient, collection_name: str, dense: ndarray, sparse: list[SparseEmbedding]):
    """Run one dense and one sparse search in a single batch; the two rankings are fused later with RRF."""
    search_results = client.search_batch(
        collection_name,
        [
            SearchRequest(
                vector=NamedVector(
                    name="text-dense",
                    vector=dense,
                ),
                limit=10
            ),
            SearchRequest(
                vector=NamedSparseVector(
                    name="text-sparse",
                    vector=SparseVector(
                        indices=sparse[0].indices.tolist(),
                        values=sparse[0].values.tolist(),
                    ),
                ),
                limit=10
            ),
        ],
    )
    return search_results


def rank_list(search_result: list[ScoredPoint]):
    """Convert a scored result list into (point id, 1-based rank) tuples."""
    return [(point.id, rank + 1) for rank, point in enumerate(search_result)]


def rrf(rank_lists, alpha=60, default_rank=1000):
    """
    Optimized Reciprocal Rank Fusion (RRF) using NumPy for large rank lists.

    :param rank_lists: A list of rank lists. Each rank list should be a list of (item, rank) tuples.
    :param alpha: The parameter alpha used in the RRF formula. Default is 60.
    :param default_rank: The default rank assigned to items not present in a rank list. Default is 1000.
    :return: Sorted list of items based on their RRF scores.
    """
    all_items = set(item for rank_list in rank_lists for item, _ in rank_list)
    item_to_index = {item: idx for idx, item in enumerate(all_items)}

    # One row per item, one column per rank list; items missing from a list keep the default rank.
    rank_matrix = np.full((len(all_items), len(rank_lists)), default_rank)
    for list_idx, rank_list in enumerate(rank_lists):
        for item, rank in rank_list:
            rank_matrix[item_to_index[item], list_idx] = rank

    # RRF score: sum over lists of 1 / (alpha + rank); higher is better.
    rrf_scores = np.sum(1.0 / (alpha + rank_matrix), axis=1)
    sorted_indices = np.argsort(-rrf_scores)
    items = list(item_to_index.keys())
    return [(items[idx], rrf_scores[idx]) for idx in sorted_indices]
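
# Rough, illustrative sketch of how rrf() fuses the two result lists (the item
# ids "a", "b", "c" below are hypothetical, not real Qdrant point ids):
#
#   dense_ranks  = [("a", 1), ("b", 2)]
#   sparse_ranks = [("b", 1), ("c", 2)]
#   rrf([dense_ranks, sparse_ranks])
#
# With alpha=60, "b" scores 1/62 + 1/61 ≈ 0.0325 and comes first, while "a" and
# "c" each pay the default_rank=1000 penalty in the list they are missing from,
# so the fused order is roughly [("b", ...), ("a", ...), ("c", ...)].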
def main(query: str, chunks: list[str], llm, dense_model, sparse_model):
    # name = 'Kia_EV6'
    # filepath = os.path.join(os.getcwd(), name + '.pdf')
    # docs = UnstructuredFileLoader(
    #     file_path=filepath,
    #     mode='elements',
    #     strategy='hi_res',
    #     skip_infer_table_types=['png', 'pdf', 'jpg', 'xls', 'xlsx', 'heic'],
    #     hi_res_model_name='yolox',
    #     include_page_breaks=True
    # )
    # docs = docs.load()

    client = QdrantClient(location=':memory:')
    collection_name = 'collection_demo'

    if not client.collection_exists(collection_name):
        client.create_collection(
            collection_name,
            vectors_config={
                "text-dense": models.VectorParams(
                    size=1024,
                    distance=models.Distance.COSINE,
                    on_disk=True,
                    quantization_config=models.BinaryQuantization(
                        binary=models.BinaryQuantizationConfig(
                            always_ram=False
                        )
                    )
                )
            },
            sparse_vectors_config={
                "text-sparse": models.SparseVectorParams(
                    index=models.SparseIndexParams(
                        on_disk=True
                    )
                )
            },
            shard_number=2,
            optimizers_config=models.OptimizersConfigDiff(
                memmap_threshold=10000
            ),
            hnsw_config=models.HnswConfigDiff(
                on_disk=True,
                m=16,
                ef_construct=100
            )
        )

    # Embed every chunk twice: dense vectors for semantic similarity, sparse SPLADE vectors for lexical matching.
    dense_embeddings = dense_model.embed_documents(
        stqdm(chunks, desc='Generate dense embeddings...', backend=True), 32
    )
    sparse_embeddings = list(
        sparse_model.embed(stqdm(chunks, desc='Generate sparse embeddings...', backend=True), 32)
    )

    client.upload_points(
        collection_name,
        make_points(chunks, dense_embeddings, sparse_embeddings)
    )

    # Embed the query with both models, search both indexes, then fuse the two rankings.
    dense_query = list(dense_model.embed_query(query, 32))
    sparse_query = list(sparse_model.embed(query, 32))

    search_results = search(client, collection_name, dense_query, sparse_query)
    dense_rank_list, sparse_rank_list = rank_list(search_results[0]), rank_list(search_results[1])
    rrf_rank_list = rrf([dense_rank_list, sparse_rank_list])

    records_list = client.retrieve(
        collection_name,
        [item[0] for item in stqdm(rrf_rank_list, desc='Retrieve relevant chunk of texts...', backend=True)]
    )
    docs = [Document(record.payload['text']) for record in records_list[:3]]

    map_prompt = PromptTemplate(
        template=MAP_PROMPT,
        input_variables=['text']
    )
    combine_prompt = PromptTemplate(
        template=COMBINE_PROMPT,
        input_variables=['text']
    )

    # Map step: summarize each retrieved chunk individually.
    map_chain = load_summarize_chain(llm, "stuff", prompt=map_prompt)
    summary_list = []
    for doc in docs:
        chunk_summary = map_chain.invoke([doc])
        summary_list.append(chunk_summary['output_text'])

    # Reduce step: combine the per-chunk summaries into a single answer.
    summaries = Document(page_content="\n".join(summary_list))
    reduce_chain = load_summarize_chain(llm, "stuff", prompt=combine_prompt)
    output = reduce_chain.invoke([summaries])

    return output['output_text']
@st.cache_resource(show_spinner="Loading models...")
def load_models_and_components():
    tokenizer = AutoTokenizer.from_pretrained(
        'NousResearch/Hermes-2-Theta-Llama-3-8B',
        trust_remote_code=True
    )
    model = LlamaForCausalLM.from_pretrained(
        "NousResearch/Hermes-2-Theta-Llama-3-8B",
        torch_dtype=torch.float16,
        device_map="auto",
        load_in_8bit=True,
        load_in_4bit=False,
        use_flash_attention_2=True
    )
    pipe = pipeline(
        'text-generation',
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=3000,
        temperature=0.75
    )
    llm = HuggingFacePipeline(pipeline=pipe)

    provider = ['CPUExecutionProvider']
    sparse_model = SparseTextEmbedding(
        'prithivida/Splade_PP_en_v2',
        providers=provider
    )
    dense_model = FastEmbedEmbeddingsLc(
        model_name='mixedbread-ai/mxbai-embed-large-v1',
        providers=provider,
        batch_size=32
    )
    return llm, dense_model, sparse_model


@st.cache_data(show_spinner="Parsing and chunking texts...")
def chunk_documents(_dense_model):
    docs = WikipediaLoader(query='Action-RPG').load()
    text_splitter = SemanticChunker(
        _dense_model,
        breakpoint_threshold_type='standard_deviation'
    )
    documents = [doc.page_content for doc in text_splitter.transform_documents(list(docs))]
    return documents


if __name__ == '__main__':
    st.set_page_config(page_title="Video Game Assistant", layout="wide")

    print(f"Is CUDA available: {torch.cuda.is_available()}")
    print(f"CUDA device: {torch.cuda.get_device_name(torch.cuda.current_device())}")

    llm, dense_model, sparse_model = load_models_and_components()
    chunks = chunk_documents(dense_model)

    st.title("Video Game Assistant")

    if "messages" not in st.session_state:
        st.session_state.messages = []

    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    if prompt := st.chat_input("Message Video Game Assistant"):
        st.chat_message("user").markdown(prompt)
        st.session_state.messages.append({"role": "user", "content": prompt})

        ai_response = main(prompt, chunks, llm, dense_model, sparse_model)
        response = f"Echo: {ai_response}"

        with st.chat_message("assistant"):
            message_placeholder = st.empty()
            full_response = ""
            # Reveal the answer word by word to mimic streaming, then drop the cursor.
            for chunk in re.split(r'(\s+)', response):
                full_response += chunk + " "
                time.sleep(0.01)
                message_placeholder.markdown(full_response + "▌")
            message_placeholder.markdown(full_response)

        st.session_state.messages.append({"role": "assistant", "content": full_response})
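
# Rough usage note (assuming this file is saved as app.py and the GPU, CUDA,
# bitsandbytes, and flash-attn dependencies imported above are available):
#
#   streamlit run app.py
#
# The first launch downloads Hermes-2-Theta-Llama-3-8B plus both embedding
# models, so expect it to take a while.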