# NOTE: page-header residue from the Hugging Face Spaces listing ("Spaces: Sleeping"); not part of the program.
import ast
import base64
import json
import os
from datetime import datetime

import boto3
import gradio as gr
import numpy as np
import spaces
import torch
from boto3.exceptions import S3UploadFailedError
from botocore.exceptions import ClientError, NoCredentialsError
from huggingface_hub import hf_hub_download, list_repo_files
from PIL import Image, ImageDraw
from qwen_vl_utils import process_vision_info
from transformers import AutoProcessor, Qwen2VLForConditionalGeneration
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
DESCRIPTION = "[ShowUI Demo](https://huggingface.co./showlab/ShowUI-2B)"

# Grounding prompt sent ahead of the screenshot and the user's query.
_SYSTEM = (
    "Based on the screenshot of the page, I give a text description and you give its "
    "corresponding location. The coordinate represents a clickable location [x, y] for "
    "an element, which is a relative coordinate on the screenshot, scaled from 0 to 1."
)

# Pixel budget handed to the processor and to each image message.
MIN_PIXELS = 256 * 28 * 28
MAX_PIXELS = 1344 * 28 * 28

# ---------------------------------------------------------------------------
# Model download and loading (runs at import time)
# ---------------------------------------------------------------------------
model_repo = "showlab/ShowUI-2B"
destination_folder = "./showui-2b"

# Create the local checkpoint folder if it is not there yet.
os.makedirs(destination_folder, exist_ok=True)

# Fetch every file listed in the model repository into the local folder.
files = list_repo_files(repo_id=model_repo)
for file in files:
    file_path = hf_hub_download(
        repo_id=model_repo,
        filename=file,
        local_dir=destination_folder,
    )
    print(f"Downloaded {file} to {file_path}")

# The weights are loaded on CPU here; run_showui moves the model to CUDA.
model = Qwen2VLForConditionalGeneration.from_pretrained(
    destination_folder,
    torch_dtype=torch.bfloat16,
    device_map="cpu",
)

# The processor comes from the base Qwen2-VL checkpoint, with the pixel budget above.
processor = AutoProcessor.from_pretrained(
    "Qwen/Qwen2-VL-2B-Instruct",
    min_pixels=MIN_PIXELS,
    max_pixels=MAX_PIXELS,
)
# Helper functions
def draw_point(image_input, point=None, radius=5):
    """Return the image, optionally marked with a red dot.

    Args:
        image_input: Either a file path (opened with PIL) or array-like pixel
            data (converted through ``np.uint8``).
        point: Relative ``(x, y)`` pair that is scaled by the image width and
            height. A falsy value leaves the image unmarked.
        radius: Radius of the dot in pixels.

    Returns:
        The PIL image, with the dot drawn on it when ``point`` is given.
    """
    canvas = (
        Image.open(image_input)
        if isinstance(image_input, str)
        else Image.fromarray(np.uint8(image_input))
    )
    if not point:
        return canvas

    # Convert the relative coordinates into pixel positions.
    center_x = point[0] * canvas.width
    center_y = point[1] * canvas.height
    bounding_box = (
        center_x - radius,
        center_y - radius,
        center_x + radius,
        center_y + radius,
    )
    ImageDraw.Draw(canvas).ellipse(bounding_box, fill='red')
    return canvas
def array_to_image_path(image_array, session_id):
    """Write the uploaded image to ``<session_id>.png`` and return its absolute path.

    Args:
        image_array: Array-like pixel data of the uploaded image.
        session_id: Identifier used as the PNG file's base name.

    Raises:
        ValueError: If ``image_array`` is ``None``.
    """
    if image_array is None:
        raise ValueError("No image provided. Please upload an image before submitting.")

    target = f"{session_id}.png"
    Image.fromarray(np.uint8(image_array)).save(target)
    return os.path.abspath(target)
