import os
import re
import time
import torch
import msgpack
import numpy as np
import streamlit as st
from numpy import ndarray
from scipy.sparse import csr_matrix, save_npz, load_npz, vstack
from qdrant_client import QdrantClient, models
from langchain_community.llms.llamacpp import LlamaCpp
from langchain_community.document_loaders.wikipedia import WikipediaLoader
from langchain_community.document_loaders.unstructured import UnstructuredFileLoader
from langchain_core.prompts import PromptTemplate
from langchain.chains.summarize import load_summarize_chain
from langchain_experimental.text_splitter import SemanticChunker
from fastembed.sparse.splade_pp import supported_splade_models
from fastembed import SparseTextEmbedding, SparseEmbedding
from fastembed_ext import FastEmbedEmbeddingsLc
from langchain_core.documents import Document
from huggingface_hub import hf_hub_download
from qdrant_client.models import (
    NamedSparseVector,
    NamedVector,
    SparseVector,
    PointStruct,
    SearchRequest,
    ScoredPoint,
)

MAP_PROMPT = """ | |
You will be given a single passage of a book. This section will be enclosed in triple backticks (```) | |
Your goal is to give a summary of this section so that a reader will have a full understanding of what happened. | |
Your response should be at least three paragraphs and fully encompass what said in the passage. | |
```{text}``` | |
FULL SUMMARY: | |
""" | |
COMBINE_PROMPT = """ | |
You will be given a series of summaries from a book. The summaries will be enclosed in triple backticks (```) | |
Your goal is to give a verbose summary of what happened in the story. | |
The reader should be able to grasp what happened in the book. | |
```{text}``` | |
VERBOSE SUMMARY: | |
""" | |
def make_points(chunks: list[str], dense: list[ndarray], sparse) -> list[PointStruct]:
    points = []
    for idx, (sparse_vector, chunk, dense_vector) in enumerate(zip(sparse, chunks, dense)):
        # Convert the fastembed SparseEmbedding into a Qdrant SparseVector;
        # the raw embedding object is not accepted by the client directly.
        sparse_vec = SparseVector(
            indices=sparse_vector.indices.tolist(),
            values=sparse_vector.values.tolist()
        )
        point = PointStruct(
            id=idx,
            vector={
                "text-sparse": sparse_vec,
                "text-dense": dense_vector,
            },
            payload={
                "text": chunk
            }
        )
        points.append(point)
    return points
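
# Hybrid retrieval: a single batched call runs the dense (cosine) query and the
# sparse (SPLADE) query against their named vectors; the two top-10 result
# lists are fused afterwards with Reciprocal Rank Fusion.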
def search(client: QdrantClient, collection_name: str, dense, sparse):
    search_results = client.search_batch(
        collection_name,
        [
            SearchRequest(
                vector=NamedVector(
                    name="text-dense",
                    vector=dense,
                ),
                limit=10
            ),
            SearchRequest(
                vector=NamedSparseVector(
                    name="text-sparse",
                    vector=SparseVector(
                        indices=sparse[0].indices.tolist(),
                        values=sparse[0].values.tolist(),
                    ),
                ),
                limit=10
            ),
        ],
    )
    return search_results

def rank_list(search_result: list[ScoredPoint]):
    return [(point.id, rank + 1) for rank, point in enumerate(search_result)]
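
# RRF scores an item as sum(1 / (alpha + rank_i)) over all rank lists i.
# Worked example with alpha=60: ranked 1st in the dense list and 3rd in the
# sparse list gives 1/61 + 1/63 ≈ 0.03227, just ahead of 2nd place in both
# lists, 1/62 + 1/62 ≈ 0.03226.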
def rrf(rank_lists, alpha=60, default_rank=1000):
    """
    Optimized Reciprocal Rank Fusion (RRF) using NumPy for large rank lists.

    :param rank_lists: A list of rank lists. Each rank list should be a list of (item, rank) tuples.
    :param alpha: The parameter alpha used in the RRF formula. Default is 60.
    :param default_rank: The default rank assigned to items not present in a rank list. Default is 1000.
    :return: Sorted list of (item, score) tuples based on their RRF scores.
    """
    all_items = set(item for rank_list in rank_lists for item, _ in rank_list)
    item_to_index = {item: idx for idx, item in enumerate(all_items)}
    # Precompute the reverse mapping once instead of rebuilding the key list
    # for every sorted index.
    index_to_item = list(item_to_index.keys())
    rank_matrix = np.full((len(all_items), len(rank_lists)), default_rank)
    for list_idx, rank_list in enumerate(rank_lists):
        for item, rank in rank_list:
            rank_matrix[item_to_index[item], list_idx] = rank
    rrf_scores = np.sum(1.0 / (alpha + rank_matrix), axis=1)
    sorted_indices = np.argsort(-rrf_scores)
    sorted_items = [(index_to_item[idx], rrf_scores[idx]) for idx in sorted_indices]
    return sorted_items
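
# Query pipeline: embed the query densely and sparsely, run the hybrid search,
# fuse both rankings with RRF, fetch the top payloads from Qdrant, then
# map-reduce summarize them with the LLM.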
def main(query: str, client: QdrantClient, collection_name: str, llm, dense_model, sparse_model):
    # name = 'Kia_EV6'
    # filepath = os.path.join(os.getcwd(), name + '.pdf')
    # docs = UnstructuredFileLoader(
    #     file_path=filepath,
    #     mode='elements',
    #     strategy='hi_res',
    #     skip_infer_table_types=['png', 'pdf', 'jpg', 'xls', 'xlsx', 'heic'],
    #     hi_res_model_name='yolox',
    #     include_page_breaks=True
    # )
    # docs = docs.load()
    dense_query = list(dense_model.embed_query(query, 32))
    sparse_query = list(sparse_model.embed(query, 32))
    search_results = search(
        client,
        collection_name,
        dense_query,
        sparse_query
    )
    dense_rank_list, sparse_rank_list = rank_list(search_results[0]), rank_list(search_results[1])
    rrf_rank_list = rrf([dense_rank_list, sparse_rank_list])
    # Only the top three fused results are summarized; retrieve() does not
    # guarantee result order, so re-sort the records to match the RRF ranking.
    top_ids = [item[0] for item in rrf_rank_list[:3]]
    records_list = client.retrieve(
        collection_name,
        top_ids
    )
    records_by_id = {record.id: record for record in records_list}
    docs = [Document(page_content=records_by_id[pid].payload['text']) for pid in top_ids]
    map_prompt = PromptTemplate(
        template=MAP_PROMPT,
        input_variables=['text']
    )
    combine_prompt = PromptTemplate(
        template=COMBINE_PROMPT,
        input_variables=['text']
    )
    map_chain = load_summarize_chain(
        llm,
        "stuff",
        prompt=map_prompt
    )
    summary_list = []
    for doc in docs:
        chunk_summary = map_chain.invoke([doc])
        summary_list.append(chunk_summary['output_text'])
    summaries = Document(page_content="\n".join(summary_list))
    reduce_chain = load_summarize_chain(
        llm,
        "stuff",
        prompt=combine_prompt
    )
    output = reduce_chain.invoke([summaries])
    return output['output_text']
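
# fastembed's SPLADE registry is patched below so SparseTextEmbedding can load
# a community ONNX export of prithivida/Splade_PP_en_v2; the registry entry
# mirrors fastembed's own model-description schema.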
def load_models_and_documents():
    supported_splade_models[0] = {
        "model": "prithivida/Splade_PP_en_v2",
        "vocab_size": 30522,
        "description": "Implementation of SPLADE++ Model for English v2",
        "size_in_GB": 0.532,
        "sources": {
            "hf": "devve1/Splade_PP_en_v2_onnx"
        },
        "model_file": "model.onnx"
    }
    with st.spinner('Loading models...'):
        model_path = hf_hub_download(
            repo_id='NousResearch/Hermes-2-Theta-Llama-3-8B-GGUF',
            filename='Hermes-2-Pro-Llama-3-Instruct-Merged-DPO-Q8_0.gguf'
        )
        llm = LlamaCpp(
            model_path=model_path,
            n_ctx=8192,
            max_tokens=3000,
            n_gpu_layers=-1,
            n_batch=512,
            f16_kv=True
        )
        provider = ['CPUExecutionProvider']
        dense_model = FastEmbedEmbeddingsLc(
            model_name='mixedbread-ai/mxbai-embed-large-v1',
            providers=provider,
            cache_dir=os.getenv('HF_HOME'),
            batch_size=32
        )
        sparse_model = SparseTextEmbedding(
            'prithivida/Splade_PP_en_v2',
            cache_dir=os.getenv('HF_HOME'),
            providers=provider
        )
    client = QdrantClient(path=os.getenv('HF_HOME'))
    collection_name = 'collection_demo'
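
    # Collection layout: a 1024-dim "text-dense" vector (matching
    # mxbai-embed-large-v1, cosine distance, binary-quantized, stored on disk)
    # plus a "text-sparse" SPLADE vector. indexing_threshold=0 postpones HNSW
    # index construction until after the bulk upload below.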
    if not client.collection_exists(collection_name):
        client.create_collection(
            collection_name,
            vectors_config={
                "text-dense": models.VectorParams(
                    size=1024,
                    distance=models.Distance.COSINE,
                    on_disk=True,
                    quantization_config=models.BinaryQuantization(
                        binary=models.BinaryQuantizationConfig(
                            always_ram=False
                        )
                    )
                )
            },
            sparse_vectors_config={
                "text-sparse": models.SparseVectorParams(
                    index=models.SparseIndexParams(
                        on_disk=True
                    )
                )
            },
            shard_number=2,
            on_disk_payload=True,
            optimizers_config=models.OptimizersConfigDiff(
                memmap_threshold=10000,
                indexing_threshold=0
            ),
            hnsw_config=models.HnswConfigDiff(
                on_disk=True,
                m=16,
                ef_construct=100
            )
        )

    with st.spinner('Parsing and chunking documents...'):
        name = 'action_rpg'
        embeddings_path = os.path.join(os.getenv('HF_HOME'), 'collection', 'embeddings')
        chunks_path = os.path.join(embeddings_path, name + '_chunks.msgpack')
        dense_path = os.path.join(embeddings_path, name + '_dense.npz')
        sparse_path = os.path.join(embeddings_path, name + '_sparse.npz')
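
        # On-disk cache layout: chunks as msgpack, dense embeddings in a
        # compressed .npz archive, and all sparse embeddings stacked into one
        # CSR matrix so they round-trip through scipy's save_npz/load_npz.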
        if not os.path.exists(embeddings_path):
            os.makedirs(embeddings_path)  # create intermediate dirs too, not just the leaf
            docs = WikipediaLoader(query='Action-RPG').load()
            chunks, dense_embeddings, sparse_embeddings = chunk_documents(docs, dense_model, sparse_model)
            with open(chunks_path, "wb") as outfile:
                packed = msgpack.packb(chunks, use_bin_type=True)
                outfile.write(packed)
            np.savez_compressed(dense_path, *dense_embeddings)
            max_index = max(np.max(embedding.indices) for embedding in sparse_embeddings)
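
            # Each SPLADE embedding becomes a 1 x (max_index + 1) CSR row:
            # `values` are the non-zero weights, `indices` their vocabulary
            # positions, and indptr=[0, len(data)] delimits the single row.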
            sparse_matrices = []
            for embedding in sparse_embeddings:
                data = embedding.values
                indices = embedding.indices
                indptr = np.array([0, len(data)])
                matrix = csr_matrix((data, indices, indptr), shape=(1, max_index + 1))
                sparse_matrices.append(matrix)
            combined_sparse_matrix = vstack(sparse_matrices)
            save_npz(sparse_path, combined_sparse_matrix)
        else:
            with open(chunks_path, "rb") as data_file:
                byte_data = data_file.read()
            chunks = msgpack.unpackb(byte_data, raw=False)
            dense_embeddings = list(np.load(dense_path).values())
            sparse_embeddings = []
            loaded_sparse_matrix = load_npz(sparse_path)
            for i in range(loaded_sparse_matrix.shape[0]):
                row = loaded_sparse_matrix.getrow(i)
                embedding = SparseEmbedding(values=row.data, indices=row.indices)
                sparse_embeddings.append(embedding)

    with st.spinner('Saving documents...'):
        client.upsert(
            collection_name,
            make_points(
                chunks,
                dense_embeddings,
                sparse_embeddings
            )
        )
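
        # Bulk load finished: raise indexing_threshold from 0 so Qdrant starts
        # building the HNSW index over the upserted points.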
        client.update_collection(
            collection_name=collection_name,
            optimizer_config=models.OptimizersConfigDiff(indexing_threshold=20000)
        )
    return client, collection_name, llm, dense_model, sparse_model
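
# SemanticChunker splits where the embedding distance between adjacent
# sentences exceeds a breakpoint (here one standard deviation above the mean)
# rather than at fixed character counts.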
def chunk_documents(docs, dense_model, sparse_model):
    text_splitter = SemanticChunker(
        dense_model,
        breakpoint_threshold_type='standard_deviation'
    )
    documents = [doc.page_content for doc in text_splitter.transform_documents(list(docs))]
    dense_embeddings = dense_model.embed_documents(documents, 32)
    sparse_embeddings = list(sparse_model.embed(documents, 32))
    return documents, dense_embeddings, sparse_embeddings

if __name__ == '__main__':
    st.set_page_config(
        page_title="Video Game Assistant",
        layout="wide"
    )
    st.title("Video Game Assistant :sunglasses:")
    if 'models_loaded' not in st.session_state:
        st.session_state.client, st.session_state.collection_name, st.session_state.llm, st.session_state.dense_model, st.session_state.sparse_model = load_models_and_documents()
        st.session_state.models_loaded = True
    if st.session_state.models_loaded:
        if "messages" not in st.session_state:
            st.session_state.messages = []
        for message in st.session_state.messages:
            with st.chat_message(message["role"]):
                st.markdown(message["content"])
        if prompt := st.chat_input("Message Video Game Assistant"):
            st.chat_message("user").markdown(prompt)
            st.session_state.messages.append({"role": "user", "content": prompt})
            ai_response = main(prompt, st.session_state.client, st.session_state.collection_name, st.session_state.llm, st.session_state.dense_model, st.session_state.sparse_model)
with st.chat_message("assistant"): | |
message_placeholder = st.empty() | |
full_response = "" | |
for chunk in re.split(r'(\s+)', response): | |
full_response += chunk + " " | |
time.sleep(0.01) | |
message_placeholder.markdown(full_response + "▌") | |
st.session_state.messages.append({"role": "assistant", "content": full_response}) |