Spaces:
Starting
on
T4
Starting
on
T4
import os | |
import sys | |
import copy | |
import time | |
import numpy as np | |
import streamlit as st | |
from typing import Optional | |
from stqdm import stqdm | |
from numpy import ndarray | |
from typing import Iterable | |
from qdrant_client import QdrantClient, models | |
from fastembed.sparse.splade_pp import supported_splade_models | |
from fastembed import SparseTextEmbedding, SparseEmbedding | |
from langchain_community.llms.exllamav2 import ExLlamaV2 | |
from langchain_core.callbacks import CallbackManager, StreamingStdOutCallbackHandler | |
from fastembed_ext import FastEmbedEmbeddingsLc | |
from langchain_community.document_loaders.wikipedia import WikipediaLoader | |
from langchain_community.document_loaders.unstructured import UnstructuredFileLoader | |
from langchain_experimental.text_splitter import SemanticChunker | |
from langchain_core.documents import Document | |
from qdrant_client.models import ( | |
NamedSparseVector, | |
NamedVector, | |
SparseVector, | |
PointStruct, | |
SearchRequest, | |
ScoredPoint, | |
) | |
from langchain_core.prompts import PromptTemplate | |
from langchain.chains.summarize import load_summarize_chain | |
from huggingface_hub import snapshot_download | |
from exllamav2.generator import ExLlamaV2Sampler | |
# Prompt used for the "map" step: summarise one retrieved passage on its own.
# ``{text}`` is the only template variable and is filled with the passage content.
MAP_PROMPT = """
You will be given a single passage of a book. This section will be enclosed in triple backticks (```)
Your goal is to give a summary of this section so that a reader will have a full understanding of what happened.
Your response should be at least three paragraphs and fully encompass what said in the passage.
```{text}```
FULL SUMMARY:
"""

# Prompt used for the "combine" step: merge the per-passage summaries into one answer.
# ``{text}`` is filled with the newline-joined summaries produced by the map step.
COMBINE_PROMPT = """
You will be given a series of summaries from a book. The summaries will be enclosed in triple backticks (```)
Your goal is to give a verbose summary of what happened in the story.
The reader should be able to grasp what happened in the book.
```{text}```
VERBOSE SUMMARY:
"""
# Replace the first entry of fastembed's SPLADE model registry with a v2 entry whose
# weights are fetched from the "devve1/Splade_PP_en_v2_onnx" Hugging Face repo.
# NOTE(review): presumably this is what lets SparseTextEmbedding resolve the
# 'prithivida/Splade_PP_en_v2' name used when the sparse model is built - confirm
# against the installed fastembed version, since it relies on a module-level list.
supported_splade_models[0] = dict(
    model="prithivida/Splade_PP_en_v2",
    vocab_size=30522,
    description="Implementation of SPLADE++ Model for English v2",
    size_in_GB=0.532,
    sources=dict(hf="devve1/Splade_PP_en_v2_onnx"),
    model_file="model.onnx",
)
def make_points(chunks: list[str], dense: list[ndarray], sparse) -> Iterable[PointStruct]:
    """
    Lazily build one Qdrant point per chunk, carrying both its dense and sparse vectors.

    :param chunks: Chunk texts; each is stored in its point's payload under ``"text"``.
    :param dense: Dense embeddings, aligned index-for-index with ``chunks``.
    :param sparse: Sparse embeddings aligned with ``chunks``. Each element may be an object
        exposing ``indices`` / ``values`` arrays (the shape ``search`` reads) or a plain
        ``(indices, values)`` pair.
    :return: Generator of ``PointStruct`` objects whose id is the chunk's position.
    """
    aligned = zip(sparse, chunks, dense)
    # enumerate(zip(...)) has no length, so pass the total explicitly for the progress bar.
    progress = stqdm(
        enumerate(aligned),
        total=len(chunks),
        desc='Save embeddings on disk...',
        backend=True,
    )
    for idx, (sparse_embedding, chunk, dense_vector) in progress:
        # Accept both attribute-style embeddings and (indices, values) tuples so this
        # function agrees with how search() accesses the same kind of object.
        if hasattr(sparse_embedding, 'indices'):
            indices, values = sparse_embedding.indices, sparse_embedding.values
        else:
            indices, values = sparse_embedding

        yield PointStruct(
            id=idx,
            vector={
                "text-sparse": SparseVector(indices=indices.tolist(), values=values.tolist()),
                "text-dense": dense_vector,
            },
            payload={
                "text": chunk,
            },
        )
