import gradio as gr
import torch
import os
import json
import numpy as np
from groq import Groq
from transformers import AutoModel, AutoTokenizer
from diffusers import StableDiffusionXLPipeline, UNet2DConditionModel, EulerDiscreteScheduler
from parler_tts import ParlerTTSForConditionalGeneration
import soundfile as sf
from llama_index import SimpleDirectoryReader, GPTSimpleVectorIndex, LLMPredictor, ServiceContext
from langchain import OpenAI
from PIL import Image
from decord import VideoReader, cpu
import requests

client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
MODEL = 'llama3-groq-70b-8192-tool-use-preview'

# Load models for text, speech, and image processing
text_model = AutoModel.from_pretrained('openbmb/MiniCPM-V-2_6', trust_remote_code=True,
                                       attn_implementation='sdpa', torch_dtype=torch.bfloat16).eval().cuda()
tokenizer = AutoTokenizer.from_pretrained('openbmb/MiniCPM-V-2_6', trust_remote_code=True)
tts_model = ParlerTTSForConditionalGeneration.from_pretrained("parler-tts/parler-tts-large-v1").to('cuda')
tts_tokenizer = AutoTokenizer.from_pretrained("parler-tts/parler-tts-large-v1")
# Load the pretrained UNet weights; from_config alone would build an untrained UNet
image_model = UNet2DConditionModel.from_pretrained("stabilityai/stable-diffusion-xl-base-1.0", subfolder="unet", torch_dtype=torch.float16).to("cuda")
image_pipe = StableDiffusionXLPipeline.from_pretrained("stabilityai/stable-diffusion-xl-base-1.0", unet=image_model, torch_dtype=torch.float16, variant="fp16").to("cuda")
image_pipe.scheduler = EulerDiscreteScheduler.from_config(image_pipe.scheduler.config, timestep_spacing="trailing")
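
# The SDXL pipeline above is never invoked by the handlers below. This is a
# minimal sketch of how it could be called; the step count, guidance scale,
# and output filename are assumptions, not part of the original app:
def generate_image(prompt):
    image = image_pipe(prompt, num_inference_steps=30, guidance_scale=7.5).images[0]
    image.save("generated.png")
    return "generated.png"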

# Voice output: synthesize the response with Parler-TTS and save it to a WAV file
def play_voice_output(response):
    description = "Jon's voice is monotone yet slightly fast in delivery, with a very close recording that almost has no background noise."
    input_ids = tts_tokenizer(description, return_tensors="pt").input_ids.to('cuda')
    prompt_input_ids = tts_tokenizer(response, return_tensors="pt").input_ids.to('cuda')
    generation = tts_model.generate(input_ids=input_ids, prompt_input_ids=prompt_input_ids)
    audio_arr = generation.cpu().numpy().squeeze()
    sf.write("output.wav", audio_arr, tts_model.config.sampling_rate)
    return "output.wav"

# Web search function
def web_search(query):
    api_key = os.environ.get("BING_API_KEY")
    search_url = "https://api.bing.microsoft.com/v7.0/search"
    headers = {"Ocp-Apim-Subscription-Key": api_key}
    params = {"q": query, "textDecorations": True, "textFormat": "HTML"}
    response = requests.get(search_url, headers=headers, params=params)
    response.raise_for_status()
    search_results = response.json()
    snippets = [result['snippet'] for result in search_results.get('webPages', {}).get('value', [])]
    return "\n".join(snippets)

# NumPy calculation: runs model-supplied Python via exec(), which is unsafe
# outside a sandboxed environment
def numpy_calculate(code: str) -> str:
    try:
        local_dict = {}
        exec(code, {"np": np}, local_dict)
        result = local_dict.get("result", "No result found")
        return str(result)
    except Exception as e:
        return f"An error occurred: {str(e)}"
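
# A minimal dispatch table mapping tool names to callables, assuming tool calls
# are executed client-side. The original embedded "implementation" keys inside
# the tool spec, which the OpenAI-style format Groq uses does not allow, and
# left "calculate" unimplemented; the eval-based stand-in below is a sketch.
TOOL_IMPLEMENTATIONS = {
    "calculate": lambda expression: str(eval(expression, {"__builtins__": {}}, {"np": np})),
    "web_search": web_search,
    "numpy_calculate": numpy_calculate,
}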

# Route each input type (audio, document, image, video, or plain text) to the right model.
# File inputs arrive as file paths from the Gradio components below.
def handle_input(user_prompt, image=None, video=None, audio=None, doc=None):
    # Transcribe audio first so the transcription can serve as the prompt
    if audio:
        with open(audio, "rb") as f:
            transcription = client.audio.transcriptions.create(
                file=(audio, f.read()),
                model="whisper-large-v3"
            )
        user_prompt = transcription.text
    messages = [{"role": "user", "content": user_prompt}]
    if doc:
        index = create_rag_index(doc)
        response = index.query(user_prompt)
    elif image and not video:
        image = Image.open(image).convert('RGB')
        messages[0]['content'] = [image, user_prompt]
        response = text_model.chat(image=None, msgs=messages, tokenizer=tokenizer)
    elif video:
        frames = encode_video(video)
        messages[0]['content'] = frames + [user_prompt]
        response = text_model.chat(image=None, msgs=messages, tokenizer=tokenizer)
    else:
        completion = client.chat.completions.create(
            model=MODEL,
            messages=messages,
            tools=initialize_tools()
        )
        message = completion.choices[0].message
        # If the model requested tool calls, execute them and ask for a final answer
        if message.tool_calls:
            messages.append(message)
            for tool_call in message.tool_calls:
                args = json.loads(tool_call.function.arguments)
                result = TOOL_IMPLEMENTATIONS[tool_call.function.name](**args)
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": str(result)
                })
            message = client.chat.completions.create(
                model=MODEL,
                messages=messages
            ).choices[0].message
        response = message.content
    return response

# Create a RAG index over the uploaded document using LlamaIndex
def create_rag_index(file_path):
    docs = SimpleDirectoryReader(input_files=[file_path]).load_data()
    service_context = ServiceContext.from_defaults(llm_predictor=LLMPredictor(llm=OpenAI(temperature=0)))
    index = GPTSimpleVectorIndex.from_documents(docs, service_context=service_context)
    return index
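
# Helper assumed by encode_video below (it was referenced but never defined);
# a minimal even-spacing sampler mirroring the one in the MiniCPM-V examples:
def uniform_sample(seq, n):
    gap = len(seq) / n
    idxs = [int(i * gap + gap / 2) for i in range(n)]
    return [seq[i] for i in idxs]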

# Sample up to MAX_NUM_FRAMES frames (roughly one per second) for MiniCPM-V video chat
def encode_video(video_path):
    MAX_NUM_FRAMES = 64
    vr = VideoReader(video_path, ctx=cpu(0))
    sample_fps = round(vr.get_avg_fps())  # stride of one frame per second of video
    frame_idx = [i for i in range(0, len(vr), sample_fps)]
    if len(frame_idx) > MAX_NUM_FRAMES:
        frame_idx = uniform_sample(frame_idx, MAX_NUM_FRAMES)
    frames = vr.get_batch(frame_idx).asnumpy()
    frames = [Image.fromarray(v.astype('uint8')) for v in frames]
    return frames

# Tool schemas in the OpenAI-style format the Groq API expects. The spec may
# only contain documented fields, so implementations live in the
# TOOL_IMPLEMENTATIONS dispatch table above rather than inside the schema.
def initialize_tools():
    tools = [
        {
            "type": "function",
            "function": {
                "name": "calculate",
                "description": "Evaluate a mathematical expression",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "expression": {"type": "string", "description": "The mathematical expression to evaluate"}
                    },
                    "required": ["expression"]
                }
            }
        },
        {
            "type": "function",
            "function": {
                "name": "web_search",
                "description": "Perform a web search",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {"type": "string", "description": "The search query"}
                    },
                    "required": ["query"]
                }
            }
        },
        {
            "type": "function",
            "function": {
                "name": "numpy_calculate",
                "description": "Execute NumPy-based Python code for calculations",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "code": {"type": "string", "description": "The Python code with NumPy operations"}
                    },
                    "required": ["code"]
                }
            }
        }
    ]
    return tools

# Gradio interface: returns the text response plus optional synthesized audio
def main_interface(user_prompt, image=None, video=None, audio=None, doc=None, voice_only=False):
    response = handle_input(user_prompt, image=image, video=video, audio=audio, doc=doc)
    audio_file = play_voice_output(response) if voice_only else None
    return response, audio_file

# Gradio App Setup
with gr.Blocks() as demo:
    user_prompt = gr.Textbox(placeholder="Type your message here...", lines=1)
    # File-style components return file paths, which the handlers above expect
    image_input = gr.Image(type="filepath", label="Upload an image")
    video_input = gr.Video(label="Upload a video")
    audio_input = gr.Audio(type="filepath", label="Upload audio")
    doc_input = gr.File(type="filepath", label="Upload a document")
    voice_only_mode = gr.Checkbox(label="Enable Voice Only Mode")
    text_output = gr.Textbox(label="Response")
    audio_output = gr.Audio(label="Voice Output")
    submit = gr.Button("Submit")
    submit.click(
        fn=main_interface,
        inputs=[user_prompt, image_input, video_input, audio_input, doc_input, voice_only_mode],
        outputs=[text_output, audio_output]
    )

demo.launch(inline=False)