from pydantic import BaseModel from llama_cpp import Llama from concurrent.futures import ThreadPoolExecutor, as_completed import re import gradio as gr import os import urllib3 import pickle from functools import lru_cache from dotenv import load_dotenv from fastapi import FastAPI, Request, HTTPException from fastapi.responses import JSONResponse from tqdm import tqdm urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) app = FastAPI() load_dotenv() HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN") global_data = { 'tokens': {'eos': 'eos_token', 'pad': 'pad_token', 'padding': 'padding_token', 'unk': 'unk_token', 'bos': 'bos_token', 'sep': 'sep_token', 'cls': 'cls_token', 'mask': 'mask_token'}, 'model_configs': [ {"repo_id": "Ffftdtd5dtft/gpt2-xl-Q2_K-GGUF", "filename": "gpt2-xl-q2_k.gguf", "name": "GPT-2 XL"}, {"repo_id": "Ffftdtd5dtft/gemma-2-27b-Q2_K-GGUF", "filename": "gemma-2-27b-q2_k.gguf", "name": "Gemma 2-27B"}, {"repo_id": "Ffftdtd5dtft/Phi-3-mini-128k-instruct-Q2_K-GGUF", "filename": "phi-3-mini-128k-instruct-q2_k.gguf", "name": "Phi-3 Mini 128K Instruct"}, {"repo_id": "Ffftdtd5dtft/starcoder2-3b-Q2_K-GGUF", "filename": "starcoder2-3b-q2_k.gguf", "name": "Starcoder2 3B"}, {"repo_id": "Ffftdtd5dtft/Qwen2-1.5B-Instruct-Q2_K-GGUF", "filename": "qwen2-1.5b-instruct-q2_k.gguf", "name": "Qwen2 1.5B Instruct"}, {"repo_id": "Ffftdtd5dtft/Mistral-Nemo-Instruct-2407-Q2_K-GGUF", "filename": "mistral-nemo-instruct-2407-q2_k.gguf", "name": "Mistral Nemo Instruct 2407"}, {"repo_id": "Ffftdtd5dtft/Phi-3-mini-128k-instruct-IQ2_XXS-GGUF", "filename": "phi-3-mini-128k-instruct-iq2_xxs-imat.gguf", "name": "Phi 3 Mini 128K Instruct XXS"}, {"repo_id": "Ffftdtd5dtft/TinyLlama-1.1B-Chat-v1.0-IQ1_S-GGUF", "filename": "tinyllama-1.1b-chat-v1.0-iq1_s-imat.gguf", "name": "TinyLlama 1.1B Chat"}, {"repo_id": "Ffftdtd5dtft/Meta-Llama-3.1-8B-Q2_K-GGUF", "filename": "meta-llama-3.1-8b-q2_k.gguf", "name": "Meta Llama 3.1-8B"}, {"repo_id": "Ffftdtd5dtft/codegemma-2b-IQ1_S-GGUF", "filename": "codegemma-2b-iq1_s-imat.gguf", "name": "Codegemma 2B"}, ] } response_cache = {} model_cache_dir = "model_cache" os.makedirs(model_cache_dir, exist_ok=True) class ModelManager: def __init__(self): self.models = {} self.model_cache_dir = model_cache_dir self.load_all_models() def load_all_models(self): with ThreadPoolExecutor(max_workers=len(global_data['model_configs'])) as executor: futures = [executor.submit(self._load_model, config) for config in tqdm(global_data['model_configs'], desc="Loading models")] for future in tqdm(as_completed(futures), total=len(global_data['model_configs']), desc="Loading models complete"): future.result() def _load_model(self, model_config): model_name = model_config['name'] cache_file = os.path.join(self.model_cache_dir, f"{model_name}.pkl") if model_name not in self.models: try: if os.path.exists(cache_file): with open(cache_file, "rb") as f: self.models[model_name] = pickle.load(f) else: self.models[model_name] = Llama.from_pretrained(repo_id=model_config['repo_id'], filename=model_config['filename'], use_auth_token=HUGGINGFACE_TOKEN) with open(cache_file, "wb") as f: pickle.dump(self.models[model_name], f) except Exception as e: print(f"Error loading model {model_name}: {e}") self.models[model_name] = None def get_model(self, model_name): return self.models.get(model_name) model_manager = ModelManager() class ChatRequest(BaseModel): message: str def normalize_input(input_text): return input_text.strip() def remove_duplicates(text): text = re.sub(r'(Hello there, how are you\? \[/INST\]){2,}', 'Hello there, how are you?', text) text = re.sub(r'(How are you\? \[/INST\]){2,}', 'How are you?', text) text = text.replace('[/INST]', '') lines = text.split('\n') unique_lines = [] seen_lines = set() for line in lines: if line not in seen_lines: unique_lines.append(line) seen_lines.add(line) return '\n'.join(unique_lines) @lru_cache(maxsize=128) def generate_model_response(model, inputs): response = model(inputs, max_tokens=150) return remove_duplicates(response['choices'][0]['text']) async def process_message(message): inputs = normalize_input(message) if inputs in response_cache: return response_cache[inputs] responses = {} with ThreadPoolExecutor(max_workers=len(global_data['model_configs'])) as executor: futures = [executor.submit(generate_model_response, model_manager.get_model(config['name']), inputs) for config in global_data['model_configs'] if model_manager.get_model(config['name'])] for i, future in enumerate(tqdm(as_completed(futures), total=len([f for f in futures]), desc="Generating responses")): model_name = global_data['model_configs'][i]['name'] responses[model_name] = future.result() formatted_response = "\n\n".join([f"**{model}:**\n{response}" for model, response in responses.items()]) response_cache[inputs] = formatted_response return formatted_response @app.post("/generate_multimodel") async def api_generate_multimodel(request: Request): try: data = await request.json() message = data.get("message") if not message: raise HTTPException(status_code=400, detail="Missing message") response = await process_message(message) return JSONResponse({"response": response}) except HTTPException as e: raise e except Exception as e: return JSONResponse({"error": str(e)}, status_code=500) iface = gr.Interface( fn=process_message, inputs=gr.Textbox(lines=2, placeholder="Enter your message here..."), outputs=gr.Markdown(), title="Multi-Model LLM API", description="Enter a message and get responses from multiple LLMs.", live=False ) if __name__ == "__main__": port = int(os.environ.get("PORT", 7860)) iface.launch(server_port=port)