jw2yang committed
Commit 54f748c · 1 Parent(s): 34f3c9b
Files changed (5)
  1. app_1p.py +0 -213
  2. vlms/llavanext.py +0 -43
  3. vlms/llavaov.py +0 -44
  4. vlms/magma.py +0 -40
  5. vlms/qwen2vl.py +0 -59
app_1p.py DELETED
@@ -1,213 +0,0 @@
- import os
- # add a command for installing flash-attn
- os.system('pip install flash-attn --no-build-isolation')
- os.system("pip install gradio==4.44.1")
-
- import pygame
- import numpy as np
- import gradio as gr
- import time
- import torch
- from PIL import Image
- from transformers import AutoModelForCausalLM, AutoProcessor
- import re
- import random
-
- pygame.mixer.quit()  # Disable sound
-
- # Constants
- WIDTH, HEIGHT = 800, 800
- GRID_SIZE = 80
- WHITE = (255, 255, 255)
- GREEN = (34, 139, 34)  # Forest green - more like an apple
- RED = (200, 50, 50)
- BLACK = (0, 0, 0)
- GRAY = (128, 128, 128)
- YELLOW = (218, 165, 32)  # Golden yellow color
-
- # Directions
- UP = (0, -1)
- DOWN = (0, 1)
- LEFT = (-1, 0)
- RIGHT = (1, 0)
- STATIC = (0, 0)
-
- ACTIONS = ["up", "down", "left", "right", "static"]
-
- # Load AI Model
- dtype = torch.bfloat16
- magma_model_id = "microsoft/Magma-8B"
- magam_model = AutoModelForCausalLM.from_pretrained(magma_model_id, trust_remote_code=True, torch_dtype=dtype)
- magma_processor = AutoProcessor.from_pretrained(magma_model_id, trust_remote_code=True)
- magam_model.to("cuda")
-
- magma_img = pygame.image.load("./assets/images/magma_game_thin.png")
- magma_img = pygame.transform.scale(magma_img, (GRID_SIZE, GRID_SIZE))
-
- class MagmaFindGPU:
-     def __init__(self):
-         self.reset()
-         self.step_count = 0
-
-     def reset(self):
-         self.snake = [(5, 5)]
-         self.direction = RIGHT
-         self.score = 0
-         self.game_over = False
-         self.step_count = 0
-         self.place_target()
-
-     def place_target(self):
-         while True:
-             target_x = np.random.randint(1, WIDTH // GRID_SIZE - 1)
-             target_y = np.random.randint(1, HEIGHT // GRID_SIZE - 1)
-             if (target_x, target_y) not in self.snake:
-                 self.target = (target_x, target_y)
-                 break
-
-     def step(self, action):
-         if action == "up":
-             self.direction = UP
-         elif action == "down":
-             self.direction = DOWN
-         elif action == "left":
-             self.direction = LEFT
-         elif action == "right":
-             self.direction = RIGHT
-         elif action == "static":
-             self.direction = STATIC
-
-         if self.game_over:
-             self.reset()
-             return self.render(), self.score
-
-         new_head = (self.snake[0][0] + self.direction[0], self.snake[0][1] + self.direction[1])
-
-         if new_head[0] < 0 or new_head[1] < 0 or new_head[0] >= WIDTH // GRID_SIZE or new_head[1] >= HEIGHT // GRID_SIZE:
-             self.game_over = True
-             return self.render(), self.score
-
-         self.snake = [new_head]  # Keep only the head (single block snake)
-         self.step_count += 1
-
-         # Check if the target is covered by one of the four surrounding squares
-         head_x, head_y = self.snake[0]
-         neighbors = set([(head_x, head_y - 1), (head_x, head_y + 1), (head_x - 1, head_y), (head_x + 1, head_y)])
-
-         if neighbors.issuperset(set([self.target])):
-             self.score += 1
-             self.place_target()
-
-         return self.render(), self.score
-
-     def render(self):
-         pygame.init()
-         surface = pygame.Surface((WIDTH, HEIGHT))
-         surface.fill(BLACK)
-
-         head_x, head_y = self.snake[0]
-         surface.blit(magma_img, (head_x * GRID_SIZE, head_y * GRID_SIZE))
-
-         # pygame.draw.rect(surface, RED, (self.snake[0][0] * GRID_SIZE, self.snake[0][1] * GRID_SIZE, GRID_SIZE, GRID_SIZE))
-         pygame.draw.rect(surface, GREEN, (self.target[0] * GRID_SIZE, self.target[1] * GRID_SIZE, GRID_SIZE, GRID_SIZE))
-
-         # Draw four surrounding squares with labels
-         head_x, head_y = self.snake[0]
-         neighbors = [(head_x, head_y - 1), (head_x, head_y + 1), (head_x - 1, head_y), (head_x + 1, head_y)]
-         labels = ["1", "2", "3", "4"]
-         font = pygame.font.Font(None, 48)
-
-         # clone surface
-         surface_nomark = surface.copy()
-         for i, (nx, ny) in enumerate(neighbors):
-             if 0 <= nx < WIDTH // GRID_SIZE and 0 <= ny < HEIGHT // GRID_SIZE:
-                 pygame.draw.rect(surface, RED, (nx * GRID_SIZE, ny * GRID_SIZE, GRID_SIZE, GRID_SIZE), GRID_SIZE)
-                 # pygame.draw.rect(surface_nomark, RED, (nx * GRID_SIZE, ny * GRID_SIZE, GRID_SIZE, GRID_SIZE), GRID_SIZE)
-
-                 text = font.render(labels[i], True, WHITE)
-                 text_rect = text.get_rect(center=(nx * GRID_SIZE + GRID_SIZE // 2, ny * GRID_SIZE + GRID_SIZE // 2))
-                 surface.blit(text, text_rect)
-
-         return np.array(pygame.surfarray.array3d(surface_nomark)).swapaxes(0, 1), np.array(pygame.surfarray.array3d(surface)).swapaxes(0, 1)
-
-     def get_state(self):
-         return self.render()
-
- game = MagmaFindGPU()
-
- def play_game():
-     state, state_som = game.get_state()
-     pil_img = Image.fromarray(state_som)
-     convs = [
-         {"role": "system", "content": "You are an agent that can see, talk, and act. Avoid hitting the wall."},
-         {"role": "user", "content": "<image_start><image><image_end>\nWhich mark is closer to green block? Answer with a single number."},
-     ]
-     prompt = magma_processor.tokenizer.apply_chat_template(convs, tokenize=False, add_generation_prompt=True)
-     inputs = magma_processor(images=[pil_img], texts=prompt, return_tensors="pt")
-     inputs['pixel_values'] = inputs['pixel_values'].unsqueeze(0)
-     inputs['image_sizes'] = inputs['image_sizes'].unsqueeze(0)
-     inputs = inputs.to("cuda").to(dtype)
-     generation_args = {
-         "max_new_tokens": 10,
-         "temperature": 0.3,
-         "do_sample": True,
-         "use_cache": True,
-         "num_beams": 1,
-     }
-     with torch.inference_mode():
-         generate_ids = magam_model.generate(**inputs, **generation_args)
-     generate_ids = generate_ids[:, inputs["input_ids"].shape[-1] :]
-     action = magma_processor.decode(generate_ids[0], skip_special_tokens=True).strip()
-     # extract the mark id from the action using re
-     match = re.search(r'\d+', action)
-     if match:
-         action = match.group(0)
-         if action.isdigit() and 1 <= int(action) <= 4:
-             action = ACTIONS[int(action) - 1]
-         else:
-             # randomly choose one from the pool
-             action = random.choice(ACTIONS[:-1])
-     else:
-         action = random.choice(ACTIONS[:-1])
-
-     img, score = game.step(action)
-     img = img[0]
-     return img, f"Score: {score}"
-
- def reset_game():
-     game.reset()
-     return game.render()[0], "Score: 0"
-
- MARKDOWN = """
- <div align="center">
- <img src="./assets/images/logo.png" alt="Magma Logo" style="margin-right: 5px; height: 80px;margin-top: -10px;">
- <h2>Magma: A Foundation Model for Multimodal AI Agents</h2>
-
- \[[arXiv Paper](https://www.arxiv.org/pdf/2502.13130)\] &nbsp; \[[Project Page](https://microsoft.github.io/Magma/)\] &nbsp; \[[Github Repo](https://github.com/microsoft/Magma)\] &nbsp; \[[Hugging Face Model](https://huggingface.co/microsoft/Magma-8B)\] &nbsp;
-
- This demo is powered by [Gradio](https://gradio.app/).
-
- <b>Goal: Collect the green blocks by automatically moving up, down, left and right.</b>
-
- </div>
- """
-
- with gr.Blocks() as interface:
-     gr.Markdown(MARKDOWN)
-     with gr.Row():
-         image_output = gr.Image(label="Game Screen")
-         with gr.Column():
-             score_output = gr.Text(label="Score", elem_classes="large-text")
-             gr.HTML("""
-             <style>
-                 .large-text textarea {
-                     font-size: 24px !important;
-                 }
-             </style>
-             """)
-             start_btn = gr.Button("Start/Reset Game")
-
-     interface.load(fn=play_game, every=1, inputs=[], outputs=[image_output, score_output])
-     start_btn.click(fn=reset_game, inputs=[], outputs=[image_output, score_output])
-
- interface.launch()
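
For context, the deleted `play_game` maps the model's mark answer ("1"–"4", one per cell around the agent) to a movement action, falling back to a random non-static move when the output cannot be parsed. A minimal, self-contained sketch of that parsing logic (the `parse_mark_to_action` helper name is illustrative, not from the deleted file):

```python
# Sketch of the mark-to-action parsing used in the deleted play_game;
# the function name is illustrative, not from app_1p.py.
import re
import random

ACTIONS = ["up", "down", "left", "right", "static"]

def parse_mark_to_action(model_output: str) -> str:
    """Map a model answer containing a mark id (1-4) to a move."""
    match = re.search(r"\d+", model_output)
    if match:
        mark = int(match.group(0))
        if 1 <= mark <= 4:
            # Marks 1-4 label the cells above, below, left of, and right of the head.
            return ACTIONS[mark - 1]
    # Unparseable output: fall back to a random move, excluding "static".
    return random.choice(ACTIONS[:-1])

if __name__ == "__main__":
    for answer in ["2", "Mark 3 is closest.", "not sure"]:
        print(answer, "->", parse_mark_to_action(answer))
```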
vlms/llavanext.py DELETED
@@ -1,43 +0,0 @@
- from transformers import LlavaNextProcessor, LlavaNextForConditionalGeneration
- import torch
- import torch.nn as nn
- from PIL import Image
- import requests
-
- class LLaVANextAgent(nn.Module):
-     def __init__(self, device="cuda", dtype=torch.float16):
-         super().__init__()
-
-         self.processor = LlavaNextProcessor.from_pretrained("llava-hf/llava-v1.6-mistral-7b-hf")
-         self.model = LlavaNextForConditionalGeneration.from_pretrained("llava-hf/llava-v1.6-mistral-7b-hf", torch_dtype=dtype, low_cpu_mem_usage=True)
-         self.dtype = dtype
-         self.device = device
-
-         self.model.to(device)
-
-         self.generation_args = {
-             "max_new_tokens": 10,
-             "temperature": 0.3,
-             "do_sample": True,
-             "use_cache": True,
-             "num_beams": 1,
-         }
-
-     def generate_response(self, image, question):
-         conversation = [
-             {
-                 "role": "user",
-                 "content": [
-                     {"type": "text", "text": question},
-                     {"type": "image"},
-                 ],
-             },
-         ]
-         prompt = self.processor.apply_chat_template(conversation, add_generation_prompt=True)
-         inputs = self.processor(images=image, text=prompt, return_tensors="pt").to(self.device)
-         # autoregressively complete prompt
-         self.model.generation_config.pad_token_id = self.processor.tokenizer.pad_token_id
-         with torch.inference_mode():
-             output = self.model.generate(**inputs, **self.generation_args)
-         output = output[:, inputs["input_ids"].shape[-1] :]
-         return self.processor.decode(output[0], skip_special_tokens=True).strip()
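
If this agent were restored, usage would presumably look like the sketch below; it assumes a CUDA GPU, that the llava-hf/llava-v1.6-mistral-7b-hf weights can be downloaded, and an illustrative frame path:

```python
# Hypothetical usage of the deleted LLaVANextAgent; not part of this commit.
import torch
from PIL import Image
from vlms.llavanext import LLaVANextAgent  # assumes the deleted file is restored

agent = LLaVANextAgent(device="cuda", dtype=torch.float16)
frame = Image.open("game_frame.png")  # illustrative path to a rendered game frame
print(agent.generate_response(frame, "Which mark is closer to the green block? Answer with a single number."))
```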
vlms/llavaov.py DELETED
@@ -1,44 +0,0 @@
- from transformers import AutoProcessor, LlavaOnevisionForConditionalGeneration
- import torch
- import torch.nn as nn
- from PIL import Image
- import requests
-
- class LLaVAOVAgent(nn.Module):
-     model_id = "llava-hf/llava-onevision-qwen2-7b-ov-hf"
-     def __init__(self, device="cuda", dtype=torch.float16):
-         super().__init__()
-
-         self.processor = AutoProcessor.from_pretrained(self.model_id)
-         self.model = LlavaOnevisionForConditionalGeneration.from_pretrained(self.model_id, torch_dtype=dtype, low_cpu_mem_usage=True)
-         self.dtype = dtype
-         self.device = device
-
-         self.model.to(device)
-
-         self.generation_args = {
-             "max_new_tokens": 10,
-             "temperature": 0.3,
-             "do_sample": True,
-             "use_cache": True,
-             "num_beams": 1,
-         }
-
-     def generate_response(self, image, question):
-         conversation = [
-             {
-                 "role": "user",
-                 "content": [
-                     {"type": "text", "text": question},
-                     {"type": "image"},
-                 ],
-             },
-         ]
-         prompt = self.processor.apply_chat_template(conversation, add_generation_prompt=True)
-         inputs = self.processor(images=image, text=prompt, return_tensors="pt").to(self.device)
-         # autoregressively complete prompt
-         self.model.generation_config.pad_token_id = self.processor.tokenizer.pad_token_id
-         with torch.inference_mode():
-             output = self.model.generate(**inputs, **self.generation_args)
-         output = output[:, inputs["input_ids"].shape[-1] :]
-         return self.processor.decode(output[0], skip_special_tokens=True).strip()
vlms/magma.py DELETED
@@ -1,40 +0,0 @@
- from transformers import AutoModelForCausalLM, AutoProcessor
- import torch
- import torch.nn as nn
- from PIL import Image
- import requests
-
- model_id = "microsoft/Magma-8B"
- class MagmaAgent(nn.Module):
-     def __init__(self, device="cuda", dtype=torch.float16):
-         super().__init__()
-
-         self.model = AutoModelForCausalLM.from_pretrained(model_id, trust_remote_code=True, torch_dtype=dtype, low_cpu_mem_usage=True)
-         self.processor = AutoProcessor.from_pretrained(model_id, trust_remote_code=True)
-         self.dtype = dtype
-         self.device = device
-         self.model.to(device)
-
-         self.generation_args = {
-             "max_new_tokens": 10,
-             "temperature": 0.3,
-             "do_sample": True,
-             "use_cache": True,
-             "num_beams": 1,
-         }
-
-     def generate_response(self, image, question):
-         convs = [
-             {"role": "system", "content": "You are an agent that can see, talk, and act."},
-             {"role": "user", "content": "<image_start><image><image_end>\n{}".format(question)},
-         ]
-         prompt = self.processor.tokenizer.apply_chat_template(convs, tokenize=False, add_generation_prompt=True)
-         inputs = self.processor(images=[image], texts=prompt, return_tensors="pt").to(self.dtype).to(self.device)
-         inputs['pixel_values'] = inputs['pixel_values'].unsqueeze(0)
-         inputs['image_sizes'] = inputs['image_sizes'].unsqueeze(0)
-
-         with torch.inference_mode():
-             generate_ids = self.model.generate(**inputs, **self.generation_args)
-         generate_ids = generate_ids[:, inputs["input_ids"].shape[-1] :]
-         action = self.processor.decode(generate_ids[0], skip_special_tokens=True).strip()
-         return action
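
Note that app_1p.py above loads the same microsoft/Magma-8B checkpoint in bfloat16, while this class defaults to float16; the dtype can be overridden at construction. A brief sketch, assuming the deleted file is restored:

```python
# Match the demo's bfloat16 precision when constructing the deleted MagmaAgent.
import torch
from vlms.magma import MagmaAgent  # assumes the deleted file is restored

agent = MagmaAgent(device="cuda", dtype=torch.bfloat16)
```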
vlms/qwen2vl.py DELETED
@@ -1,59 +0,0 @@
- from transformers import Qwen2VLForConditionalGeneration, AutoTokenizer, AutoProcessor
- from qwen_vl_utils import process_vision_info
-
- import torch
- import torch.nn as nn
- from PIL import Image
- import requests
-
- class Qwen2VLAgent(nn.Module):
-     model_id = "Qwen/Qwen2-VL-7B-Instruct"
-     def __init__(self, device="cuda", dtype=torch.float16):
-         super().__init__()
-
-         self.processor = AutoProcessor.from_pretrained(self.model_id)
-         self.model = Qwen2VLForConditionalGeneration.from_pretrained(self.model_id, torch_dtype=dtype, low_cpu_mem_usage=True)
-         self.dtype = dtype
-         self.device = device
-
-         self.model.to(device)
-
-         self.generation_args = {
-             "max_new_tokens": 10,
-             "temperature": 0.3,
-             "do_sample": True,
-             "use_cache": True,
-             "num_beams": 1,
-         }
-
-     def generate_response(self, image, question):
-         image.save('qwen25vl.png')
-         conversation = [
-             {
-                 "role": "user",
-                 "content": [
-                     {"type": "text", "text": question},
-                     {"type": "image", "image": "qwen25vl.png"},
-                 ],
-             },
-         ]
-
-         # Preparation for inference
-         text = self.processor.apply_chat_template(
-             conversation, tokenize=False, add_generation_prompt=True
-         )
-         image_inputs, video_inputs = process_vision_info(conversation)
-         inputs = self.processor(
-             text=[text],
-             images=image_inputs,
-             videos=video_inputs,
-             padding=True,
-             return_tensors="pt",
-         ).to(self.device)
-
-         # autoregressively complete prompt
-         self.model.generation_config.pad_token_id = self.processor.tokenizer.pad_token_id
-         with torch.inference_mode():
-             output = self.model.generate(**inputs, **self.generation_args)
-         output = output[:, inputs["input_ids"].shape[-1] :]
-         return self.processor.decode(output[0], skip_special_tokens=True).strip()
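
All four deleted wrappers expose the same `generate_response(image, question)` interface, so a comparison demo can dispatch to any of them through a small registry. A sketch under that assumption (the registry and `ask` helper are illustrative, not from this repo):

```python
# Hypothetical dispatcher over the four deleted agent classes; illustrative only.
import torch
from PIL import Image

from vlms.magma import MagmaAgent
from vlms.llavanext import LLaVANextAgent
from vlms.llavaov import LLaVAOVAgent
from vlms.qwen2vl import Qwen2VLAgent

AGENTS = {
    "magma": MagmaAgent,
    "llava-next": LLaVANextAgent,
    "llava-onevision": LLaVAOVAgent,
    "qwen2-vl": Qwen2VLAgent,
}

def ask(agent_name: str, frame: Image.Image, question: str) -> str:
    # Instantiate lazily so only one ~7-8B model occupies GPU memory at a time.
    agent = AGENTS[agent_name](device="cuda", dtype=torch.float16)
    return agent.generate_response(frame, question)
```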