from pydantic import BaseModel from llama_cpp import Llama from concurrent.futures import ThreadPoolExecutor, as_completed import re import gradio as gr import os import urllib3 import pickle from functools import lru_cache from dotenv import load_dotenv from fastapi import FastAPI, Request, HTTPException from fastapi.responses import JSONResponse import time from tqdm import tqdm urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) app = FastAPI() load_dotenv() HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN") global_data = { 'tokens': {'eos': 'eos_token', 'pad': 'pad_token', 'padding': 'padding_token', 'unk': 'unk_token', 'bos': 'bos_token', 'sep': 'sep_token', 'cls': 'cls_token', 'mask': 'mask_token'}, 'model_configs': [ {"repo_id": "Ffftdtd5dtft/gpt2-xl-Q2_K-GGUF", "filename": "gpt2-xl-q2_k.gguf", "name": "GPT-2 XL"}, {"repo_id": "Ffftdtd5dtft/gemma-2-27b-Q2_K-GGUF", "filename": "gemma-2-27b-q2_k.gguf", "name": "Gemma 2-27B"}, {"repo_id": "Ffftdtd5dtft/Phi-3-mini-128k-instruct-Q2_K-GGUF", "filename": "phi-3-mini-128k-instruct-q2_k.gguf", "name": "Phi-3 Mini 128K Instruct"}, {"repo_id": "Ffftdtd5dtft/starcoder2-3b-Q2_K-GGUF", "filename": "starcoder2-3b-q2_k.gguf", "name": "Starcoder2 3B"}, {"repo_id": "Ffftdtd5dtft/Qwen2-1.5B-Instruct-Q2_K-GGUF", "filename": "qwen2-1.5b-instruct-q2_k.gguf", "name": "Qwen2 1.5B Instruct"}, {"repo_id": "Ffftdtd5dtft/Mistral-Nemo-Instruct-2407-Q2_K-GGUF", "filename": "mistral-nemo-instruct-2407-q2_k.gguf", "name": "Mistral Nemo Instruct 2407"}, {"repo_id": "Ffftdtd5dtft/Phi-3-mini-128k-instruct-IQ2_XXS-GGUF", "filename": "phi-3-mini-128k-instruct-iq2_xxs-imat.gguf", "name": "Phi 3 Mini 128K Instruct XXS"}, {"repo_id": "Ffftdtd5dtft/TinyLlama-1.1B-Chat-v1.0-IQ1_S-GGUF", "filename": "tinyllama-1.1b-chat-v1.0-iq1_s-imat.gguf", "name": "TinyLlama 1.1B Chat"}, {"repo_id": "Ffftdtd5dtft/Meta-Llama-3.1-8B-Q2_K-GGUF", "filename": "meta-llama-3.1-8b-q2_k.gguf", "name": "Meta Llama 3.1-8B"}, {"repo_id": "Ffftdtd5dtft/codegemma-2b-IQ1_S-GGUF", "filename": "codegemma-2b-iq1_s-imat.gguf", "name": "Codegemma 2B"}, ] } response_cache = {} model_cache_dir = "model_cache" os.makedirs(model_cache_dir, exist_ok=True) class ModelManager: def __init__(self, max_models=2): self.models = {} self.max_models = max_models self.model_cache_dir = model_cache_dir def load_model(self, model_config): model_name = model_config['name'] cache_file = os.path.join(self.model_cache_dir, f"{model_name}.pkl") if model_name not in self.models: try: if os.path.exists(cache_file): with open(cache_file, "rb") as f: self.models[model_name] = pickle.load(f) print(f"Modelo {model_name} cargado desde caché.") else: self.models[model_name] = Llama.from_pretrained(repo_id=model_config['repo_id'], filename=model_config['filename'], use_auth_token=HUGGINGFACE_TOKEN) with open(cache_file, "wb") as f: pickle.dump(self.models[model_name], f) print(f"Modelo {model_name} cargado y guardado en caché.") except Exception as e: print(f"Error al cargar el modelo {model_name}: {e}") self.models[model_name] = None def get_model(self, model_name): return self.models.get(model_name) def unload_model(self, model_name): if model_name in self.models and self.models[model_name] is not None: cache_file = os.path.join(self.model_cache_dir, f"{model_name}.pkl") with open(cache_file, "wb") as f: pickle.dump(self.models[model_name], f) del self.models[model_name] print(f"Modelo {model_name} descargado y guardado en caché.") model_manager = ModelManager() class ChatRequest(BaseModel): message: str def normalize_input(input_text): return input_text.strip() def remove_duplicates(text): text = re.sub(r'(Hello there, how are you\? \[/INST\]){2,}', 'Hello there, how are you? [/INST]', text) text = re.sub(r'(How are you\? \[/INST\]){2,}', 'How are you? [/INST]', text) text = text.replace('[/INST]', '') lines = text.split('\n') unique_lines = [] seen_lines = set() for line in lines: if line not in seen_lines: unique_lines.append(line) seen_lines.add(line) return '\n'.join(unique_lines) @lru_cache(maxsize=128) def generate_model_response(model, inputs): try: start_time = time.time() response = model(inputs, max_tokens=150) end_time = time.time() print(f"Tiempo de generación del modelo: {end_time - start_time:.4f} segundos") return remove_duplicates(response['choices'][0]['text']) except Exception as e: print(f"Error en la generación del modelo: {e}") return "" async def process_message(message): inputs = normalize_input(message) if inputs in response_cache: return response_cache[inputs] responses = {} start_time = time.time() with ThreadPoolExecutor(max_workers=model_manager.max_models) as executor: futures = [executor.submit(model_manager.load_model, config) for config in tqdm(global_data['model_configs'], desc="Cargando modelos")] for future in as_completed(futures): future.result() for config in global_data['model_configs']: model = model_manager.get_model(config['name']) if model: responses[config['name']] = generate_model_response(model, inputs) model_manager.unload_model(config['name']) end_time = time.time() print(f"Tiempo total de procesamiento: {end_time - start_time:.4f} segundos") formatted_response = "\n\n".join([f"**{model}:**\n{response}" for model, response in responses.items()]) response_cache[inputs] = formatted_response return formatted_response @app.post("/generate_multimodel") async def api_generate_multimodel(request: Request): try: data = await request.json() message = data.get("message") if not message: raise HTTPException(status_code=400, detail="Mensaje faltante") response = await process_message(message) return JSONResponse({"response": response}) except HTTPException as e: raise e except Exception as e: return JSONResponse({"error": str(e)}, status_code=500) iface = gr.Interface( fn=process_message, inputs=gr.Textbox(lines=2, placeholder="Enter your message here..."), outputs=gr.Markdown(), title="Multi-Model LLM API", description="Enter a message and get responses from multiple LLMs.", live=False ) if __name__ == "__main__": port = int(os.environ.get("PORT", 7860)) iface.launch(server_port=port)