import os
import re
import time

import msgpack
import numpy as np
import streamlit as st
import torch
from numpy import ndarray
from scipy.sparse import csr_matrix, save_npz, load_npz, vstack
from qdrant_client import QdrantClient, models
from langchain_community.llms.llamacpp import LlamaCpp
from langchain_community.document_loaders.wikipedia import WikipediaLoader
from langchain_community.document_loaders.unstructured import UnstructuredFileLoader
from langchain_core.prompts import PromptTemplate
from langchain.chains.summarize import load_summarize_chain
from langchain_experimental.text_splitter import SemanticChunker
from fastembed.sparse.splade_pp import supported_splade_models
from fastembed import SparseTextEmbedding, SparseEmbedding
from fastembed_ext import FastEmbedEmbeddingsLc
from langchain_core.documents import Document
from huggingface_hub import hf_hub_download
from qdrant_client.models import (
    NamedSparseVector,
    NamedVector,
    SparseVector,
    PointStruct,
    SearchRequest,
    ScoredPoint,
)

MAP_PROMPT = """
You will be given a single passage of a book. This section will be enclosed in triple backticks (```)
Your goal is to give a summary of this section so that a reader will have a full understanding of what happened.
Your response should be at least three paragraphs and fully encompass what was said in the passage.

```{text}```
FULL SUMMARY:
"""

COMBINE_PROMPT = """
You will be given a series of summaries from a book. The summaries will be enclosed in triple backticks (```)
Your goal is to give a verbose summary of what happened in the story.
The reader should be able to grasp what happened in the book.

```{text}```
VERBOSE SUMMARY:
"""


def make_points(chunks: list[str], dense: list[ndarray], sparse: list[SparseEmbedding]) -> list[PointStruct]:
    # Build one Qdrant point per chunk, carrying the dense and sparse vectors
    # under their named slots and the raw chunk text in the payload.
    points = []
    for idx, (sparse_vector, chunk, dense_vector) in enumerate(zip(sparse, chunks, dense)):
        sparse_vec = SparseVector(
            indices=sparse_vector.indices.tolist(),
            values=sparse_vector.values.tolist()
        )
        point = PointStruct(
            id=idx,
            vector={
                "text-sparse": sparse_vec,
                "text-dense": dense_vector,
            },
            payload={
                "text": chunk
            }
        )
        points.append(point)
    return points


def search(client: QdrantClient, collection_name: str, dense, sparse):
    # Hybrid retrieval: one request against the dense index and one against the
    # sparse (SPLADE) index, executed in a single batched call.
    search_results = client.search_batch(
        collection_name,
        [
            SearchRequest(
                vector=NamedVector(
                    name="text-dense",
                    vector=dense,
                ),
                limit=10
            ),
            SearchRequest(
                vector=NamedSparseVector(
                    name="text-sparse",
                    vector=SparseVector(
                        indices=sparse[0].indices.tolist(),
                        values=sparse[0].values.tolist(),
                    ),
                ),
                limit=10
            ),
        ],
    )
    return search_results


def rank_list(search_result: list[ScoredPoint]):
    return [(point.id, rank + 1) for rank, point in enumerate(search_result)]


def rrf(rank_lists, alpha=60, default_rank=1000):
    """
    Optimized Reciprocal Rank Fusion (RRF) using NumPy for large rank lists.

    :param rank_lists: A list of rank lists. Each rank list should be a list of (item, rank) tuples.
    :param alpha: The parameter alpha used in the RRF formula. Default is 60.
    :param default_rank: The default rank assigned to items not present in a rank list. Default is 1000.
    :return: Sorted list of items based on their RRF scores.
    """
    all_items = set(item for rank_list in rank_lists for item, _ in rank_list)
    item_to_index = {item: idx for idx, item in enumerate(all_items)}

    # Fill a (num_items x num_lists) matrix of ranks, using default_rank for items
    # missing from a list, then score each item as sum(1 / (alpha + rank)).
    rank_matrix = np.full((len(all_items), len(rank_lists)), default_rank)
    for list_idx, rank_list in enumerate(rank_lists):
        for item, rank in rank_list:
            rank_matrix[item_to_index[item], list_idx] = rank

    rrf_scores = np.sum(1.0 / (alpha + rank_matrix), axis=1)
    sorted_indices = np.argsort(-rrf_scores)
    items = list(item_to_index.keys())
    sorted_items = [(items[idx], rrf_scores[idx]) for idx in sorted_indices]
    return sorted_items


