Spaces:
Runtime error
Runtime error
import torch | |
import torch.nn.functional as F | |
from torch import Tensor | |
from transformers import AutoTokenizer, AutoModel | |
import threading | |
import queue | |
import gradio as gr | |
import os | |
title = """ | |
# 👋🏻Welcome to 🙋🏻♂️Tonic's 🐣e5-mistral🛌🏻Embeddings """ | |
description = """ | |
You can use this ZeroGPU Space to test out the current model [intfloat/e5-mistral-7b-instruct](https://huggingface.co./intfloat/e5-mistral-7b-instruct). 🐣e5-mistral🛌🏻 has a larger context🪟window, a different prompting/return🛠️mechanism and generally better results than other embedding models. use it via API to create embeddings or try out the sentence similarity to see how various optimization parameters affect performance. | |
You can also use 🐣e5-mistral🛌🏻 by cloning this space. 🧬🔬🔍 Simply click here: <a style="display:inline-block" href="https://huggingface.co./spaces/Tonic/e5?duplicate=true"><img src="https://img.shields.io/badge/-Duplicate%20Space-blue?labelColor=white&style=flat&logo=data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAABAAAAAQCAYAAAAf8/9hAAAAAXNSR0IArs4c6QAAAP5JREFUOE+lk7FqAkEURY+ltunEgFXS2sZGIbXfEPdLlnxJyDdYB62sbbUKpLbVNhyYFzbrrA74YJlh9r079973psed0cvUD4A+4HoCjsA85X0Dfn/RBLBgBDxnQPfAEJgBY+A9gALA4tcbamSzS4xq4FOQAJgCDwV2CPKV8tZAJcAjMMkUe1vX+U+SMhfAJEHasQIWmXNN3abzDwHUrgcRGmYcgKe0bxrblHEB4E/pndMazNpSZGcsZdBlYJcEL9Afo75molJyM2FxmPgmgPqlWNLGfwZGG6UiyEvLzHYDmoPkDDiNm9JR9uboiONcBXrpY1qmgs21x1QwyZcpvxt9NS09PlsPAAAAAElFTkSuQmCC&logoWidth=14" alt="Duplicate Space"></a></h3> | |
Join us : 🌟TeamTonic🌟 is always making cool demos! Join our active builder's🛠️community 👻 [![Join us on Discord](https://img.shields.io/discord/1109943800132010065?label=Discord&logo=discord&style=flat-square)](https://discord.gg/GWpVpekp) On 🤗Huggingface: [TeamTonic](https://huggingface.co./TeamTonic) & [MultiTransformer](https://huggingface.co./MultiTransformer) On 🌐Github: [Tonic-AI](https://github.com/tonic-ai) & contribute to 🌟 [DataTonic](https://github.com/Tonic-AI/DataTonic) 🤗Big thanks to Yuvi Sharma and all the folks at huggingface for the community grant 🤗 | |
""" | |
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:30' | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
tasks = { | |
'ArguAna': 'Given a claim, find documents that refute the claim', | |
'ClimateFEVER': 'Given a claim about climate change, retrieve documents that support or refute the claim', | |
'DBPedia': 'Given a query, retrieve relevant entity descriptions from DBPedia', | |
'FEVER': 'Given a claim, retrieve documents that support or refute the claim', | |
'FiQA2018': 'Given a financial question, retrieve user replies that best answer the question', | |
'HotpotQA': 'Given a multi-hop question, retrieve documents that can help answer the question', | |
'MSMARCO': 'Given a web search query, retrieve relevant passages that answer the query', | |
'NFCorpus': 'Given a question, retrieve relevant documents that best answer the question', | |
'NQ': 'Given a question, retrieve Wikipedia passages that answer the question', | |
'QuoraRetrieval': 'Given a question, retrieve questions that are semantically equivalent to the given question', | |
'SCIDOCS': 'Given a scientific paper title, retrieve paper abstracts that are cited by the given paper', | |
'SciFact': 'Given a scientific claim, retrieve documents that support or refute the claim', | |
'Touche2020': 'Given a question, retrieve detailed and persuasive arguments that answer the question', | |
'TRECCOVID': 'Given a query on COVID-19, retrieve documents that answer the query', | |
} | |
# Global queue for embedding requests | |
embedding_request_queue = queue.Queue() | |
embedding_response_queue = queue.Queue() | |
tokenizer = AutoTokenizer.from_pretrained('intfloat/e5-mistral-7b-instruct') | |
model = AutoModel.from_pretrained('intfloat/e5-mistral-7b-instruct', torch_dtype=torch.float16, device_map=device) | |
def last_token_pool(last_hidden_states: Tensor, attention_mask: Tensor) -> Tensor: | |
left_padding = (attention_mask[:, -1].sum() == attention_mask.shape[0]) | |
if left_padding: | |
return last_hidden_states[:, -1] | |
else: | |
sequence_lengths = attention_mask.sum(dim=1) - 1 | |
batch_size = last_hidden_states.shape[0] | |
return last_hidden_states[torch.arange(batch_size, device=last_hidden_states.device), sequence_lengths] | |
def clear_cuda_cache(): | |
torch.cuda.empty_cache() | |
def free_memory(*args): | |
for arg in args: | |
del arg | |
def load_corpus_from_json(file_path): | |
with open(file_path, 'r') as file: | |
data = json.load(file) | |
return data | |
def embedding_worker(): | |
while True: | |
# Wait for an item in the queue | |
item = embedding_request_queue.get() | |
if item is None: | |
break | |
selected_task, input_text = item | |
embeddings = compute_embeddings(selected_task, input_text) | |
formatted_response = format_response(embeddings) | |
embedding_response_queue.put(formatted_response) | |
embedding_request_queue.task_done() | |
clear_cuda_cache() | |
threading.Thread(target=embedding_worker, daemon=True).start() | |
def compute_embeddings(selected_task, input_text): | |
try: | |
task_description = tasks[selected_task] | |
except KeyError: | |
print(f"Selected task not found: {selected_task}") | |
return f"Error: Task '{selected_task}' not found. Please select a valid task." | |
max_length = 2048 | |
processed_texts = [f'Instruct: {task_description}\nQuery: {input_text}'] | |
batch_dict = tokenizer(processed_texts, max_length=max_length - 1, return_attention_mask=False, padding=False, truncation=True) | |
batch_dict['input_ids'] = [input_ids + [tokenizer.eos_token_id] for input_ids in batch_dict['input_ids']] | |
batch_dict = tokenizer.pad(batch_dict, padding=True, return_attention_mask=True, return_tensors='pt') | |
batch_dict = {k: v.to(device) for k, v in batch_dict.items()} | |
outputs = model(**batch_dict) | |
embeddings = last_token_pool(outputs.last_hidden_state, batch_dict['attention_mask']) | |
embeddings = F.normalize(embeddings, p=2, dim=1) | |
embeddings_list = embeddings.detach().cpu().numpy().tolist() | |
clear_cuda_cache() | |
return embeddings_list | |
def decode_embedding(embedding_str): | |
try: | |
embedding = [float(num) for num in embedding_str.split(',')] | |
embedding_tensor = torch.tensor(embedding, dtype=torch.float16, device=device) | |
decoded_embedding = tokenizer.decode(embedding_tensor[0], skip_special_tokens=True) | |
return decoded_embedding.cpu().numpy().tolist() | |
except Exception as e: | |
return f"Error in decoding: {str(e)}" | |
def compute_similarity(selected_task, sentence1, sentence2, extra_sentence1, extra_sentence2): | |
try: | |
task_description = tasks[selected_task] | |
except KeyError: | |
print(f"Selected task not found: {selected_task}") | |
return f"Error: Task '{selected_task}' not found. Please select a valid task." | |
# Compute embeddings for each sentence | |
embeddings1 = compute_embeddings(selected_task, sentence1) | |
embeddings2 = compute_embeddings(selected_task, sentence2) | |
embeddings3 = compute_embeddings(selected_task, extra_sentence1) | |
embeddings4 = compute_embeddings(selected_task, extra_sentence2) | |
# Convert embeddings to tensors | |
embeddings_tensor1 = torch.tensor(embeddings1).to(device).half() | |
embeddings_tensor2 = torch.tensor(embeddings2).to(device).half() | |
embeddings_tensor3 = torch.tensor(embeddings3).to(device).half() | |
embeddings_tensor4 = torch.tensor(embeddings4).to(device).half() | |
# Compute cosine similarity | |
similarity1 = compute_cosine_similarity(embeddings1, embeddings2) | |
similarity2 = compute_cosine_similarity(embeddings1, embeddings3) | |
similarity3 = compute_cosine_similarity(embeddings1, embeddings4) | |
# Free memory | |
free_memory(embeddings1, embeddings2, embeddings3, embeddings4) | |
similarity_scores = {"Similarity 1-2": similarity1, "Similarity 1-3": similarity2, "Similarity 1-4": similarity3} | |
clear_cuda_cache() | |
return similarity_scores | |
def compute_cosine_similarity(emb1, emb2): | |
tensor1 = torch.tensor(emb1).to(device).half() | |
tensor2 = torch.tensor(emb2).to(device).half() | |
similarity = F.cosine_similarity(tensor1, tensor2).item() | |
free_memory(tensor1, tensor2) | |
clear_cuda_cache() | |
return similarity | |
def compute_embeddings_batch(input_texts): | |
max_length = 2042 | |
processed_texts = [f'Instruct: {task_description}\nQuery: {text}' for text in input_texts] | |
batch_dict = tokenizer(processed_texts, max_length=max_length - 1, return_attention_mask=False, padding=False, truncation=True) | |
batch_dict['input_ids'] = [input_ids + [tokenizer.eos_token_id] for input_ids in batch_dict['input_ids']] | |
batch_dict = tokenizer.pad(batch_dict, padding=True, return_attention_mask=True, return_tensors='pt') | |
batch_dict = {k: v.to(device) for k, v in batch_dict.items()} | |
outputs = model(**batch_dict) | |
embeddings = last_token_pool(outputs.last_hidden_state, batch_dict['attention_mask']) | |
embeddings = F.normalize(embeddings, p=2, dim=1) | |
clear_cuda_cache() | |
return embeddings.detach().cpu().numpy() | |
def semantic_search(query_embedding, corpus_embeddings, top_k=5): | |
scores = np.dot(corpus_embeddings, query_embedding.T).flatten() | |
top_k_indices = np.argsort(scores)[::-1][:top_k] | |
return top_k_indices, scores[top_k_indices] | |
def search_similar_sentences(input_question, corpus_sentences, corpus_embeddings): | |
question_embedding = compute_embeddings_batch([input_question])[0] | |
top_k_indices, top_k_scores = semantic_search(question_embedding, corpus_embeddings) | |
results = [(corpus_sentences[i], top_k_scores[i]) for i in top_k_indices] | |
return results | |
# openai response object formatting | |
def format_response(embeddings): | |
return { | |
"data": [ | |
{ | |
"embedding": embeddings, | |
"index": 0, | |
"object": "embedding" | |
} | |
], | |
"model": "e5-mistral", | |
"object": "list", | |
"usage": { | |
"prompt_tokens": 17, | |
"total_tokens": 17 | |
} | |
} | |
def generate_and_format_embeddings(selected_task, input_text): | |
embedding_request_queue.put((selected_task, input_text)) | |
response = embedding_response_queue.get() | |
embedding_response_queue.task_done() | |
clear_cuda_cache() | |
return response | |
def app_interface(): | |
corpus_sentences = [] | |
corpus_embeddings = [] | |
with gr.Blocks() as demo: | |
gr.Markdown(title) | |
gr.Markdown(description) | |
with gr.Row(): | |
task_dropdown = gr.Dropdown(list(tasks.keys()), label="Select a Task", value=list(tasks.keys())[0]) | |
with gr.Tab("Embedding Generation"): | |
input_text_box = gr.Textbox(label="📖Input Text") | |
compute_button = gr.Button("Try🐣🛌🏻e5") | |
output_display = gr.Textbox(label="🐣e5-mistral🛌🏻 Embeddings") | |
compute_button.click( | |
fn=compute_embeddings, | |
inputs=[task_dropdown, input_text_box], | |
outputs=output_display | |
) | |
with gr.Tab("Sentence Similarity"): | |
sentence1_box = gr.Textbox(label="'Focus Sentence' - The 'Subject'") | |
sentence2_box = gr.Textbox(label="'Input Sentence' - 1") | |
extra_sentence1_box = gr.Textbox(label="'Input Sentence' - 2") | |
extra_sentence2_box = gr.Textbox(label="'Input Sentence' - 3") | |
similarity_button = gr.Button("Compute Similarity") | |
similarity_output = gr.Textbox(label="🐣e5-mistral🛌🏻 Similarity Scores") | |
similarity_button.click( | |
fn=compute_similarity, | |
inputs=[task_dropdown, sentence1_box, sentence2_box, extra_sentence1_box, extra_sentence2_box], | |
outputs=similarity_output | |
) | |
with gr.Tab("Load Corpus"): | |
json_uploader = gr.File(label="Upload JSON File") | |
load_corpus_button = gr.Button("Load Corpus") | |
corpus_status = gr.Textbox(label="Corpus Status", value="Corpus not loaded") | |
def load_corpus(file_info): | |
if file_info is None: | |
return "No file uploaded. Please upload a JSON file." | |
try: | |
global corpus_sentences, corpus_embeddings | |
corpus_sentences = load_corpus_from_json(file_info['name']) | |
corpus_embeddings = compute_embeddings_batch(corpus_sentences) | |
return "Corpus loaded successfully with {} sentences.".format(len(corpus_sentences)) | |
except Exception as e: | |
return "Error loading corpus: {}".format(e) | |
load_corpus_button.click( | |
fn=load_corpus, | |
inputs=json_uploader, | |
outputs=corpus_status | |
) | |
with gr.Tab("Semantic Search"): | |
input_question_box = gr.Textbox(label="Enter your question") | |
search_button = gr.Button("Search") | |
search_results_output = gr.Textbox(label="Search Results") | |
def perform_search(input_question): | |
if not corpus_sentences or not corpus_embeddings: | |
return "Corpus is not loaded. Please load a corpus first." | |
return search_similar_sentences(input_question, corpus_sentences, corpus_embeddings) | |
search_button.click( | |
fn=perform_search, | |
inputs=input_question_box, | |
outputs=search_results_output | |
) | |
with gr.Tab("Connector-like Embeddings"): | |
with gr.Row(): | |
input_text_box_connector = gr.Textbox(label="Input Text", placeholder="Enter text or array of texts") | |
model_dropdown_connector = gr.Dropdown(label="Model", choices=["ArguAna", "ClimateFEVER", "DBPedia", "FEVER", "FiQA2018", "HotpotQA", "MSMARCO", "NFCorpus", "NQ", "QuoraRetrieval", "SCIDOCS", "SciFact", "Touche2020", "TRECCOVID"], value="text-embedding-ada-002") | |
encoding_format_connector = gr.Radio(label="Encoding Format", choices=["float", "base64"], value="float") | |
user_connector = gr.Textbox(label="User", placeholder="Enter user identifier (optional)") | |
submit_button_connector = gr.Button("Generate Embeddings") | |
output_display_connector = gr.JSON(label="Embeddings Output") | |
submit_button_connector.click( | |
fn=generate_and_format_embeddings, | |
inputs=[model_dropdown_connector, input_text_box_connector], | |
outputs=output_display_connector | |
) | |
# with gr.Tab("Decode Embedding"): | |
# embedding_input = gr.Textbox(label="Enter Embedding (comma-separated floats)") | |
# decode_button = gr.Button("Decode") | |
# decoded_output = gr.Textbox(label="Decoded Embedding") | |
# | |
# decode_button.click( | |
# fn=decode_embedding, | |
# inputs=embedding_input, | |
# outputs=decoded_output | |
# ) | |
with gr.Row(): | |
with gr.Column(): | |
input_text_box | |
with gr.Column(): | |
compute_button | |
output_display | |
return demo | |
app_interface().queue() | |
app_interface().launch(share=True) |