# text123 / app.py
# Uhhy's picture
# Update app.py
# f61f49b verified
# raw
# history blame
# 7.06 kB
import logging
import os
import queue
import re
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from difflib import SequenceMatcher
from functools import lru_cache
from multiprocessing import cpu_count

import numpy as np
import uvicorn
from cachetools import TTLCache
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from llama_cpp import Llama
from pydantic import BaseModel
from tqdm import tqdm
# Root logger only emits records at ERROR level or above.
logging.basicConfig(level=logging.ERROR)
# Pull settings from a .env file into the process environment, if one exists.
load_dotenv()

# ASGI application object the route decorators below attach to.
app = FastAPI()

# Shared response cache: at most ``cache_size`` entries, each kept for
# ``cache_ttl`` (the ttl handed to TTLCache; seconds under its default timer).
cache_size, cache_ttl = 2000, 7200
cache = TTLCache(ttl=cache_ttl, maxsize=cache_size)

# Process-wide state; 'models' is filled with the name -> model mapping
# once the models have been loaded.
global_data = dict(models={})
# One row per GGUF checkpoint: (repo_id, filename, name). ``repo_id`` and
# ``filename`` are passed to ``Llama.from_pretrained``; ``name`` is the key
# under which the loaded model is stored.
_MODEL_TABLE = (
    ("Ffftdtd5dtft/gpt2-xl-Q2_K-GGUF", "gpt2-xl-q2_k.gguf", "GPT-2 XL"),
    ("Ffftdtd5dtft/Meta-Llama-3.1-8B-Instruct-Q2_K-GGUF", "meta-llama-3.1-8b-instruct-q2_k.gguf", "Meta Llama 3.1-8B Instruct"),
    ("Ffftdtd5dtft/gemma-2-9b-it-Q2_K-GGUF", "gemma-2-9b-it-q2_k.gguf", "Gemma 2-9B IT"),
    ("Ffftdtd5dtft/gemma-2-27b-Q2_K-GGUF", "gemma-2-27b-q2_k.gguf", "Gemma 2-27B"),
    ("Ffftdtd5dtft/Phi-3-mini-128k-instruct-Q2_K-GGUF", "phi-3-mini-128k-instruct-q2_k.gguf", "Phi-3 Mini 128K Instruct"),
    ("Ffftdtd5dtft/Meta-Llama-3.1-8B-Q2_K-GGUF", "meta-llama-3.1-8b-q2_k.gguf", "Meta Llama 3.1-8B"),
    ("Ffftdtd5dtft/Qwen2-7B-Instruct-Q2_K-GGUF", "qwen2-7b-instruct-q2_k.gguf", "Qwen2 7B Instruct"),
    ("Ffftdtd5dtft/starcoder2-3b-Q2_K-GGUF", "starcoder2-3b-q2_k.gguf", "Starcoder2 3B"),
    ("Ffftdtd5dtft/Qwen2-1.5B-Instruct-Q2_K-GGUF", "qwen2-1.5b-instruct-q2_k.gguf", "Qwen2 1.5B Instruct"),
)

# Expanded into the list-of-dicts shape the loader consumes.
model_configs = [
    {"repo_id": repo_id, "filename": filename, "name": name}
    for repo_id, filename, name in _MODEL_TABLE
]
class ModelManager:
    """Load llama.cpp models and keep the loaded ones indexed by name."""

    def __init__(self):
        # Maps a config's "name" to its loaded model; only models that
        # loaded successfully are ever stored here.
        self.models = {}

    def load_model(self, model_config):
        """Load a single model and register it under its configured name.

        Args:
            model_config: Mapping with "repo_id", "filename" and "name" keys.

        Returns:
            The loaded model, or ``None`` when loading failed. Failures are
            logged rather than raised so one bad model does not abort the
            loading of the others.
        """
        try:
            model = Llama.from_pretrained(
                repo_id=model_config['repo_id'],
                filename=model_config['filename'],
            )
        except Exception as e:
            logging.error(f"Error al cargar el modelo {model_config['name']}: {e}")
            return None
        self.models[model_config['name']] = model
        return model

    def load_all_models(self, configs=None):
        """Load every configured model concurrently.

        Args:
            configs: Optional iterable of model config mappings. Defaults to
                the module-level ``model_configs``.

        Returns:
            ``self.models``, the mapping of name to loaded model.
        """
        configs = list(model_configs if configs is None else configs)
        if not configs:
            # ThreadPoolExecutor raises ValueError for max_workers=0, so an
            # empty configuration must short-circuit instead of crashing.
            return self.models
        workers = max(1, min(len(configs), cpu_count()))
        with ThreadPoolExecutor(max_workers=workers) as executor:
            futures = [executor.submit(self.load_model, config) for config in configs]
            for future in tqdm(as_completed(futures), total=len(futures), desc="Cargando modelos", unit="modelo"):
                # Surfaces anything load_model did not handle itself.
                future.result()
        return self.models
