VanguardAI's picture
Update app.py
2e5cfb3 verified
raw
history blame
8.52 kB
import gradio as gr
import torch
import os
import numpy as np
from groq import Groq
import spaces
from transformers import AutoModel, AutoTokenizer
from diffusers import StableDiffusionXLPipeline, UNet2DConditionModel, EulerDiscreteScheduler
from parler_tts import ParlerTTSForConditionalGeneration
import soundfile as sf
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain_community.llms import OpenAI
from PIL import Image
from decord import VideoReader, cpu
import requests
from huggingface_hub import hf_hub_download
from safetensors.torch import load_file
client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
MODEL = 'llama3-groq-70b-8192-tool-use-preview'
text_model = AutoModel.from_pretrained('openbmb/MiniCPM-V-2', trust_remote_code=True,
device_map="auto", torch_dtype=torch.bfloat16)
tokenizer = AutoTokenizer.from_pretrained('openbmb/MiniCPM-V-2', trust_remote_code=True)
tts_model = ParlerTTSForConditionalGeneration.from_pretrained("parler-tts/parler-tts-large-v1")
tts_tokenizer = AutoTokenizer.from_pretrained("parler-tts/parler-tts-large-v1")
# Corrected image model and pipeline setup
base = "stabilityai/stable-diffusion-xl-base-1.0"
repo = "ByteDance/SDXL-Lightning"
ckpt = "sdxl_lightning_4step_unet.safetensors"
unet = UNet2DConditionModel.from_config(base, subfolder="unet").to("cuda", torch.float16)
unet.load_state_dict(load_file(hf_hub_download(repo, ckpt), device="cuda"))
image_pipe = StableDiffusionXLPipeline.from_pretrained(base, unet=unet, torch_dtype=torch.float16, variant="fp16").to("cuda")
image_pipe.scheduler = EulerDiscreteScheduler.from_config(image_pipe.scheduler.config, timestep_spacing="trailing")
# Initialize voice-only mode
def play_voice_output(response):
description = "Jon's voice is monotone yet slightly fast in delivery, with a very close recording that almost has no background noise."
input_ids = tts_tokenizer(description, return_tensors="pt").input_ids.to('cuda')
prompt_input_ids = tts_tokenizer(response, return_tensors="pt").input_ids.to('cuda')
generation = tts_model.generate(input_ids=input_ids, prompt_input_ids=prompt_input_ids)
audio_arr = generation.cpu().numpy().squeeze()
sf.write("output.wav", audio_arr, tts_model.config.sampling_rate)
return "output.wav"
# Web search function
def web_search(query):
api_key = os.environ.get("BING_API_KEY")
search_url = "https://api.bing.microsoft.com/v7.0/search"
headers = {"Ocp-Apim-Subscription-Key": api_key}
params = {"q": query, "textDecorations": True, "textFormat": "HTML"}
response = requests.get(search_url, headers=headers, params=params)
response.raise_for_status()
search_results = response.json()
snippets = [result['snippet'] for result in search_results.get('webPages', {}).get('value', [])]
return "\n".join(snippets)
# NumPy Calculation function
def numpy_calculate(code: str) -> str:
try:
local_dict = {}
exec(code, {"np": np}, local_dict)
result = local_dict.get("result", "No result found")
return str(result)
except Exception as e:
return f"An error occurred: {str(e)}"
# Function to handle different input types
def handle_input(user_prompt, image=None, video=None, audio=None, doc=None):
messages = [{"role": "user", "content": user_prompt}]
if audio:
transcription = client.audio.transcriptions.create(
file=(audio.name, audio.read()),
model="whisper-large-v3"
)
user_prompt = transcription.text
if doc:
# RAG with Langchain
response = use_langchain_rag(doc.name, doc.read(), user_prompt)
elif image and not video:
image = Image.open(image).convert('RGB')
messages[0]['content'] = [image, user_prompt]
response = text_model.chat(image=None, msgs=messages, tokenizer=tokenizer)
elif video:
frames = encode_video(video.name)
messages[0]['content'] = frames + [user_prompt]
response = text_model.chat(image=None, msgs=messages, tokenizer=tokenizer)
else:
response = client.chat.completions.create(
model=MODEL,
messages=messages,
tools=initialize_tools()
).choices[0].message.content
return response
# Function to use Langchain for RAG
def use_langchain_rag(file_name, file_content, query):
# Split the document into chunks
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.create_documents([file_content])
# Create embeddings and store in the vector database
embeddings = OpenAIEmbeddings()
db = Chroma.from_documents(docs, embeddings, persist_directory=".chroma_db") # Use a persistent directory
# Create a question-answering chain
qa = RetrievalQA.from_chain_type(llm=OpenAI(), chain_type="stuff", retriever=db.as_retriever())
# Get the answer
return qa.run(query)
# Function to encode video
def encode_video(video_path):
MAX_NUM_FRAMES = 64
vr = VideoReader(video_path, ctx=cpu(0))
sample_fps = round(vr.get_avg_fps() / 1)
frame_idx = [i for i in range(0, len(vr), sample_fps)]
if len(frame_idx) > MAX_NUM_FRAMES:
frame_idx = uniform_sample(frame_idx, MAX_NUM_FRAMES)
frames = vr.get_batch(frame_idx).asnumpy()
frames = [Image.fromarray(v.astype('uint8')) for v in frames]
return frames
# Initialize tools with web search and NumPy calculation
def initialize_tools():
tools = [
{
"type": "function",
"function": {
"name": "calculate",
"description": "Evaluate a mathematical expression",
"parameters": {
"type": "object",
"properties": {
"expression": {"type": "string", "description": "The mathematical expression to evaluate"}
},
"required": ["expression"]
},
}
},
{
"type": "function",
"function": {
"name": "web_search",
"description": "Perform a web search",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "The search query"}
},
"required": ["query"]
},
"implementation": web_search
}
},
{
"type": "function",
"function": {
"name": "numpy_calculate",
"description": "Execute NumPy-based Python code for calculations",
"parameters": {
"type": "object",
"properties": {
"code": {"type": "string", "description": "The Python code with NumPy operations"}
},
"required": ["code"]
},
"implementation": numpy_calculate
}
}
]
return tools
@spaces.GPU()
def main_interface(user_prompt, image=None, video=None, audio=None, doc=None, voice_only=False):
text_model.to(device='cuda', dtype=torch.bfloat16)
tts_model.to("cuda")
unet.to("cuda", torch.float16)
image_pipe.to("cuda")
response = handle_input(user_prompt, image=image, video=video, audio=audio, doc=doc)
if voice_only:
audio_file = play_voice_output(response)
return gr.Audio.update(value=audio_file, visible=True)
else:
return response
# Gradio App Setup
with gr.Blocks() as demo:
user_prompt = gr.Textbox(placeholder="Type your message here...", lines=1)
image_input = gr.Image(type="filepath", label="Upload an image")
video_input = gr.Video(label="Upload a video")
audio_input = gr.Audio(type="filepath", label="Upload audio")
doc_input = gr.File(type="filepath", label="Upload a document")
voice_only_mode = gr.Checkbox(label="Enable Voice Only Mode")
output = gr.Output()
submit = gr.Button("Submit")
submit.click(
fn=main_interface,
inputs=[user_prompt, image_input, video_input, audio_input, doc_input, voice_only_mode],
outputs=output
)
demo.launch(inline=False)