def main(query: str, client: QdrantClient, collection_name: str, llm, dense_model, sparse_model):
    # name = 'Kia_EV6'
    # filepath = os.path.join(os.getcwd(), name + '.pdf')
    # docs = UnstructuredFileLoader(
    #     file_path=filepath,
    #     mode='elements',
    #     strategy='hi_res',
    #     skip_infer_table_types=['png', 'pdf', 'jpg', 'xls', 'xlsx', 'heic'],
    #     hi_res_model_name='yolox',
    #     include_page_breaks=True
    # )
    # docs = docs.load()
    dense_query = list(dense_model.embed_query(query, 32))
    sparse_query = list(sparse_model.embed(query, 32))

    search_results = search(
        client,
        collection_name,
        dense_query,
        sparse_query
    )

    # Fuse the dense and sparse result lists with Reciprocal Rank Fusion,
    # then fetch the payloads for the fused ranking.
    dense_rank_list, sparse_rank_list = rank_list(search_results[0]), rank_list(search_results[1])
    rrf_rank_list = rrf([dense_rank_list, sparse_rank_list])

    records_list = client.retrieve(
        collection_name,
        [item[0] for item in rrf_rank_list]
    )
    docs = [Document(page_content=record.payload['text']) for record in records_list[:3]]

    map_prompt = PromptTemplate(
        template=MAP_PROMPT,
        input_variables=['text']
    )
    combine_prompt = PromptTemplate(
        template=COMBINE_PROMPT,
        input_variables=['text']
    )

    # Map step: summarize each retrieved chunk individually.
    map_chain = load_summarize_chain(llm, "stuff", prompt=map_prompt)
    summary_list = []
    for doc in docs:
        chunk_summary = map_chain.invoke([doc])
        summary_list.append(chunk_summary['output_text'])

    summaries = Document(page_content="\n".join(summary_list))

    # Reduce step: combine the per-chunk summaries into a single answer.
    reduce_chain = load_summarize_chain(llm, "stuff", prompt=combine_prompt)
    output = reduce_chain.invoke([summaries])
    return output['output_text']


def load_models_and_documents():
    # Point fastembed at an ONNX export of SPLADE++ v2 by overriding the first
    # entry of its supported model list.
    supported_splade_models[0] = {
        "model": "prithivida/Splade_PP_en_v2",
        "vocab_size": 30522,
        "description": "Implementation of SPLADE++ Model for English v2",
        "size_in_GB": 0.532,
        "sources": {
            "hf": "devve1/Splade_PP_en_v2_onnx"
        },
        "model_file": "model.onnx"
    }

    with st.spinner('Load models...'):
        model_path = hf_hub_download(
            repo_id='NousResearch/Hermes-2-Theta-Llama-3-8B-GGUF',
            filename='Hermes-2-Pro-Llama-3-Instruct-Merged-DPO-Q8_0.gguf'
        )
        llm = LlamaCpp(
            model_path=model_path,
            n_ctx=8192,
            max_tokens=3000,
            n_gpu_layers=-1,
            n_batch=512,
            f16_kv=True
        )

        provider = ['CPUExecutionProvider']
        dense_model = FastEmbedEmbeddingsLc(
            model_name='mixedbread-ai/mxbai-embed-large-v1',
            providers=provider,
            cache_dir=os.getenv('HF_HOME'),
            batch_size=32
        )
        sparse_model = SparseTextEmbedding(
            'prithivida/Splade_PP_en_v2',
            cache_dir=os.getenv('HF_HOME'),
            providers=provider
        )

    client = QdrantClient(path=os.getenv('HF_HOME'))
    collection_name = 'collection_demo'

    if not client.collection_exists(collection_name):
        client.create_collection(
            collection_name,
            vectors_config={
                "text-dense": models.VectorParams(
                    size=1024,
                    distance=models.Distance.COSINE,
                    on_disk=True,
                    quantization_config=models.BinaryQuantization(
                        binary=models.BinaryQuantizationConfig(
                            always_ram=False
                        )
                    )
                )
            },
            sparse_vectors_config={
                "text-sparse": models.SparseVectorParams(
                    index=models.SparseIndexParams(
                        on_disk=True
                    )
                )
            },
            shard_number=2,
            on_disk_payload=True,
            # Indexing is disabled for the bulk upload and re-enabled afterwards.
            optimizers_config=models.OptimizersConfigDiff(
                memmap_threshold=10000,
                indexing_threshold=0
            ),
            hnsw_config=models.HnswConfigDiff(
                on_disk=True,
                m=16,
                ef_construct=100
            )
        )

    with st.spinner('Parse and chunk documents...'):
        name = 'action_rpg'
        embeddings_path = os.path.join(os.getenv('HF_HOME'), 'collection', 'embeddings')
        chunks_path = os.path.join(embeddings_path, name + '_chunks.msgpack')
        dense_path = os.path.join(embeddings_path, name + '_dense.npz')
        sparse_path = os.path.join(embeddings_path, name + '_sparse.npz')

        if not os.path.exists(embeddings_path):
            # makedirs (rather than mkdir) so the intermediate 'collection' directory is created too.
            os.makedirs(embeddings_path, exist_ok=True)

            docs = WikipediaLoader(query='Action-RPG').load()
            chunks, dense_embeddings, sparse_embeddings = chunk_documents(docs, dense_model, sparse_model)

            # Cache chunks as msgpack, dense vectors as a compressed .npz archive,
            # and sparse vectors as a stacked CSR matrix.
            with open(chunks_path, "wb") as outfile:
                packed = msgpack.packb(chunks, use_bin_type=True)
                outfile.write(packed)

            np.savez_compressed(dense_path, *dense_embeddings)

            max_index = max(np.max(embedding.indices) for embedding in sparse_embeddings)
            sparse_matrices = []
            for embedding in sparse_embeddings:
                data = embedding.values
                indices = embedding.indices
                indptr = np.array([0, len(data)])
                matrix = csr_matrix((data, indices, indptr), shape=(1, max_index + 1))
                sparse_matrices.append(matrix)
            combined_sparse_matrix = vstack(sparse_matrices)
            save_npz(sparse_path, combined_sparse_matrix)
        else:
            # Reload the cached chunks and embeddings instead of recomputing them.
            with open(chunks_path, "rb") as data_file:
                byte_data = data_file.read()
            chunks = msgpack.unpackb(byte_data, raw=False)

            dense_embeddings = list(np.load(dense_path).values())

            sparse_embeddings = []
            loaded_sparse_matrix = load_npz(sparse_path)
            for i in range(loaded_sparse_matrix.shape[0]):
                row = loaded_sparse_matrix.getrow(i)
                values = row.data
                indices = row.indices
                embedding = SparseEmbedding(values, indices)
                sparse_embeddings.append(embedding)

    with st.spinner('Save documents...'):
        client.upsert(
            collection_name,
            make_points(
                chunks,
                dense_embeddings,
                sparse_embeddings
            )
        )
        client.update_collection(
            collection_name=collection_name,
            optimizer_config=models.OptimizersConfigDiff(indexing_threshold=20000)
        )

    return client, collection_name, llm, dense_model, sparse_model