# Build the process-wide manager, load every configured model eagerly at
# import time, and publish the resulting name -> model mapping (the
# manager's own ``models`` dict) through ``global_data``.
model_manager = ModelManager()
global_data['models'] = model_manager.load_all_models()
class ChatRequest(BaseModel):
    # Request body accepted by the chat-generation endpoint.

    # The user's prompt text (required).
    message: str
    # Sampling parameters, forwarded unchanged to the model's
    # ``create_chat_completion`` call.
    top_k: int = 50
    top_p: float = 0.95
    temperature: float = 0.7
# Serializes access to the shared TTL cache: cachetools caches are not
# thread-safe, and this function is invoked from worker threads.
_cache_lock = threading.Lock()


def generate_chat_response(request: ChatRequest, model_name: str):
    """Produce (or fetch from the cache) one model's reply to a chat request.

    This function is deliberately not wrapped in ``functools.lru_cache``:
    that decorator hashes its arguments, and a pydantic model is unhashable
    by default, so every call raised ``TypeError``. Memoization is handled
    by the module-level TTL cache instead.

    Args:
        request: The incoming chat request (message plus sampling settings).
        model_name: Key of the target model in ``global_data['models']``.

    Returns:
        A dict with the keys "response" (the reply text, or an "Error: ..."
        message), "literal" (the normalized prompt, or the raw message when
        the model is unknown) and "model_name".
    """
    # Normalize outside the try block so the error path below can always
    # refer to ``user_input``.
    user_input = normalize_input(request.message)

    # The sampling parameters are part of the key: the same prompt with a
    # different temperature/top_k/top_p must not be served a stale reply.
    cache_key = (model_name, user_input, request.top_k, request.top_p, request.temperature)
    with _cache_lock:
        cached = cache.get(cache_key)
    if cached is not None:
        return cached

    model = global_data['models'].get(model_name)
    if model is None:
        return {"response": "Error: Modelo no encontrado.", "literal": request.message, "model_name": model_name}

    try:
        response = model.create_chat_completion(
            messages=[{"role": "user", "content": user_input}],
            top_k=request.top_k,
            top_p=request.top_p,
            temperature=request.temperature,
        )
        reply = response['choices'][0]['message']['content']
    except Exception as e:
        logging.error(f"Error en la generación de respuesta con el modelo {model_name}: {e}")
        return {"response": f"Error: {str(e)}", "literal": user_input, "model_name": model_name}

    # Only successful replies are cached; errors are retried on the next call.
    result = {"response": reply, "literal": user_input, "model_name": model_name}
    with _cache_lock:
        cache[cache_key] = result
    return result
def normalize_input(input_text):
    """Return ``input_text`` with surrounding whitespace removed, lower-cased."""
    trimmed = input_text.strip()
    return trimmed.lower()
def remove_duplicates(text):
    """Strip prompt-echo artifacts and repeated lines from a model reply.

    Back-to-back repetitions of two known echoed prompts are collapsed to a
    single occurrence, every ``[/INST]`` marker is removed, and then each
    distinct line is kept only the first time it appears (order preserved).
    The result is stripped of leading and trailing whitespace.
    """
    collapse_rules = (
        (r'(Hello there, how are you\? \[/INST\]){2,}', 'Hello there, how are you? [/INST]'),
        (r'(How are you\? \[/INST\]){2,}', 'How are you? [/INST]'),
    )
    for pattern, single in collapse_rules:
        text = re.sub(pattern, single, text)

    already_seen = set()
    kept_lines = []
    for line in text.replace('[/INST]', '').split('\n'):
        if line in already_seen:
            continue
        already_seen.add(line)
        kept_lines.append(line)
    return '\n'.join(kept_lines).strip()
