# routers/embedding/__init__.py
import math
import os
import re
import sys
import threading
import time

import torch
from sentence_transformers import SentenceTransformer, util


class EmbeddingContext:
    # These don't change
    TOKEN_LEN_MAX_FOR_EMBEDDING = 512

    # Set when creating the object
    lock = None
    model = None
    openai_client = None
    model_name = ''
    config_type = ''
    embedding_shape = None
    embedding_dtype = None
    embedding_device = None

    # Updates constantly
    data = {}
    def __init__(self):
        try:
            from config import settings
        except ImportError:
            # Allow running this module directly by adding the repo root
            # to the import path.
            sys.path.append(os.path.abspath(
                os.path.join(os.path.dirname(__file__), '../..')))
            from config import settings

        self.lock = threading.Lock()
        config_type = settings.embedding_api
        model_name = settings.embedding_model

        if config_type == 'sbert':
            self.model = SentenceTransformer(model_name, use_auth_token=False)
            self.model.max_seq_length = self.TOKEN_LEN_MAX_FOR_EMBEDDING
            print("Max Sequence Length:", self.model.max_seq_length)

            self.encode = self.encode_sbert
            if torch.cuda.is_available():
                self.model = self.model.to('cuda')

        elif config_type == 'openai':
            from openai import OpenAI
            self.openai_client = OpenAI(
                # base_url=settings.openai_api_base,
                api_key=settings.OPENAI_API_KEY,
            )
            self.encode = self.encode_openai

        self.model_name = model_name
        self.config_type = config_type

        # Probe the backend once so the embedding shape, dtype and device
        # are known up front.
        tmp = self.encode(['tmp'])
        self.embedding_shape = tmp.shape[1:]
        self.embedding_dtype = tmp.dtype
        self.embedding_device = tmp.device
    def encode(self, texts_to_embed):
        # Replaced in __init__ by encode_sbert or encode_openai; reaching
        # this stub means the configured backend was not recognized.
        raise NotImplementedError(
            f"Unsupported embedding_api: {self.config_type!r}")

    def encode_sbert(self, texts_to_embed):
        return self.model.encode(texts_to_embed,
                                 show_progress_bar=True,
                                 convert_to_tensor=True,
                                 normalize_embeddings=True)
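
    # Worked example of the chunking arithmetic in encode_openai below:
    # 1,200,000 total tokens across 900 texts gives
    # chunks_num = ceil(1200000 / 500000) = 3 and
    # chunk_size = ceil(900 / 3) = 300 texts per API request,
    # with a 60-second pause between requests.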
    def encode_openai(self, texts_to_embed):
        # Stay under the API's tokens-per-minute budget by splitting the
        # input into chunks of at most ~500k tokens each.
        tokens_count = 0
        for text in texts_to_embed:
            tokens_count += len(self.get_tokens(text))

        chunks_num = math.ceil(tokens_count / 500000)
        chunk_size = math.ceil(len(texts_to_embed) / chunks_num)

        embeddings = []
        for i in range(chunks_num):
            start = i * chunk_size
            end = start + chunk_size
            chunk = texts_to_embed[start:end]

            embeddings_tmp = self.openai_client.embeddings.create(
                model=self.model_name,
                input=chunk,
            ).data

            if embeddings_tmp is None:
                break

            embeddings.extend(embeddings_tmp)

            if i < chunks_num - 1:
                time.sleep(60)  # Wait 1 minute before the next call

        return torch.stack([torch.tensor(embedding.embedding, dtype=torch.float32)
                            for embedding in embeddings])
    def get_tokens(self, text):
        if self.model:
            return self.model.tokenizer.tokenize(text)

        # Fallback: rough word-level split when no sbert tokenizer is loaded
        # (only used to estimate token counts for chunking).
        tokens = []
        for token in re.split(r'(\W|\b)', text):
            if token.strip():
                tokens.append(token)
        return tokens


EMBEDDING_CTX = EmbeddingContext()
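

# Minimal usage sketch, not part of the module's API: it assumes
# config.settings points at a working backend (an sbert model here; with
# 'openai' the same calls apply but need a valid OPENAI_API_KEY).
if __name__ == '__main__':
    texts = ["How do I install the add-on?", "Add-on installation steps"]
    embeddings = EMBEDDING_CTX.encode(texts)
    print("shape:", embeddings.shape,
          "dtype:", EMBEDDING_CTX.embedding_dtype,
          "device:", EMBEDDING_CTX.embedding_device)

    # encode_sbert returns L2-normalized vectors, so cosine similarity
    # reduces to a dot product; util.cos_sim works for either backend.
    print("similarity:", util.cos_sim(embeddings[0], embeddings[1]).item())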