def upload_to_s3(file_name, bucket, object_name=None):
    """Upload a file to an S3 bucket on a best-effort basis.

    Args:
        file_name: Path of the local file to upload.
        bucket: Name of the destination bucket.
        object_name: Destination object key; defaults to ``file_name``.

    Returns:
        ``True`` when the upload succeeded, ``False`` when the file is missing,
        credentials are unavailable, or S3 rejected the upload.
    """
    if object_name is None:
        object_name = file_name

    s3 = boto3.client('s3')
    try:
        s3.upload_file(file_name, bucket, object_name)
    except FileNotFoundError:
        print(f"The file {file_name} was not found.")
        return False
    except NoCredentialsError:
        print("Credentials not available.")
        return False
    except (ClientError, S3UploadFailedError) as err:
        # Service-side failures (access denied, missing bucket, ...) must not
        # crash the caller: this function reports the outcome as a boolean.
        print(f"Upload of {file_name} to {bucket}/{object_name} failed: {err}")
        return False

    print(f"Uploaded {file_name} to {bucket}/{object_name}.")
    return True
def _parse_click_point(output_text):
    """Return the ``[x, y]`` pair encoded in the model output, or ``None`` if it is not one."""
    try:
        parsed = ast.literal_eval(output_text.strip())
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        # The text is free-form model output; anything that is not a Python
        # literal is treated as "no usable coordinates".
        return None
    if not isinstance(parsed, (list, tuple)) or len(parsed) != 2:
        return None
    for value in parsed:
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return None
    return parsed


# The model is loaded on CPU at import time and moved to CUDA inside this
# function; on ZeroGPU Spaces that only works inside a @spaces.GPU call
# (the decorator is documented as effect-free elsewhere).
@spaces.GPU
def run_showui(image, query, session_id):
    """Run ShowUI grounding on a screenshot and mark the predicted click point.

    Args:
        image: Array-like pixel data of the screenshot.
        query: Text description of the element to locate.
        session_id: Identifier used to name the saved screenshot file.

    Returns:
        A ``(result_image, coords_text, image_path)`` tuple. ``coords_text`` is
        the string form of the predicted ``[x, y]`` pair; when the model output
        cannot be parsed as such a pair, the image is returned unmarked and
        ``coords_text`` carries the raw model output instead.

    Raises:
        ValueError: If ``image`` is ``None`` (raised by ``array_to_image_path``).
    """
    image_path = array_to_image_path(image, session_id)

    messages = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": _SYSTEM},
                {"type": "image", "image": image_path, "min_pixels": MIN_PIXELS, "max_pixels": MAX_PIXELS},
                {"type": "text", "text": query},
            ],
        }
    ]

    global model
    model = model.to("cuda")

    text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    image_inputs, video_inputs = process_vision_info(messages)
    inputs = processor(
        text=[text],
        images=image_inputs,
        videos=video_inputs,
        padding=True,
        return_tensors="pt",
    )
    inputs = inputs.to("cuda")

    generated_ids = model.generate(**inputs, max_new_tokens=128)
    # Drop the prompt tokens so only the newly generated answer is decoded.
    generated_ids_trimmed = [
        out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
    ]
    output_text = processor.batch_decode(
        generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
    )[0]

    click_xy = _parse_click_point(output_text)
    if click_xy is None:
        # Show the raw answer rather than failing the whole request.
        coords_text = f"Could not parse coordinates from model output: {output_text!r}"
    else:
        coords_text = str(click_xy)

    result_image = draw_point(image_path, click_xy, radius=10)
    return result_image, coords_text, image_path