def search(client: QdrantClient, collection_name: str, dense: ndarray, sparse: list[SparseEmbedding], limit: int = 10):
    """
    Run the dense and the sparse query against the collection in a single batch call.

    :param client: Qdrant client used to issue the batch search.
    :param collection_name: Collection holding the ``"text-dense"`` and ``"text-sparse"`` vectors.
    :param dense: Dense query vector (an ``ndarray`` or an already-converted list).
    :param sparse: Sparse query embeddings; only the first element is used.
    :param limit: Maximum number of hits requested from each of the two searches.
    :return: Whatever ``client.search_batch`` returns for the two requests; callers read
        index 0 as the dense hits and index 1 as the sparse hits (the request order).
    """
    # The request model wants a plain list of floats; convert when given an ndarray.
    dense_vector = dense.tolist() if isinstance(dense, ndarray) else dense
    sparse_query = sparse[0]

    dense_request = SearchRequest(
        vector=NamedVector(
            name="text-dense",
            vector=dense_vector,
        ),
        limit=limit,
    )
    sparse_request = SearchRequest(
        vector=NamedSparseVector(
            name="text-sparse",
            vector=SparseVector(
                indices=sparse_query.indices.tolist(),
                values=sparse_query.values.tolist(),
            ),
        ),
        limit=limit,
    )
    return client.search_batch(collection_name, [dense_request, sparse_request])
def rank_list(search_result: list[ScoredPoint]):
    """Pair each hit's id with its 1-based position in ``search_result``."""
    ranked = []
    for position, hit in enumerate(search_result, start=1):
        ranked.append((hit.id, position))
    return ranked
def rrf(rank_lists, alpha=60, default_rank=1000):
    """
    Reciprocal Rank Fusion (RRF) of several rankings, vectorised with NumPy.

    An item's score is the sum, over every rank list, of ``1 / (alpha + rank)``. In a list
    where the item does not appear it is scored as if it were ranked ``default_rank``.

    :param rank_lists: A list of rank lists. Each rank list should be a list of (item, rank) tuples.
    :param alpha: The parameter alpha used in the RRF formula. Default is 60.
    :param default_rank: The default rank assigned to items not present in a rank list. Default is 1000.
    :return: List of ``(item, rrf_score)`` tuples sorted by descending score. Items with equal
        scores keep the order in which they were first seen in ``rank_lists``.
    """
    # dict.fromkeys de-duplicates while preserving first-seen order, which (together with
    # the stable sort below) makes the ordering of tied items deterministic.
    items = list(dict.fromkeys(item for rank_list in rank_lists for item, _ in rank_list))
    item_to_index = {item: idx for idx, item in enumerate(items)}

    # One row per item, one column per rank list, pre-filled with the "missing" rank.
    rank_matrix = np.full((len(items), len(rank_lists)), default_rank, dtype=float)
    for list_idx, rank_list in enumerate(rank_lists):
        for item, rank in rank_list:
            rank_matrix[item_to_index[item], list_idx] = rank

    rrf_scores = np.sum(1.0 / (alpha + rank_matrix), axis=1)
    sorted_indices = np.argsort(-rrf_scores, kind="stable")
    # Index into the prebuilt list instead of rebuilding the key list for every result.
    return [(items[idx], rrf_scores[idx]) for idx in sorted_indices]
