import gradio as gr
import torch
import os
import numpy as np
from groq import Groq
from transformers import AutoModel, AutoTokenizer, BitsAndBytesConfig
from diffusers import StableDiffusionXLPipeline, UNet2DConditionModel, EulerDiscreteScheduler
from parler_tts import ParlerTTSForConditionalGeneration
import soundfile as sf
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.llms import OpenAI
from PIL import Image
from decord import VideoReader, cpu
import requests
import spaces  # Hugging Face Spaces helper, required for the @spaces.GPU() decorator below

client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
MODEL = 'llama3-groq-70b-8192-tool-use-preview'

# Configure transformers to load the model with 4-bit quantization
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16
)

# Load models for text, speech, and image processing
text_model = AutoModel.from_pretrained(
    'openbmb/MiniCPM-V-2_6',
    trust_remote_code=True,
    quantization_config=bnb_config,
    device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained('openbmb/MiniCPM-V-2_6', trust_remote_code=True)

tts_model = ParlerTTSForConditionalGeneration.from_pretrained("parler-tts/parler-tts-large-v1").to('cuda')
tts_tokenizer = AutoTokenizer.from_pretrained("parler-tts/parler-tts-large-v1")

# Load the SDXL UNet with its pretrained weights (from_config alone would leave it randomly initialized)
image_model = UNet2DConditionModel.from_pretrained(
    "stabilityai/stable-diffusion-xl-base-1.0", subfolder="unet", torch_dtype=torch.float16
).to("cuda")
image_pipe = StableDiffusionXLPipeline.from_pretrained(
    "stabilityai/stable-diffusion-xl-base-1.0", unet=image_model, torch_dtype=torch.float16, variant="fp16"
).to("cuda")
image_pipe.scheduler = EulerDiscreteScheduler.from_config(image_pipe.scheduler.config, timestep_spacing="trailing")

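# `image_pipe` is loaded above but not called anywhere else in this app.
# A minimal generation sketch, kept as comments so importing the app does not
# trigger a slow generation (the prompt and step count are illustrative):
#
#   image = image_pipe("a watercolor painting of a fox", num_inference_steps=30).images[0]
#   image.save("generated.png")
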
# Generate spoken audio for voice-only mode with Parler-TTS
def play_voice_output(response):
    description = "Jon's voice is monotone yet slightly fast in delivery, with a very close recording that almost has no background noise."
    input_ids = tts_tokenizer(description, return_tensors="pt").input_ids.to('cuda')
    prompt_input_ids = tts_tokenizer(response, return_tensors="pt").input_ids.to('cuda')
    generation = tts_model.generate(input_ids=input_ids, prompt_input_ids=prompt_input_ids)
    audio_arr = generation.cpu().numpy().squeeze()
    sf.write("output.wav", audio_arr, tts_model.config.sampling_rate)
    return "output.wav"

# Web search via the Bing Web Search API
def web_search(query):
    api_key = os.environ.get("BING_API_KEY")
    search_url = "https://api.bing.microsoft.com/v7.0/search"
    headers = {"Ocp-Apim-Subscription-Key": api_key}
    params = {"q": query, "textDecorations": True, "textFormat": "HTML"}
    response = requests.get(search_url, headers=headers, params=params)
    response.raise_for_status()
    search_results = response.json()
    snippets = [result['snippet'] for result in search_results.get('webPages', {}).get('value', [])]
    return "\n".join(snippets)

# NumPy calculation function
# Note: exec() runs model-generated code in-process; do not expose this without sandboxing.
def numpy_calculate(code: str) -> str:
    try:
        local_dict = {}
        exec(code, {"np": np}, local_dict)
        result = local_dict.get("result", "No result found")
        return str(result)
    except Exception as e:
        return f"An error occurred: {str(e)}"

# Route each input type (audio, document, image, video, plain text) to the right model
def handle_input(user_prompt, image=None, video=None, audio=None, doc=None):
    # Transcribe audio first so the transcription becomes the prompt for the rest of the pipeline
    if audio:
        with open(audio, "rb") as audio_file:
            transcription = client.audio.transcriptions.create(
                file=(os.path.basename(audio), audio_file.read()),
                model="whisper-large-v3"
            )
        user_prompt = transcription.text

    messages = [{"role": "user", "content": user_prompt}]

    if doc:
        # RAG with LangChain over the uploaded document (doc is a file path from gr.File)
        with open(doc, "r", encoding="utf-8", errors="ignore") as f:
            doc_content = f.read()
        response = use_langchain_rag(os.path.basename(doc), doc_content, user_prompt)
    elif image and not video:
        image = Image.open(image).convert('RGB')
        messages[0]['content'] = [image, user_prompt]
        response = text_model.chat(image=None, msgs=messages, tokenizer=tokenizer)
    elif video:
        frames = encode_video(video)
        messages[0]['content'] = frames + [user_prompt]
        response = text_model.chat(image=None, msgs=messages, tokenizer=tokenizer)
    else:
        response = client.chat.completions.create(
            model=MODEL,
            messages=messages,
            tools=initialize_tools()
        ).choices[0].message.content
    return response

# Use LangChain for retrieval-augmented generation (RAG) over a document
def use_langchain_rag(file_name, file_content, query):
    # Split the document into chunks
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    docs = text_splitter.create_documents([file_content])

    # Create embeddings and store them in a persistent vector database
    embeddings = OpenAIEmbeddings()
    db = Chroma.from_documents(docs, embeddings, persist_directory=".chroma_db")

    # Create a question-answering chain and return the answer
    qa = RetrievalQA.from_chain_type(llm=OpenAI(), chain_type="stuff", retriever=db.as_retriever())
    return qa.run(query)

# Sample up to MAX_NUM_FRAMES frames (about one per second) for MiniCPM-V video chat
def encode_video(video_path):
    MAX_NUM_FRAMES = 64

    def uniform_sample(seq, n):
        # Evenly sample n items from seq
        gap = len(seq) / n
        idxs = [int(i * gap + gap / 2) for i in range(n)]
        return [seq[i] for i in idxs]

    vr = VideoReader(video_path, ctx=cpu(0))
    sample_fps = round(vr.get_avg_fps() / 1)  # sample roughly 1 frame per second
    frame_idx = [i for i in range(0, len(vr), sample_fps)]
    if len(frame_idx) > MAX_NUM_FRAMES:
        frame_idx = uniform_sample(frame_idx, MAX_NUM_FRAMES)
    frames = vr.get_batch(frame_idx).asnumpy()
    frames = [Image.fromarray(v.astype('uint8')) for v in frames]
    return frames

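# The tool schemas below are sent to the Groq API, but handle_input does not
# execute any tool calls the model returns. This is a minimal dispatch sketch,
# assuming OpenAI-style `tool_calls` objects in the response; TOOL_REGISTRY and
# execute_tool_call are illustrative names and are not wired into handle_input.
import json

TOOL_REGISTRY = {
    "web_search": web_search,
    "numpy_calculate": numpy_calculate,
}

def execute_tool_call(tool_call):
    """Run a tool requested by the model and return its result as a string."""
    name = tool_call.function.name
    args = json.loads(tool_call.function.arguments or "{}")
    fn = TOOL_REGISTRY.get(name)
    if fn is None:
        return f"Unknown tool: {name}"
    return str(fn(**args))
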
"name": "web_search", "description": "Perform a web search", "parameters": { "type": "object", "properties": { "query": {"type": "string", "description": "The search query"} }, "required": ["query"] }, "implementation": web_search } }, { "type": "function", "function": { "name": "numpy_calculate", "description": "Execute NumPy-based Python code for calculations", "parameters": { "type": "object", "properties": { "code": {"type": "string", "description": "The Python code with NumPy operations"} }, "required": ["code"] }, "implementation": numpy_calculate } } ] return tools @spaces.GPU() # Gradio Interface def main_interface(user_prompt, image=None, video=None, audio=None, doc=None, voice_only=False): response = handle_input(user_prompt, image=image, video=video, audio=audio, doc=doc) if voice_only: audio_file = play_voice_output(response) return gr.Audio.update(value=audio_file, visible=True) else: return response # Gradio App Setup with gr.Blocks() as demo: user_prompt = gr.Textbox(placeholder="Type your message here...", lines=1) image_input = gr.Image(type="file", label="Upload an image") video_input = gr.Video(type="file", label="Upload a video") audio_input = gr.Audio(type="file", label="Upload audio") doc_input = gr.File(type="file", label="Upload a document") voice_only_mode = gr.Checkbox(label="Enable Voice Only Mode") output = gr.Output() submit = gr.Button("Submit") submit.click( fn=main_interface, inputs=[user_prompt, image_input, video_input, audio_input, doc_input, voice_only_mode], outputs=output ) demo.launch(inline=False)