def save_and_upload_data(image_path, query, session_id, is_example_image, votes=None):
    """Write the session record to ``<session_id>.json`` and upload it with the image.

    Args:
        image_path: Path of the saved screenshot.
        query: The user's query text.
        session_id: Identifier used as the JSON file's base name.
        is_example_image: When truthy, nothing is saved or uploaded.
        votes: Vote counters; a falsy value starts both counters at zero.

    Returns:
        The record dict that was written, or ``None`` for example images.
    """
    if not is_example_image:
        if not votes:
            votes = {"upvotes": 0, "downvotes": 0}

        record = {
            "image_path": image_path,
            "query": query,
            "votes": votes,
            "timestamp": datetime.now().isoformat(),
        }

        local_file_name = f"{session_id}.json"
        with open(local_file_name, "w") as record_file:
            json.dump(record, record_file)

        # Upload the record first, then the screenshot, under the "ootb/" prefix.
        upload_to_s3(local_file_name, 'altair.storage', object_name=f"ootb/{local_file_name}")
        upload_to_s3(image_path, 'altair.storage', object_name=f"ootb/{os.path.basename(image_path)}")
        return record

    print("Example image used. Skipping upload.")
    return None
def update_vote(vote_type, session_id, is_example_image):
    """Increment a vote counter in the session's JSON record and re-upload it.

    Args:
        vote_type: Either ``"upvote"`` or ``"downvote"``.
        session_id: Identifier of the submission; its record is read from
            ``<session_id>.json``.
        is_example_image: When truthy, no vote is recorded.

    Returns:
        A status message describing what happened.

    Raises:
        ValueError: If ``vote_type`` is not a known vote type.
    """
    if is_example_image:
        print("Example image used. Skipping vote update.")
        return "Example image used. No vote recorded."

    counter_by_vote = {"upvote": "upvotes", "downvote": "downvotes"}
    if vote_type not in counter_by_vote:
        # Previously an unknown type was reported as recorded although no
        # counter changed.
        raise ValueError(f"Unknown vote type: {vote_type!r}")

    # The vote buttons can be pressed before any submission (or after Clear),
    # in which case there is no session and no record file to update.
    if not session_id:
        return "No submission to vote on. Please submit a query first."

    local_file_name = f"{session_id}.json"
    try:
        with open(local_file_name, "r") as f:
            data = json.load(f)
    except FileNotFoundError:
        return "No saved submission found for this session. Vote not recorded."

    data["votes"][counter_by_vote[vote_type]] += 1

    with open(local_file_name, "w") as f:
        json.dump(data, f)

    upload_to_s3(local_file_name, 'altair.storage', object_name=f"ootb/{local_file_name}")
    return f"Your {vote_type} has been recorded. Thank you!"
# Read the logo once at import time and keep it as base64 text so the header
# HTML in build_demo can inline it as a data URI.
with open("./assets/showui.png", "rb") as image_file:
    _logo_bytes = image_file.read()