def main(query: str, client: QdrantClient, collection_name: str, llm, dense_model, sparse_model, top_k: int = 3):
    """
    Answer ``query`` by hybrid retrieval followed by a map/combine summarisation.

    The query is embedded with both models, searched against the collection, the dense and
    sparse hit lists are fused with ``rrf``, and the ``top_k`` best chunks are summarised one
    by one (``MAP_PROMPT``) and then merged into a single answer (``COMBINE_PROMPT``).

    :param query: User question.
    :param client: Qdrant client used for search and payload retrieval.
    :param collection_name: Collection to query.
    :param llm: Language model handed to ``load_summarize_chain``.
    :param dense_model: Dense embedder exposing ``embed_query``.
    :param sparse_model: Sparse embedder exposing ``embed``.
    :param top_k: Number of fused results to summarise. Default is 3.
    :return: The ``output_text`` produced by the combine chain.
    """
    # Disabled alternative ingestion path (local PDF parsed with Unstructured), kept for reference:
    # name = 'Kia_EV6'
    # filepath = os.path.join(os.getcwd(), name + '.pdf')
    # docs = UnstructuredFileLoader(
    #     file_path=filepath,
    #     mode='elements',
    #     strategy='hi_res',
    #     skip_infer_table_types=['png', 'pdf', 'jpg', 'xls', 'xlsx', 'heic'],
    #     hi_res_model_name='yolox',
    #     include_page_breaks=True
    # )
    # docs = docs.load()

    dense_query = list(dense_model.embed_query(query, 32))
    sparse_query = list(sparse_model.embed(query, 32))

    search_results = search(
        client,
        collection_name,
        dense_query,
        sparse_query
    )

    dense_rank_list, sparse_rank_list = rank_list(search_results[0]), rank_list(search_results[1])
    rrf_rank_list = rrf([dense_rank_list, sparse_rank_list])

    # Only the best ``top_k`` fused ids are needed, so fetch just those payloads.
    top_ids = [item_id for item_id, _ in rrf_rank_list[:top_k]]
    records_list = client.retrieve(
        collection_name,
        [item_id for item_id in stqdm(top_ids, desc='Retrieve relevant chunk of texts...', backend=True)]
    )

    # Do not rely on retrieve() returning records in the order the ids were passed:
    # re-order them by id so the documents follow the fused ranking.
    records_by_id = {record.id: record for record in records_list}
    docs = [
        Document(page_content=records_by_id[item_id].payload['text'])
        for item_id in top_ids
        if item_id in records_by_id
    ]
    print(docs)

    map_prompt = PromptTemplate(
        template=MAP_PROMPT,
        input_variables=['text']
    )
    combine_prompt = PromptTemplate(
        template=COMBINE_PROMPT,
        input_variables=['text']
    )

    # Map step: summarise each retrieved chunk independently.
    map_chain = load_summarize_chain(
        llm,
        "stuff",
        prompt=map_prompt
    )
    summary_list = [map_chain.invoke([doc])['output_text'] for doc in docs]

    # Combine step: merge the per-chunk summaries into one answer.
    summaries = Document(page_content="\n".join(summary_list))
    reduce_chain = load_summarize_chain(
        llm,
        "stuff",
        prompt=combine_prompt
    )
    output = reduce_chain.invoke([summaries])
    return output['output_text']
# Streamlit re-executes the whole script on every interaction; cache the heavy objects so
# the LLM and both embedding models are only loaded once per server process.
@st.cache_resource(show_spinner="Loading models...")
def load_models_and_components(show_spinner="Loading models..."):
    """
    Build the LLM and the dense / sparse embedding models used by the app.

    :param show_spinner: Not read by the function body; kept so existing callers keep working.
    :return: Tuple ``(llm, dense_model, sparse_model)``.
    """
    # Sampling configuration handed to the ExLlamaV2 wrapper.
    settings = ExLlamaV2Sampler.Settings()
    settings.temperature = 0.75
    settings.top_k = 50
    settings.top_p = 0.8
    settings.token_repetition_penalty = 1.05

    # Fetch the quantised model snapshot from the Hugging Face Hub and get its local path.
    model_path = snapshot_download(repo_id='Zoyd/NousResearch_Hermes-2-Theta-Llama-3-8B-6_5bpw_exl2')

    callbacks = [StreamingStdOutCallbackHandler()]

    llm = ExLlamaV2(
        model_path=model_path,
        callbacks=callbacks,
        settings=settings,
        streaming=True,
        max_new_tokens=3000
    )

    # Both embedding models are configured with the CPU execution provider and use the
    # directory named by the HF_HOME environment variable as their cache.
    provider = ['CPUExecutionProvider']

    sparse_model = SparseTextEmbedding(
        'prithivida/Splade_PP_en_v2',
        cache_dir=os.getenv('HF_HOME'),
        providers=provider
    )

    dense_model = FastEmbedEmbeddingsLc(
        model_name='mixedbread-ai/mxbai-embed-large-v1',
        providers=provider,
        cache_dir=os.getenv('HF_HOME'),
        batch_size=32
    )

    return llm, dense_model, sparse_model
