# routers/embedding/__init__.py
import os
import re  # Used by the tokenizer fallback in get_tokens().
import sys
import threading

import torch
from sentence_transformers import SentenceTransformer, util


class EmbeddingContext:
    # These don't change
    TOKEN_LEN_MAX_FOR_EMBEDDING = 512

    # Set when creating the object
    lock = None
    model = None
    openai_client = None
    model_name = ''
    config_type = ''
    embedding_shape = None
    embedding_dtype = None
    embedding_device = None

    # Updates constantly
    data = {}

    def __init__(self):
        try:
            from config import settings
        except ImportError:
            # Allow running the module directly from the repository root.
            sys.path.append(os.path.abspath(
                os.path.join(os.path.dirname(__file__), '../..')))
            from config import settings

        self.lock = threading.Lock()
        config_type = settings.embedding_api
        model_name = settings.embedding_model

        if config_type == 'sbert':
            self.model = SentenceTransformer(model_name, use_auth_token=False)
            self.model.max_seq_length = self.TOKEN_LEN_MAX_FOR_EMBEDDING
            print("Max Sequence Length:", self.model.max_seq_length)

            self.encode = self.encode_sbert
            if torch.cuda.is_available():
                self.model = self.model.to('cuda')

        elif config_type == 'openai':
            from openai import OpenAI
            self.openai_client = OpenAI(
                # base_url=settings.openai_api_base
                api_key=settings.OPENAI_API_KEY,
            )
            self.encode = self.encode_openai

        self.model_name = model_name
        self.config_type = config_type

        # Probe the backend once to record the embedding shape, dtype and
        # device, so consumers can allocate matching tensors.
        tmp = self.encode(['tmp'])
        self.embedding_shape = tmp.shape[1:]
        self.embedding_dtype = tmp.dtype
        self.embedding_device = tmp.device

    def encode(self, texts_to_embed):
        # Placeholder; __init__ rebinds this to the backend-specific encoder.
        pass

    def encode_sbert(self, texts_to_embed):
        return self.model.encode(texts_to_embed,
                                 show_progress_bar=True,
                                 convert_to_tensor=True,
                                 normalize_embeddings=True)

    def encode_openai(self, texts_to_embed):
        import math
        import time

        # Split the input into chunks of roughly 500k tokens each, presumably
        # to stay under the API's tokens-per-minute limit.
        tokens_count = 0
        for text in texts_to_embed:
            tokens_count += len(self.get_tokens(text))

        # Guard against empty input to avoid a division by zero below.
        chunks_num = max(1, math.ceil(tokens_count / 500000))
        chunk_size = math.ceil(len(texts_to_embed) / chunks_num)

        embeddings = []
        for i in range(chunks_num):
            start = i * chunk_size
            end = start + chunk_size
            chunk = texts_to_embed[start:end]

            embeddings_tmp = self.openai_client.embeddings.create(
                model=self.model_name,
                input=chunk,
            ).data

            if embeddings_tmp is None:
                break

            embeddings.extend(embeddings_tmp)

            if i < chunks_num - 1:
                time.sleep(60)  # Wait 1 minute before the next call

        return torch.stack([torch.tensor(embedding.embedding, dtype=torch.float32)
                            for embedding in embeddings])

    def get_tokens(self, text):
        if self.model:
            return self.model.tokenizer.tokenize(text)

        # Crude fallback when no SBERT tokenizer is available: split on word
        # boundaries and drop whitespace-only tokens.
        tokens = []
        for token in re.split(r'(\W|\b)', text):
            if token.strip():
                tokens.append(token)

        return tokens


# Shared singleton used by the routers.
EMBEDDING_CTX = EmbeddingContext()
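
# Minimal usage sketch (illustrative, not part of the original API; assumes
# `config.settings` points at a working backend). `encode` returns one row
# per input text, so pairwise similarity can be computed with the
# sentence_transformers `util` helpers imported above.
if __name__ == '__main__':
    embeddings = EMBEDDING_CTX.encode(['grease pencil', 'annotation tool'])
    print('shape:', tuple(embeddings.shape))
    print('similarity:', util.cos_sim(embeddings[0], embeddings[1]).item())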