def chunk_documents(docs, dense_model, sparse_model):
    # Split documents on semantic boundaries, then embed every chunk with both models.
    text_splitter = SemanticChunker(
        dense_model,
        breakpoint_threshold_type='standard_deviation'
    )
    documents = [doc.page_content for doc in text_splitter.transform_documents(list(docs))]
    dense_embeddings = dense_model.embed_documents(documents, 32)
    sparse_embeddings = list(sparse_model.embed(documents, 32))
    return documents, dense_embeddings, sparse_embeddings


if __name__ == '__main__':
    st.set_page_config(
        page_title="Video Game Assistant",
        layout="wide"
    )
    st.title("Video Game Assistant :sunglasses:")

    if 'models_loaded' not in st.session_state:
        (st.session_state.client,
         st.session_state.collection_name,
         st.session_state.llm,
         st.session_state.dense_model,
         st.session_state.sparse_model) = load_models_and_documents()
        st.session_state.models_loaded = True

    if st.session_state.models_loaded:
        if "messages" not in st.session_state:
            st.session_state.messages = []

        for message in st.session_state.messages:
            with st.chat_message(message["role"]):
                st.markdown(message["content"])

        if prompt := st.chat_input("Message Video Game Assistant"):
            st.chat_message("user").markdown(prompt)
            st.session_state.messages.append({"role": "user", "content": prompt})

            ai_response = main(
                prompt,
                st.session_state.client,
                st.session_state.collection_name,
                st.session_state.llm,
                st.session_state.dense_model,
                st.session_state.sparse_model
            )
            response = f"Echo: {ai_response}"

            with st.chat_message("assistant"):
                message_placeholder = st.empty()
                full_response = ""
                # Simulate streaming by re-rendering the answer word by word.
                for chunk in re.split(r'(\s+)', response):
                    full_response += chunk + " "
                    time.sleep(0.01)
                    message_placeholder.markdown(full_response + "▌")
                # Final render without the cursor character.
                message_placeholder.markdown(full_response)

            st.session_state.messages.append({"role": "assistant", "content": full_response})