# routers/embedding/__init__.py
import os
import re
import sys
import threading

import torch
from sentence_transformers import SentenceTransformer, util


class EmbeddingContext:
    # These don't change
    TOKEN_LEN_MAX_FOR_EMBEDDING = 512

    # Set when creating the object
    lock = None
    model = None
    openai_client = None
    model_name = ''
    config_type = ''
    embedding_shape = None
    embedding_dtype = None
    embedding_device = None

    # Updates constantly
    data = {}

    def __init__(self):
        try:
            from config import settings
        except ImportError:
            # Allow importing the module from outside the package root.
            sys.path.append(os.path.abspath(
                os.path.join(os.path.dirname(__file__), '../..')))
            from config import settings

        self.lock = threading.Lock()
        self.data = {}  # per-instance, so it is not shared via the class attribute
        config_type = settings.embedding_api
        model_name = settings.embedding_model

        if config_type == 'sbert':
            self.model = SentenceTransformer(model_name, use_auth_token=False)
            self.model.max_seq_length = self.TOKEN_LEN_MAX_FOR_EMBEDDING
            print("Max Sequence Length:", self.model.max_seq_length)

            self.encode = self.encode_sbert
            if torch.cuda.is_available():
                self.model = self.model.to('cuda')

        elif config_type == 'openai':
            from openai import OpenAI
            self.openai_client = OpenAI(
                # base_url=settings.openai_api_base,
                api_key=settings.OPENAI_API_KEY,
            )
            self.encode = self.encode_openai

        self.model_name = model_name
        self.config_type = config_type

        # Probe the backend once to record the embedding shape, dtype and device.
        tmp = self.encode(['tmp'])
        self.embedding_shape = tmp.shape[1:]
        self.embedding_dtype = tmp.dtype
        self.embedding_device = tmp.device

    def encode(self, texts_to_embed):
        # Replaced in __init__ by encode_sbert or encode_openai.
        raise NotImplementedError

    def encode_sbert(self, texts_to_embed):
        return self.model.encode(texts_to_embed,
                                 show_progress_bar=True,
                                 convert_to_tensor=True,
                                 normalize_embeddings=True)

    def encode_openai(self, texts_to_embed):
        import math
        import time

        # Stay under the API rate limit: split the input into chunks of
        # roughly 500k tokens and wait a minute between calls.
        tokens_count = 0
        for text in texts_to_embed:
            tokens_count += len(self.get_tokens(text))

        chunks_num = max(1, math.ceil(tokens_count / 500000))
        chunk_size = math.ceil(len(texts_to_embed) / chunks_num)

        embeddings = []
        for i in range(chunks_num):
            start = i * chunk_size
            end = start + chunk_size
            chunk = texts_to_embed[start:end]

            embeddings_tmp = self.openai_client.embeddings.create(
                model=self.model_name,
                input=chunk,
            ).data

            if embeddings_tmp is None:
                break

            embeddings.extend(embeddings_tmp)

            if i < chunks_num - 1:
                time.sleep(60)  # Wait 1 minute before the next call

        return torch.stack([torch.tensor(embedding.embedding, dtype=torch.float32)
                            for embedding in embeddings])

    def get_tokens(self, text):
        if self.model:
            return self.model.tokenizer.tokenize(text)

        # Fallback for the OpenAI backend: a rough split on word
        # boundaries, used only to estimate token counts for chunking.
        tokens = []
        for token in re.split(r'(\W|\b)', text):
            if token.strip():
                tokens.append(token)
        return tokens


EMBEDDING_CTX = EmbeddingContext()
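
# --- Usage sketch (illustrative, not part of the module's API) ---
# A minimal smoke test when the file is run directly, assuming
# `config.settings` is importable and configured. The sample strings are
# hypothetical; util.cos_sim comes from sentence_transformers, imported
# at the top of this file.
if __name__ == '__main__':
    embeddings = EMBEDDING_CTX.encode(['bread recipe', 'how to bake bread'])
    # SBERT embeddings are normalized at encode time; cos_sim normalizes
    # internally as well, so this works for either backend.
    print('similarity:', util.cos_sim(embeddings[0], embeddings[1]).item())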