def chunk_documents(docs, dense_model, sparse_model, show_spinner="Parsing and chunking texts..."):
    """
    Split ``docs`` into semantic chunks and embed every chunk with both models.

    :param docs: Iterable of documents to split.
    :param dense_model: Dense embedder; drives the ``SemanticChunker`` and provides
        ``embed_documents``.
    :param sparse_model: Sparse embedder exposing ``embed``.
    :param show_spinner: Not read by the function body; kept so existing callers keep working.
    :return: Tuple ``(documents, dense_embeddings, sparse_embeddings)`` where ``documents``
        is the list of chunk texts and the two embedding collections are computed from it.
    """
    text_splitter = SemanticChunker(
        dense_model,
        breakpoint_threshold_type='standard_deviation'
    )
    documents = [doc.page_content for doc in text_splitter.transform_documents(list(docs))]

    # Use the function's own parameters: the previous body referenced the undefined names
    # ``_dense_model`` / ``_sparse_model`` and raised NameError.
    # NOTE(review): the trailing 32 is presumably a batch size - confirm against the embedders.
    dense_embeddings = dense_model.embed_documents(
        stqdm(documents, desc='Generate dense embeddings...', backend=True),
        32
    )
    sparse_embeddings = list(
        sparse_model.embed(
            stqdm(documents, desc='Generate sparse embeddings...', backend=True),
            32
        )
    )

    return documents, dense_embeddings, sparse_embeddings
if __name__ == '__main__':
    import re  # used by the word-by-word streaming loop at the bottom of this script
    import time

    st.set_page_config(
        page_title="Video Game Assistant",
        layout="wide"
    )

    llm, dense_model, sparse_model = load_models_and_components()

    client = QdrantClient(url="http://localhost:6333")
    collection_name = 'collection_demo'

    # Create and populate the collection only when it does not exist yet, so the corpus is
    # not re-embedded and re-uploaded on every Streamlit rerun.
    if not client.collection_exists(collection_name):
        client.create_collection(
            collection_name,
            vectors_config={
                "text-dense": models.VectorParams(
                    size=1024,
                    distance=models.Distance.COSINE,
                    on_disk=True,
                    quantization_config=models.BinaryQuantization(
                        binary=models.BinaryQuantizationConfig(
                            always_ram=False
                        )
                    )
                )
            },
            sparse_vectors_config={
                "text-sparse": models.SparseVectorParams(
                    index=models.SparseIndexParams(
                        on_disk=True
                    )
                )
            },
            shard_number=2,
            optimizers_config=models.OptimizersConfigDiff(
                memmap_threshold=10000
            ),
            hnsw_config=models.HnswConfigDiff(
                on_disk=True,
                m=16,
                ef_construct=100
            )
        )

        docs = WikipediaLoader(query='Action-RPG').load()
        chunks, dense, sparse = chunk_documents(docs, dense_model, sparse_model)

        client.upload_points(
            collection_name,
            make_points(
                chunks,
                dense,
                sparse
            )
        )

    st.title("Video Game Assistant")

    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Replay the conversation so far; Streamlit redraws the page from scratch on each rerun.
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    if prompt := st.chat_input("Message Video Game Assistant"):
        st.chat_message("user").markdown(prompt)
        st.session_state.messages.append({"role": "user", "content": prompt})

        ai_response = main(prompt, client, collection_name, llm, dense_model, sparse_model)
        # NOTE(review): the "Echo: " prefix looks like a leftover from an echo-bot template - confirm.
        response = f"Echo: {ai_response}"

        with st.chat_message("assistant"):
            message_placeholder = st.empty()
            full_response = ""
            # The capturing group keeps the whitespace separators as their own chunks, so
            # appending each chunk verbatim reproduces the response exactly as it "types".
            for chunk in re.split(r'(\s+)', response):
                full_response += chunk
                time.sleep(0.01)
                message_placeholder.markdown(full_response + "▌")
            # Final render without the typing cursor.
            message_placeholder.markdown(full_response)

        st.session_state.messages.append({"role": "assistant", "content": full_response})