import os
from typing import Any, List, Optional

from tqdm import tqdm
from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import BaseModel, Extra
class OptimumEncoder(BaseModel, Embeddings):
    _tokenizer: Any
    _model: Any
    _torch: Any

    def __init__(
        self,
        name: str = "mixedbread-ai/mxbai-embed-large-v1",
        device: Optional[str] = None,
        cache_dir: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        super().__init__(**kwargs)
        self.name = name
        self.device = device
        self.cache_dir = cache_dir
        self._tokenizer, self._model = self._initialize_hf_model()
    def _initialize_hf_model(self):
        try:
            import onnxruntime as ort
            from optimum.onnxruntime import ORTModelForFeatureExtraction
        except ImportError:
            raise ImportError(
                "Please install optimum and onnxruntime to use OptimumEncoder. "
                "You can install them with: "
                "`pip install transformers optimum[onnxruntime-gpu]`"
            )

        try:
            import torch
        except ImportError:
            raise ImportError(
                "Please install PyTorch to use OptimumEncoder. "
                "You can install it with: "
                "`pip install semantic-router[local]`"
            )

        try:
            from transformers import AutoTokenizer
        except ImportError:
            raise ImportError(
                "Please install transformers to use OptimumEncoder. "
                "You can install it with: "
                "`pip install semantic-router[local]`"
            )
        self._torch = torch

        tokenizer = AutoTokenizer.from_pretrained(
            self.name, cache_dir=self.cache_dir
        )

        # TensorRT engine caching options, only relevant when loading the model
        # with the TensorRT execution provider instead of CUDA:
        # provider_options = {
        #     "trt_engine_cache_enable": True,
        #     "trt_engine_cache_path": os.getenv("HF_HOME"),
        #     "trt_fp16_enable": True,
        # }

        session_options = ort.SessionOptions()
        session_options.log_severity_level = 0

        ort_model = ORTModelForFeatureExtraction.from_pretrained(
            model_id=self.name,
            cache_dir=self.cache_dir,
            file_name="model_fp16.onnx",
            subfolder="onnx",
            provider="CUDAExecutionProvider",
            use_io_binding=True,
            # provider_options=provider_options,
            session_options=session_options
        )
# print("Building engine for a short sequence...") | |
# short_text = ["short"] | |
# short_encoded_input = tokenizer( | |
# short_text, padding=True, truncation=True, return_tensors="pt" | |
# ).to(self.device) | |
# short_output = ort_model(**short_encoded_input) | |
# print("Building engine for a long sequence...") | |
# long_text = ["a very long input just for demo purpose, this is very long" * 10] | |
# long_encoded_input = tokenizer( | |
# long_text, padding=True, truncation=True, return_tensors="pt" | |
# ).to(self.device) | |
# long_output = ort_model(**long_encoded_input) | |
# text = ["Replace me by any text you'd like."] | |
# encoded_input = tokenizer( | |
# text, padding=True, truncation=True, return_tensors="pt" | |
# ).to(self.device) | |
# for i in range(3): | |
# output = ort_model(**encoded_input) | |
return tokenizer, ort_model | |
    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.allow
    def embed_documents(
        self,
        docs: List[str],
        batch_size: int = 32,
        normalize_embeddings: bool = True,
        pooling_strategy: str = "mean"
    ) -> List[List[float]]:
        all_embeddings = []
        for i in tqdm(range(0, len(docs), batch_size)):
            batch_docs = docs[i : i + batch_size]
            encoded_input = self._tokenizer(
                batch_docs, padding=True, truncation=True, return_tensors="pt"
            ).to(self.device)

            with self._torch.no_grad():
                model_output = self._model(**encoded_input)

            if pooling_strategy == "mean":
                embeddings = self._mean_pooling(
                    model_output, encoded_input["attention_mask"]
                )
            elif pooling_strategy == "max":
                embeddings = self._max_pooling(
                    model_output, encoded_input["attention_mask"]
                )
            else:
                raise ValueError(
                    "Invalid pooling_strategy. Please use 'mean' or 'max'."
                )

            if normalize_embeddings:
                embeddings = self._torch.nn.functional.normalize(
                    embeddings, p=2, dim=1
                )
            all_embeddings.extend(embeddings.tolist())
        return all_embeddings
    def embed_query(
        self,
        text: str,
        normalize_embeddings: bool = True,
        pooling_strategy: str = "mean"
    ) -> List[float]:
        encoded_input = self._tokenizer(
            text, padding=True, truncation=True, return_tensors="pt"
        ).to(self.device)

        with self._torch.no_grad():
            model_output = self._model(**encoded_input)

        if pooling_strategy == "mean":
            embeddings = self._mean_pooling(
                model_output, encoded_input["attention_mask"]
            )
        elif pooling_strategy == "max":
            embeddings = self._max_pooling(
                model_output, encoded_input["attention_mask"]
            )
        else:
            raise ValueError(
                "Invalid pooling_strategy. Please use 'mean' or 'max'."
            )

        if normalize_embeddings:
            embeddings = self._torch.nn.functional.normalize(embeddings, p=2, dim=1)
        # The tokenizer produced a batch of size 1, so return the single row as a
        # flat list of floats, matching the declared List[float] return type.
        return embeddings[0].tolist()
    def _mean_pooling(self, model_output, attention_mask):
        # Average the token embeddings, ignoring padding positions via the attention mask.
        token_embeddings = model_output[0]
        input_mask_expanded = (
            attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        )
        return self._torch.sum(
            token_embeddings * input_mask_expanded, 1
        ) / self._torch.clamp(input_mask_expanded.sum(1), min=1e-9)

    def _max_pooling(self, model_output, attention_mask):
        # Take the element-wise maximum over tokens, masking out padding positions.
        token_embeddings = model_output[0]
        input_mask_expanded = (
            attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        )
        token_embeddings[input_mask_expanded == 0] = -1e9
        return self._torch.max(token_embeddings, 1)[0]
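

# Minimal usage sketch (illustration only, not part of the class above). It assumes a
# CUDA-capable runtime and that the mixedbread-ai/mxbai-embed-large-v1 repository
# provides the onnx/model_fp16.onnx weights referenced in _initialize_hf_model.
if __name__ == "__main__":
    encoder = OptimumEncoder(device="cuda")

    doc_vectors = encoder.embed_documents(
        ["Paris is the capital of France.", "The T4 is an NVIDIA GPU."]
    )
    query_vector = encoder.embed_query("Which GPU does this Space run on?")

    # Each vector has the model's hidden size (1024 for mxbai-embed-large-v1).
    print(len(doc_vectors), len(doc_vectors[0]))
    print(len(query_vector))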