import re
import time

import numpy as np
import streamlit as st
from numpy import ndarray
from typing import Iterable

from stqdm import stqdm
from huggingface_hub import hf_hub_download

from qdrant_client import QdrantClient, models
from qdrant_client.models import (
    NamedSparseVector,
    NamedVector,
    SparseVector,
    PointStruct,
    SearchRequest,
    ScoredPoint,
)

from fastembed import SparseTextEmbedding, SparseEmbedding
from fastembed.sparse.splade_pp import supported_splade_models
from fastembed_ext import FastEmbedEmbeddingsLc

from langchain_community.llms import LlamaCpp
from langchain_core.callbacks import CallbackManager, StreamingStdOutCallbackHandler
from langchain_community.document_loaders.wikipedia import WikipediaLoader
from langchain_experimental.text_splitter import SemanticChunker
from langchain_core.documents import Document
from langchain_core.prompts import PromptTemplate
from langchain.chains.summarize import load_summarize_chain

MAP_PROMPT = """
You will be given a single passage of a book. This section will be enclosed in triple backticks (```)
Your goal is to give a summary of this section so that a reader will have a full understanding of what happened.
Your response should be at least three paragraphs and fully encompass what is said in the passage.

```{text}```
FULL SUMMARY:
"""

COMBINE_PROMPT = """
You will be given a series of summaries from a book. The summaries will be enclosed in triple backticks (```)
Your goal is to give a verbose summary of what happened in the story.
The reader should be able to grasp what happened in the book.

```{text}```
VERBOSE SUMMARY:
"""

# Point fastembed's SPLADE++ catalogue entry at a prebuilt ONNX export of the v2 model.
supported_splade_models[0] = {
    "model": "prithivida/Splade_PP_en_v2",
    "vocab_size": 30522,
    "description": "Implementation of SPLADE++ Model for English v2",
    "size_in_GB": 0.532,
    "sources": {
        "hf": "devve1/Splade_PP_en_v2_onnx"
    },
    "model_file": "model.onnx"
}


def make_points(chunks: list[str], dense: list[ndarray], sparse: list[SparseEmbedding]) -> Iterable[PointStruct]:
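    """Build one PointStruct per chunk, pairing its dense and sparse vectors with the raw text as payload."""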
    for idx, (chunk, sparse_vector, dense_vector) in stqdm(enumerate(zip(chunks, sparse, dense)), desc='Save embeddings on disk...', backend=True):
        sparse_vector = SparseVector(indices=sparse_vector.indices.tolist(), values=sparse_vector.values.tolist())
        point = PointStruct(
            id=idx,
            vector={
                "text-sparse": sparse_vector,
                "text-dense": dense_vector,
            },
            payload={
                "text": chunk
            }
        )
        yield point


def search(client: QdrantClient, collection_name: str, dense: ndarray, sparse: list[SparseEmbedding]):
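    """Run one dense and one sparse search in a single batch; returns the two top-10 result lists."""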
    search_results = client.search_batch(
        collection_name,
        [
            SearchRequest(
                vector=NamedVector(
                    name="text-dense",
                    vector=dense,
                ),
                limit=10
            ),
            SearchRequest(
                vector=NamedSparseVector(
                    name="text-sparse",
                    vector=SparseVector(
                        indices=sparse[0].indices.tolist(),
                        values=sparse[0].values.tolist(),
                    ),
                ),
                limit=10
            ),
        ],
    )

    return search_results


def rank_list(search_result: list[ScoredPoint]):
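    """Convert a score-ordered result list into (point_id, rank) pairs, with ranks starting at 1."""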
    return [(point.id, rank + 1) for rank, point in enumerate(search_result)]


def rrf(rank_lists, alpha=60, default_rank=1000):
    """
    Optimized Reciprocal Rank Fusion (RRF) using NumPy for large rank lists.

    Each item's RRF score is the sum of 1 / (alpha + rank) across all rank lists;
    items absent from a list contribute 1 / (alpha + default_rank).

    :param rank_lists: A list of rank lists. Each rank list should be a list of (item, rank) tuples.
    :param alpha: The parameter alpha used in the RRF formula. Default is 60.
    :param default_rank: The default rank assigned to items not present in a rank list. Default is 1000.
    :return: Sorted list of items based on their RRF scores.
    """
    all_items = set(item for rank_list in rank_lists for item, _ in rank_list)
    item_to_index = {item: idx for idx, item in enumerate(all_items)}
    rank_matrix = np.full((len(all_items), len(rank_lists)), default_rank)

    for list_idx, rank_list in enumerate(rank_lists):
        for item, rank in rank_list:
            rank_matrix[item_to_index[item], list_idx] = rank

    rrf_scores = np.sum(1.0 / (alpha + rank_matrix), axis=1)
    sorted_indices = np.argsort(-rrf_scores)
    index_to_item = list(item_to_index)
    sorted_items = [(index_to_item[idx], rrf_scores[idx]) for idx in sorted_indices]

    return sorted_items
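# Worked example (hypothetical data): rrf([[('a', 1), ('b', 2)], [('b', 1)]])
# scores 'a' at 1/61 + 1/1060 ≈ 0.017 (absent from list two, so default_rank applies)
# and 'b' at 1/62 + 1/61 ≈ 0.033, so 'b' is returned first.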


def main(query: str, chunks: list[str], llm, dense_model, sparse_model):
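    """Index the chunks in Qdrant, run hybrid dense + sparse search for the query,
    fuse the two rankings with RRF, and map-reduce summarize the top chunks."""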
    client = QdrantClient(location=':memory:')
    collection_name = 'collection_demo'

    if not client.collection_exists(collection_name):
        client.create_collection(
            collection_name,
            vectors_config={
                "text-dense": models.VectorParams(
                    size=1024,
                    distance=models.Distance.COSINE,
                    on_disk=True,
                    quantization_config=models.BinaryQuantization(
                        binary=models.BinaryQuantizationConfig(
                            always_ram=False
                        )
                    )
                )
            },
            sparse_vectors_config={
                "text-sparse": models.SparseVectorParams(
                    index=models.SparseIndexParams(
                        on_disk=True
                    )
                )
            },
            shard_number=2,
            optimizers_config=models.OptimizersConfigDiff(
                memmap_threshold=10000
            ),
            hnsw_config=models.HnswConfigDiff(
                on_disk=True,
                m=16,
                ef_construct=100
            )
        )

    dense_embeddings = dense_model.embed_documents(stqdm(chunks, desc='Generate dense embeddings...', backend=True), 32)
    sparse_embeddings = list(sparse_model.embed(stqdm(chunks, desc='Generate sparse embeddings...', backend=True), 32))

    client.upload_points(
        collection_name,
        make_points(
            chunks,
            dense_embeddings,
            sparse_embeddings
        )
    )

    dense_query = list(dense_model.embed_query(query, 32))
    sparse_query = list(sparse_model.embed(query, 32))

    search_results = search(
        client,
        collection_name,
        dense_query,
        sparse_query
    )

    dense_rank_list, sparse_rank_list = rank_list(search_results[0]), rank_list(search_results[1])
    rrf_rank_list = rrf([dense_rank_list, sparse_rank_list])

    # Fetch only the top-3 fused results; their payloads hold the chunk text.
    records_list = client.retrieve(
        collection_name,
        [item[0] for item in stqdm(rrf_rank_list[:3], desc='Retrieve relevant chunk of texts...', backend=True)]
    )

    docs = [Document(page_content=record.payload['text']) for record in records_list]

    map_prompt = PromptTemplate(
        template=MAP_PROMPT,
        input_variables=['text']
    )
    combine_prompt = PromptTemplate(
        template=COMBINE_PROMPT,
        input_variables=['text']
    )

    map_chain = load_summarize_chain(
        llm,
        chain_type="stuff",
        prompt=map_prompt
    )

    # Map step: summarize each retrieved chunk independently.
    summary_list = []
    for doc in docs:
        chunk_summary = map_chain.invoke([doc])
        summary_list.append(chunk_summary['output_text'])

    summaries = Document(page_content="\n".join(summary_list))

    # Reduce step: combine the per-chunk summaries into one answer.
    reduce_chain = load_summarize_chain(
        llm,
        chain_type="stuff",
        prompt=combine_prompt
    )

    output = reduce_chain.invoke([summaries])
    return output['output_text']


@st.cache_resource(show_spinner="Loading models...")
def load_models_and_components():
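    """Download the GGUF weights and build the LLM plus the sparse and dense embedders (cached across reruns)."""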
    model_path = hf_hub_download(
        repo_id='NousResearch/Hermes-2-Theta-Llama-3-8B-GGUF',
        filename='Hermes-2-Pro-Llama-3-Instruct-Merged-DPO-Q8_0.gguf'
    )

    # Run the GGUF model locally on CPU via llama.cpp, streaming tokens to stdout.
    llm = LlamaCpp(
        model_path=model_path,
        temperature=0.75,
        max_tokens=3000,
        n_ctx=8192,
        top_p=1,
        callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]),
        verbose=True,
        n_gpu_layers=0,
        n_batch=512
    )

    provider = ['CPUExecutionProvider']

    sparse_model = SparseTextEmbedding(
        'prithivida/Splade_PP_en_v2',
        providers=provider
    )

    dense_model = FastEmbedEmbeddingsLc(
        model_name='mixedbread-ai/mxbai-embed-large-v1',
        providers=provider,
        batch_size=32
    )

    return llm, dense_model, sparse_model


@st.cache_data(show_spinner="Parsing and chunking texts...")
def chunk_documents(_dense_model):
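    """Load Wikipedia pages on the topic and split them into semantically coherent chunks (cached)."""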
    # The leading underscore keeps st.cache_data from trying to hash the embedding model.
    docs = WikipediaLoader(query='Action-RPG').load()

    text_splitter = SemanticChunker(
        _dense_model,
        breakpoint_threshold_type='standard_deviation'
    )

    documents = [doc.page_content for doc in text_splitter.transform_documents(list(docs))]
    return documents


if __name__ == '__main__':
    st.set_page_config(page_title="Video Game Assistant", layout="wide")

    llm, dense_model, sparse_model = load_models_and_components()
    chunks = chunk_documents(dense_model)

    st.title("Video Game Assistant")

    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Replay the conversation so far.
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    if prompt := st.chat_input("Message Video Game Assistant"):
        st.chat_message("user").markdown(prompt)
        st.session_state.messages.append({"role": "user", "content": prompt})

        response = main(prompt, chunks, llm, dense_model, sparse_model)
        with st.chat_message("assistant"):
            message_placeholder = st.empty()
            full_response = ""
            # Simulate streaming by revealing the answer chunk by chunk with a cursor.
            for chunk in re.split(r'(\s+)', response):
                full_response += chunk
                time.sleep(0.01)
                message_placeholder.markdown(full_response + "▌")
            message_placeholder.markdown(full_response)
        st.session_state.messages.append({"role": "assistant", "content": full_response})