base64_image = base64.b64encode(_logo_bytes).decode("utf-8")
def build_demo(embed_mode, concurrency_count=1):
    """Build the Gradio Blocks UI for the ShowUI demo.

    Args:
        embed_mode: When truthy, the header (logo, description, badges) is omitted.
        concurrency_count: Accepted for interface compatibility; not used here.

    Returns:
        The assembled ``gr.Blocks`` app (not yet launched).
    """
    with gr.Blocks(title="ShowUI Demo", theme=gr.themes.Default()) as demo:
        # Per-session values carried from a submission to the vote buttons.
        state_image_path = gr.State(value=None)
        state_session_id = gr.State(value=None)
        state_is_example_image = gr.State(value=False)

        if not embed_mode:
            gr.HTML(
                f"""
                <div style="text-align: center; margin-bottom: 20px;">
                <div style="display: flex; justify-content: center;">
                <img src="data:image/png;base64,{base64_image}" alt="ShowUI" width="320" style="margin-bottom: 10px;"/>
                </div>
                <p>ShowUI is a lightweight vision-language-action model for GUI agents.</p>
                <div style="display: flex; justify-content: center; gap: 15px; font-size: 20px;">
                <a href="https://huggingface.co./showlab/ShowUI-2B" target="_blank">
                <img src="https://img.shields.io/badge/%F0%9F%A4%97%20Hugging%20Face-ShowUI--2B-blue" alt="model"/>
                </a>
                <a href="https://arxiv.org/abs/2411.17465" target="_blank">
                <img src="https://img.shields.io/badge/arXiv%20paper-2411.17465-b31b1b.svg" alt="arXiv"/>
                </a>
                <a href="https://github.com/showlab/ShowUI" target="_blank">
                <img src="https://img.shields.io/badge/GitHub-ShowUI-black" alt="GitHub"/>
                </a>
                </div>
                </div>
                """
            )

        with gr.Row():
            with gr.Column(scale=3):
                imagebox = gr.Image(type="numpy", label="Input Screenshot")
                textbox = gr.Textbox(
                    show_label=True,
                    placeholder="Enter a query (e.g., 'Click Nahant')",
                    label="Query",
                )
                submit_btn = gr.Button(value="Submit", variant="primary")
                gr.Examples(
                    examples=[
                        ["./examples/app_store.png", "Download Kindle."],
                        ["./examples/ios_setting.png", "Turn off Do not disturb."],
                        ["./examples/apple_music.png", "Star to favorite."],
                        ["./examples/map.png", "Boston."],
                        ["./examples/wallet.png", "Scan a QR code."],
                        ["./examples/word.png", "More shapes."],
                        ["./examples/web_shopping.png", "Proceed to checkout."],
                        ["./examples/web_forum.png", "Post my comment."],
                        ["./examples/safari_google.png", "Click on search bar."],
                    ],
                    inputs=[imagebox, textbox],
                    examples_per_page=3
                )
            with gr.Column(scale=8):
                output_img = gr.Image(type="pil", label="Output Image")
                gr.HTML(
                    """
                    <p><strong>Note:</strong> The <span style="color: red;">red point</span> on the output image represents the predicted clickable coordinates.</p>
                    """
                )
                output_coords = gr.Textbox(label="Clickable Coordinates")
                with gr.Row(elem_id="action-buttons", equal_height=True):
                    upvote_btn = gr.Button(value="Looks good!", variant="secondary")
                    downvote_btn = gr.Button(value="Too bad!", variant="secondary")
                    clear_btn = gr.Button(value="🗑️ Clear", interactive=True)

        def on_submit(image, query):
            """Run inference for one submission and store its record."""
            if image is None:
                # gr.Error is shown to the user in the UI.
                raise gr.Error("No image provided. Please upload an image before submitting.")
            session_id = datetime.now().strftime("%Y%m%d_%H%M%S")
            # NOTE(review): imagebox uses type="numpy", so `image` is not a str
            # here and this check is always False; example images are therefore
            # uploaded like any other submission. Confirm the intended behavior.
            is_example_image = isinstance(image, str) and image.startswith("./examples/")
            result_image, click_coords, image_path = run_showui(image, query, session_id)
            save_and_upload_data(image_path, query, session_id, is_example_image)
            return result_image, click_coords, image_path, session_id, is_example_image

        def on_clear():
            """Reset every input, output and state component to its initial value."""
            # One value per component in `clear_outputs`, in the same order; the
            # example-image flag goes back to its initial False.
            return None, None, None, None, None, None, False

        clear_outputs = [
            imagebox,
            textbox,
            output_img,
            output_coords,
            state_image_path,
            state_session_id,
            state_is_example_image,
        ]

        submit_btn.click(
            on_submit,
            [imagebox, textbox],
            [output_img, output_coords, state_image_path, state_session_id, state_is_example_image],
        )
        clear_btn.click(
            on_clear,
            inputs=None,
            outputs=clear_outputs,
            queue=False
        )
        upvote_btn.click(
            lambda session_id, is_example_image: update_vote("upvote", session_id, is_example_image),
            inputs=[state_session_id, state_is_example_image],
            outputs=[],
            queue=False
        )
        downvote_btn.click(
            lambda session_id, is_example_image: update_vote("downvote", session_id, is_example_image),
            inputs=[state_session_id, state_is_example_image],
            outputs=[],
            queue=False
        )
    return demo
if __name__ == "__main__":
    # Server settings for launching the demo when run as a script.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "ssr_mode": False,
        "debug": True,
    }
    demo = build_demo(embed_mode=False)
    demo.queue(api_open=False).launch(**launch_options)