import gradio as gr import torch import os import numpy as np from groq import Groq import spaces from transformers import AutoModel, AutoTokenizer from diffusers import StableDiffusionXLPipeline, UNet2DConditionModel, EulerDiscreteScheduler from parler_tts import ParlerTTSForConditionalGeneration import soundfile as sf from langchain_community.embeddings import OpenAIEmbeddings from langchain_community.vectorstores import Chroma from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.chains import RetrievalQA from langchain_community.llms import OpenAI from PIL import Image from decord import VideoReader, cpu import requests from huggingface_hub import hf_hub_download from safetensors.torch import load_file client = Groq(api_key=os.environ.get("GROQ_API_KEY")) MODEL = 'llama3-groq-70b-8192-tool-use-preview' text_model = AutoModel.from_pretrained('openbmb/MiniCPM-V-2', trust_remote_code=True, device_map="auto", torch_dtype=torch.bfloat16) tokenizer = AutoTokenizer.from_pretrained('openbmb/MiniCPM-V-2', trust_remote_code=True) tts_model = ParlerTTSForConditionalGeneration.from_pretrained("parler-tts/parler-tts-large-v1") tts_tokenizer = AutoTokenizer.from_pretrained("parler-tts/parler-tts-large-v1") # Corrected image model and pipeline setup base = "stabilityai/stable-diffusion-xl-base-1.0" repo = "ByteDance/SDXL-Lightning" ckpt = "sdxl_lightning_4step_unet.safetensors" unet = UNet2DConditionModel.from_config(base, subfolder="unet").to("cuda", torch.float16) unet.load_state_dict(load_file(hf_hub_download(repo, ckpt), device="cuda")) image_pipe = StableDiffusionXLPipeline.from_pretrained(base, unet=unet, torch_dtype=torch.float16, variant="fp16").to("cuda") image_pipe.scheduler = EulerDiscreteScheduler.from_config(image_pipe.scheduler.config, timestep_spacing="trailing") # Initialize voice-only mode def play_voice_output(response): description = "Jon's voice is monotone yet slightly fast in delivery, with a very close recording that almost has no background noise." input_ids = tts_tokenizer(description, return_tensors="pt").input_ids.to('cuda') prompt_input_ids = tts_tokenizer(response, return_tensors="pt").input_ids.to('cuda') generation = tts_model.generate(input_ids=input_ids, prompt_input_ids=prompt_input_ids) audio_arr = generation.cpu().numpy().squeeze() sf.write("output.wav", audio_arr, tts_model.config.sampling_rate) return "output.wav" # Web search function def web_search(query): api_key = os.environ.get("BING_API_KEY") search_url = "https://api.bing.microsoft.com/v7.0/search" headers = {"Ocp-Apim-Subscription-Key": api_key} params = {"q": query, "textDecorations": True, "textFormat": "HTML"} response = requests.get(search_url, headers=headers, params=params) response.raise_for_status() search_results = response.json() snippets = [result['snippet'] for result in search_results.get('webPages', {}).get('value', [])] return "\n".join(snippets) # NumPy Calculation function def numpy_calculate(code: str) -> str: try: local_dict = {} exec(code, {"np": np}, local_dict) result = local_dict.get("result", "No result found") return str(result) except Exception as e: return f"An error occurred: {str(e)}" # Function to handle different input types def handle_input(user_prompt, image=None, video=None, audio=None, doc=None): messages = [{"role": "user", "content": user_prompt}] if audio: transcription = client.audio.transcriptions.create( file=(audio.name, audio.read()), model="whisper-large-v3" ) user_prompt = transcription.text if doc: # RAG with Langchain response = use_langchain_rag(doc.name, doc.read(), user_prompt) elif image and not video: image = Image.open(image).convert('RGB') messages[0]['content'] = [image, user_prompt] response = text_model.chat(image=None, msgs=messages, tokenizer=tokenizer) elif video: frames = encode_video(video.name) messages[0]['content'] = frames + [user_prompt] response = text_model.chat(image=None, msgs=messages, tokenizer=tokenizer) else: response = client.chat.completions.create( model=MODEL, messages=messages, tools=initialize_tools() ).choices[0].message.content return response # Function to use Langchain for RAG def use_langchain_rag(file_name, file_content, query): # Split the document into chunks text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0) docs = text_splitter.create_documents([file_content]) # Create embeddings and store in the vector database embeddings = OpenAIEmbeddings() db = Chroma.from_documents(docs, embeddings, persist_directory=".chroma_db") # Use a persistent directory # Create a question-answering chain qa = RetrievalQA.from_chain_type(llm=OpenAI(), chain_type="stuff", retriever=db.as_retriever()) # Get the answer return qa.run(query) # Function to encode video def encode_video(video_path): MAX_NUM_FRAMES = 64 vr = VideoReader(video_path, ctx=cpu(0)) sample_fps = round(vr.get_avg_fps() / 1) frame_idx = [i for i in range(0, len(vr), sample_fps)] if len(frame_idx) > MAX_NUM_FRAMES: frame_idx = uniform_sample(frame_idx, MAX_NUM_FRAMES) frames = vr.get_batch(frame_idx).asnumpy() frames = [Image.fromarray(v.astype('uint8')) for v in frames] return frames # Initialize tools with web search and NumPy calculation def initialize_tools(): tools = [ { "type": "function", "function": { "name": "calculate", "description": "Evaluate a mathematical expression", "parameters": { "type": "object", "properties": { "expression": {"type": "string", "description": "The mathematical expression to evaluate"} }, "required": ["expression"] }, } }, { "type": "function", "function": { "name": "web_search", "description": "Perform a web search", "parameters": { "type": "object", "properties": { "query": {"type": "string", "description": "The search query"} }, "required": ["query"] }, "implementation": web_search } }, { "type": "function", "function": { "name": "numpy_calculate", "description": "Execute NumPy-based Python code for calculations", "parameters": { "type": "object", "properties": { "code": {"type": "string", "description": "The Python code with NumPy operations"} }, "required": ["code"] }, "implementation": numpy_calculate } } ] return tools @spaces.GPU() def main_interface(user_prompt, image=None, video=None, audio=None, doc=None, voice_only=False): text_model.to(device='cuda', dtype=torch.bfloat16) tts_model.to("cuda") unet.to("cuda", torch.float16) image_pipe.to("cuda") response = handle_input(user_prompt, image=image, video=video, audio=audio, doc=doc) if voice_only: audio_file = play_voice_output(response) return gr.Audio.update(value=audio_file, visible=True) else: return response # Gradio App Setup with gr.Blocks() as demo: user_prompt = gr.Textbox(placeholder="Type your message here...", lines=1) image_input = gr.Image(type="filepath", label="Upload an image") video_input = gr.Video(label="Upload a video") audio_input = gr.Audio(type="filepath", label="Upload audio") doc_input = gr.File(type="filepath", label="Upload a document") voice_only_mode = gr.Checkbox(label="Enable Voice Only Mode") output = gr.Output() submit = gr.Button("Submit") submit.click( fn=main_interface, inputs=[user_prompt, image_input, video_input, audio_input, doc_input, voice_only_mode], outputs=output ) demo.launch(inline=False)