def remove_repetitive_responses(responses):
    """Drop response dicts whose cleaned text repeats an earlier one.

    Each entry's 'response' text is cleaned with ``remove_duplicates``; the
    first entry for every distinct cleaned text is kept, in input order.
    The returned entries are the original dicts, not the cleaned text.
    """
    first_by_text = {}
    for entry in responses:
        # setdefault keeps the earliest entry for a given cleaned text.
        first_by_text.setdefault(remove_duplicates(entry['response']), entry)
    return list(first_by_text.values())
def select_best_response(responses):
    """Choose one reply text from the per-model response dicts.

    The responses are de-duplicated, reduced to their cleaned text, ordered
    by ``filter_by_coherence`` and finally narrowed to a single string by
    ``filter_by_similarity``.

    Args:
        responses: List of dicts, each carrying a 'response' string.

    Returns:
        The selected reply text, or an empty string when ``responses`` is
        empty (previously this crashed with ``IndexError``).
    """
    if not responses:
        return ""
    distinct = remove_repetitive_responses(responses)
    # dict.fromkeys de-duplicates while keeping input order. The previous
    # list(set(...)) ordered candidates by per-process string hashing, so
    # which of several equal-length replies won could change between runs.
    candidates = list(dict.fromkeys(remove_duplicates(entry['response']) for entry in distinct))
    ranked = filter_by_coherence(candidates)
    return filter_by_similarity(ranked)
def filter_by_coherence(responses):
    """Order ``responses`` longest-first, in place, and return the same list.

    Items of equal length keep their relative order.
    """
    # Ascending sort on negated length == descending by length, stable.
    responses.sort(key=lambda text: -len(text))
    return responses
def filter_by_similarity(responses):
    """Pick a single response from an ordered list of candidate strings.

    The first element is the baseline. The first later element whose
    ``SequenceMatcher`` ratio against the baseline is below 0.9 is returned;
    if every other candidate is at least that similar, the baseline itself
    is returned.

    Args:
        responses: Candidate strings in preference order.

    Returns:
        The chosen string, or an empty string when ``responses`` is empty
        (previously an ``IndexError``).
    """
    if not responses:
        return ""
    baseline = responses[0]
    for candidate in responses[1:]:
        if SequenceMatcher(None, baseline, candidate).ratio() < 0.9:
            return candidate
    return baseline
def worker_function(model_name, request, response_queue):
    """Run one model on ``request`` and enqueue ``(model_name, result)``.

    Never raises for a generation failure: the error is logged and an
    error-shaped result dict is enqueued in place of the reply.
    """
    try:
        response_queue.put((model_name, generate_chat_response(request, model_name)))
    except Exception as e:
        logging.error(f"Error en la generación de respuesta con el modelo {model_name}: {e}")
        failure = {
            "response": f"Error: {str(e)}",
            "literal": request.message,
            "model_name": model_name,
        }
        response_queue.put((model_name, failure))
@app.post("/generate_chat")
async def generate_chat(request: ChatRequest):
    """Send the message to every loaded model and return the chosen reply.

    Args:
        request: Chat request carrying the message and sampling settings.

    Returns:
        A dict with "best_response" (the selected reply text) and
        "all_responses" (the per-model result dicts, in completion order).

    Raises:
        HTTPException: 400 when the message is blank; 503 when no model is
            loaded (previously an unhandled ``ValueError`` from a thread
            pool created with ``max_workers=0``).
    """
    if not request.message.strip():
        raise HTTPException(status_code=400, detail="The message cannot be empty.")

    model_names = list(global_data['models'])
    if not model_names:
        # Model load failures are swallowed at startup, so the mapping can
        # legitimately be empty; report that instead of crashing.
        raise HTTPException(status_code=503, detail="No models are loaded.")

    response_queue = queue.Queue()
    workers = max(1, min(len(model_names), cpu_count()))
    # NOTE(review): this coroutine waits synchronously on the thread pool,
    # so the event loop serves nothing else meanwhile - confirm intended.
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [
            executor.submit(worker_function, model_name, request, response_queue)
            for model_name in model_names
        ]
        for future in tqdm(as_completed(futures), total=len(futures), desc="Generando respuestas", unit="modelo"):
            future.result()

    # All workers have finished, so the queue can be drained without blocking.
    responses = []
    while not response_queue.empty():
        _, response = response_queue.get()
        responses.append(response)

    best_response = select_best_response(responses)
    return {
        "best_response": best_response,
        "all_responses": responses,
    }
if __name__ == "__main__":
    # Run as a script: serve the app on every interface, port 8000.
    uvicorn.run(app, port=8000, host="0.0.0.0")