Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import json | |
import os | |
import sys | |
from functools import cache | |
from pathlib import Path | |
import torch | |
from langchain_community.retrievers import QdrantSparseVectorRetriever | |
from langchain_community.vectorstores import Qdrant | |
from langchain_core.documents import Document | |
from langchain_openai.embeddings import OpenAIEmbeddings | |
from qdrant_client import QdrantClient, models | |
from transformers import AutoModelForMaskedLM, AutoTokenizer | |
from data_processing import excel_to_dataframe | |
class DataProcessor: | |
def __init__(self, data_dir: Path): | |
self.data_dir = data_dir | |
def load_practitioners_data(self): | |
try: | |
df = excel_to_dataframe(self.data_dir) | |
practitioners_data = [] | |
for idx, row in df.iterrows(): | |
# I am using dot as a separator for text embeddings | |
content = ". ".join(f"{key}: {value}" for key, value in row.items()) | |
doc = Document(page_content=content, metadata={"row": idx}) | |
practitioners_data.append(doc) | |
return practitioners_data | |
except FileNotFoundError: | |
sys.exit( | |
"Directory or Excel file not found. Please check the path and try again." | |
) | |
def load_tall_tree_data(self): | |
# Check if the file has a .json extension | |
json_files = [ | |
file for file in self.data_dir.iterdir() if file.suffix == ".json" | |
] | |
if not json_files: | |
raise FileNotFoundError("No JSON files found in the specified directory.") | |
if len(json_files) > 1: | |
raise ValueError( | |
"More than one JSON file found in the specified directory." | |
) | |
path = json_files[0] | |
data = self.load_json_file(path) | |
tall_tree_data = self.process_json_data(data) | |
return tall_tree_data | |
def load_json_file(self, path): | |
try: | |
with open(path, "r") as f: | |
data = json.load(f) | |
return data | |
except json.JSONDecodeError: | |
raise ValueError(f"The file {path} is not a valid JSON file.") | |
def process_json_data(self, data): | |
tall_tree_data = [] | |
for idx, (key, value) in enumerate(data.items()): | |
content = f"{key}: {value}" | |
doc = Document(page_content=content, metadata={"row": idx}) | |
tall_tree_data.append(doc) | |
return tall_tree_data | |
class ValidateQdrantClient: | |
"""Base class for retriever clients to ensure environment variables are set.""" | |
def __init__(self): | |
self.validate_environment_variables() | |
def validate_environment_variables(self): | |
"""Check if the Qdrant environment variables are set.""" | |
required_vars = ["QDRANT_API_KEY", "QDRANT_URL"] | |
missing_vars = [var for var in required_vars if not os.getenv(var)] | |
if missing_vars: | |
raise EnvironmentError( | |
f"Missing environment variable(s): {', '.join(missing_vars)}" | |
) | |
class DenseVectorStore(ValidateQdrantClient): | |
"""Store dense data in Qdrant vector database.""" | |
TEXT_EMBEDDING_MODELS = [ | |
"text-embedding-ada-002", | |
"text-embedding-3-small", | |
"text-embedding-3-large", | |
] | |
def __init__( | |
self, | |
documents: list[Document], | |
embeddings_model: str = "text-embedding-3-small", | |
collection_name: str = "practitioners_db", | |
): | |
super().__init__() | |
if embeddings_model not in self.TEXT_EMBEDDING_MODELS: | |
raise ValueError( | |
f"Invalid embeddings model: {embeddings_model}. Valid options are {', '.join(self.TEXT_EMBEDDING_MODELS)}." | |
) | |
self.documents = documents | |
self.embeddings_model = embeddings_model | |
self.collection_name = collection_name | |
self._qdrant_db = None | |
def qdrant_db(self): | |
if self._qdrant_db is None: | |
self._qdrant_db = Qdrant.from_documents( | |
self.documents, | |
OpenAIEmbeddings(model=self.embeddings_model), | |
url=os.getenv("QDRANT_URL"), | |
api_key=os.getenv("QDRANT_API_KEY"), | |
prefer_grpc=True, | |
collection_name=self.collection_name, | |
force_recreate=True, | |
) | |
return self._qdrant_db | |
class SparseVectorStore(ValidateQdrantClient): | |
"""Store sparse vectors in Qdrant vector database using SPLADE neural retrieval model.""" | |
def __init__( | |
self, | |
documents: list[Document], | |
collection_name: str, | |
vector_name: str, | |
k: int = 4, | |
splade_model_id: str = "naver/splade-cocondenser-ensembledistil", | |
): | |
# Validate Qdrant client | |
super().__init__() | |
self.client = QdrantClient( | |
url=os.getenv("QDRANT_URL"), | |
api_key=os.getenv("QDRANT_API_KEY"), | |
) # TODO: prefer_grpc=True is not working | |
self.model_id = splade_model_id | |
self._tokenizer = None | |
self._model = None | |
self.collection_name = collection_name | |
self.vector_name = vector_name | |
self.k = k | |
self.sparse_retriever = self.create_sparse_retriever() | |
self.add_documents(documents) | |
def tokenizer(self): | |
"""Initialize the tokenizer.""" | |
if self._tokenizer is None: | |
self._tokenizer = AutoTokenizer.from_pretrained(self.model_id) | |
return self._tokenizer | |
def model(self): | |
"""Initialize the SPLADE neural retrieval model.""" | |
if self._model is None: | |
self._model = AutoModelForMaskedLM.from_pretrained(self.model_id) | |
return self._model | |
def sparse_encoder(self, text: str) -> tuple[list[int], list[float]]: | |
"""Encode the input text into a sparse vector.""" | |
tokens = self.tokenizer( | |
text, | |
return_tensors="pt", | |
max_length=512, | |
padding="max_length", | |
truncation=True, | |
) | |
with torch.no_grad(): | |
logits = self.model(**tokens).logits | |
relu_log = torch.log1p(torch.relu(logits)) | |
weighted_log = relu_log * tokens.attention_mask.unsqueeze(-1) | |
max_val = torch.max(weighted_log, dim=1).values.squeeze() | |
indices = torch.nonzero(max_val, as_tuple=False).squeeze().cpu().numpy() | |
values = max_val[indices].cpu().numpy() | |
return indices.tolist(), values.tolist() | |
def create_sparse_retriever(self): | |
self.client.recreate_collection( | |
self.collection_name, | |
vectors_config={}, | |
sparse_vectors_config={ | |
self.vector_name: models.SparseVectorParams( | |
index=models.SparseIndexParams( | |
on_disk=False, | |
) | |
) | |
}, | |
) | |
return QdrantSparseVectorRetriever( | |
client=self.client, | |
collection_name=self.collection_name, | |
sparse_vector_name=self.vector_name, | |
sparse_encoder=self.sparse_encoder, | |
k=self.k, | |
) | |
def add_documents(self, documents): | |
self.sparse_retriever.add_documents(documents) | |
def main(): | |
data_dir = Path().resolve().parent / "data" | |
if not data_dir.exists(): | |
sys.exit(f"The directory {data_dir} does not exist.") | |
processor = DataProcessor(data_dir) | |
print("Loading and cleaning Practitioners data...") | |
practitioners_dataset = processor.load_practitioners_data() | |
print("Loading Tall Tree data from json file...") | |
tall_tree_dataset = processor.load_tall_tree_data() | |
# Set OpenAI embeddings model | |
# TODO: Test new OpenAI text embeddings models | |
# text-embedding-3-large | |
# text-embedding-3-small | |
EMBEDDINGS_MODEL = "text-embedding-3-small" | |
# Store both datasets in Qdrant | |
print(f"Storing dense vectors in Qdrant using {EMBEDDINGS_MODEL}...") | |
practitioners_db = DenseVectorStore( | |
practitioners_dataset, EMBEDDINGS_MODEL, collection_name="practitioners_db" | |
).qdrant_db | |
tall_tree_db = DenseVectorStore( | |
tall_tree_dataset, EMBEDDINGS_MODEL, collection_name="tall_tree_db" | |
).qdrant_db | |
print(f"Storing sparse vectors in Qdrant using SPLADE neural retrieval model...") | |
practitioners_sparse_vector_db = SparseVectorStore( | |
documents=practitioners_dataset, | |
collection_name="practitioners_db_sparse_collection", | |
vector_name="sparse_vector", | |
k=15, | |
splade_model_id="naver/splade-cocondenser-ensembledistil", | |
) | |
if __name__ == "__main__": | |
main() | |