Spaces:
Runtime error
Runtime error
File size: 18,565 Bytes
04fbff5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 |
import os
import json
import numpy as np
import logging
import subprocess
import torch
from PIL import Image, ImageSequence
from decord import VideoReader, cpu
from torchvision import transforms
from torchvision.transforms import Compose, Resize, CenterCrop, ToTensor, Normalize, ToPILImage
try:
from torchvision.transforms import InterpolationMode
BICUBIC = InterpolationMode.BICUBIC
BILINEAR = InterpolationMode.BILINEAR
except ImportError:
BICUBIC = Image.BICUBIC
BILINEAR = Image.BILINEAR
CACHE_DIR = os.environ.get('VBENCH_CACHE_DIR')
if CACHE_DIR is None:
CACHE_DIR = os.path.join(os.path.expanduser('~'), '.cache', 'vbench')
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def clip_transform(n_px):
return Compose([
Resize(n_px, interpolation=BICUBIC),
CenterCrop(n_px),
transforms.Lambda(lambda x: x.float().div(255.0)),
Normalize((0.48145466, 0.4578275, 0.40821073), (0.26862954, 0.26130258, 0.27577711)),
])
def clip_transform_Image(n_px):
return Compose([
Resize(n_px, interpolation=BICUBIC),
CenterCrop(n_px),
ToTensor(),
Normalize((0.48145466, 0.4578275, 0.40821073), (0.26862954, 0.26130258, 0.27577711)),
])
def dino_transform(n_px):
return Compose([
Resize(size=n_px),
transforms.Lambda(lambda x: x.float().div(255.0)),
Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
])
def dino_transform_Image(n_px):
return Compose([
Resize(size=n_px),
ToTensor(),
Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
])
def tag2text_transform(n_px):
normalize = Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
return Compose([ToPILImage(),Resize((n_px, n_px)),ToTensor(),normalize])
def get_frame_indices(num_frames, vlen, sample='rand', fix_start=None, input_fps=1, max_num_frames=-1):
if sample in ["rand", "middle"]: # uniform sampling
acc_samples = min(num_frames, vlen)
# split the video into `acc_samples` intervals, and sample from each interval.
intervals = np.linspace(start=0, stop=vlen, num=acc_samples + 1).astype(int)
ranges = []
for idx, interv in enumerate(intervals[:-1]):
ranges.append((interv, intervals[idx + 1] - 1))
if sample == 'rand':
try:
frame_indices = [random.choice(range(x[0], x[1])) for x in ranges]
except:
frame_indices = np.random.permutation(vlen)[:acc_samples]
frame_indices.sort()
frame_indices = list(frame_indices)
elif fix_start is not None:
frame_indices = [x[0] + fix_start for x in ranges]
elif sample == 'middle':
frame_indices = [(x[0] + x[1]) // 2 for x in ranges]
else:
raise NotImplementedError
if len(frame_indices) < num_frames: # padded with last frame
padded_frame_indices = [frame_indices[-1]] * num_frames
padded_frame_indices[:len(frame_indices)] = frame_indices
frame_indices = padded_frame_indices
elif "fps" in sample: # fps0.5, sequentially sample frames at 0.5 fps
output_fps = float(sample[3:])
duration = float(vlen) / input_fps
delta = 1 / output_fps # gap between frames, this is also the clip length each frame represents
frame_seconds = np.arange(0 + delta / 2, duration + delta / 2, delta)
frame_indices = np.around(frame_seconds * input_fps).astype(int)
frame_indices = [e for e in frame_indices if e < vlen]
if max_num_frames > 0 and len(frame_indices) > max_num_frames:
frame_indices = frame_indices[:max_num_frames]
# frame_indices = np.linspace(0 + delta / 2, duration + delta / 2, endpoint=False, num=max_num_frames)
else:
raise ValueError
return frame_indices
def load_video(video_path, data_transform=None, num_frames=None, return_tensor=True, width=None, height=None):
"""
Load a video from a given path and apply optional data transformations.
The function supports loading video in GIF (.gif), PNG (.png), and MP4 (.mp4) formats.
Depending on the format, it processes and extracts frames accordingly.
Parameters:
- video_path (str): The file path to the video or image to be loaded.
- data_transform (callable, optional): A function that applies transformations to the video data.
Returns:
- frames (torch.Tensor): A tensor containing the video frames with shape (T, C, H, W),
where T is the number of frames, C is the number of channels, H is the height, and W is the width.
Raises:
- NotImplementedError: If the video format is not supported.
The function first determines the format of the video file by its extension.
For GIFs, it iterates over each frame and converts them to RGB.
For PNGs, it reads the single frame, converts it to RGB.
For MP4s, it reads the frames using the VideoReader class and converts them to NumPy arrays.
If a data_transform is provided, it is applied to the buffer before converting it to a tensor.
Finally, the tensor is permuted to match the expected (T, C, H, W) format.
"""
if video_path.endswith('.gif'):
frame_ls = []
img = Image.open(video_path)
for frame in ImageSequence.Iterator(img):
frame = frame.convert('RGB')
frame = np.array(frame).astype(np.uint8)
frame_ls.append(frame)
buffer = np.array(frame_ls).astype(np.uint8)
elif video_path.endswith('.png'):
frame = Image.open(video_path)
frame = frame.convert('RGB')
frame = np.array(frame).astype(np.uint8)
frame_ls = [frame]
buffer = np.array(frame_ls)
elif video_path.endswith('.mp4'):
import decord
decord.bridge.set_bridge('native')
if width:
video_reader = VideoReader(video_path, width=width, height=height, num_threads=1)
else:
video_reader = VideoReader(video_path, num_threads=1)
frames = video_reader.get_batch(range(len(video_reader))) # (T, H, W, C), torch.uint8
buffer = frames.asnumpy().astype(np.uint8)
else:
raise NotImplementedError
frames = buffer
if num_frames:
frame_indices = get_frame_indices(
num_frames, len(frames), sample="middle"
)
frames = frames[frame_indices]
if data_transform:
frames = data_transform(frames)
elif return_tensor:
frames = torch.Tensor(frames)
frames = frames.permute(0, 3, 1, 2) # (T, C, H, W), torch.uint8
return frames
def read_frames_decord_by_fps(
video_path, sample_fps=2, sample='rand', fix_start=None,
max_num_frames=-1, trimmed30=False, num_frames=8
):
import decord
decord.bridge.set_bridge("torch")
video_reader = VideoReader(video_path, num_threads=1)
vlen = len(video_reader)
fps = video_reader.get_avg_fps()
duration = vlen / float(fps)
if trimmed30 and duration > 30:
duration = 30
vlen = int(30 * float(fps))
frame_indices = get_frame_indices(
num_frames, vlen, sample=sample, fix_start=fix_start,
input_fps=fps, max_num_frames=max_num_frames
)
frames = video_reader.get_batch(frame_indices) # (T, H, W, C), torch.uint8
frames = frames.permute(0, 3, 1, 2) # (T, C, H, W), torch.uint8
return frames
def load_dimension_info(json_dir, dimension, lang):
"""
Load video list and prompt information based on a specified dimension and language from a JSON file.
Parameters:
- json_dir (str): The directory path where the JSON file is located.
- dimension (str): The dimension for evaluation to filter the video prompts.
- lang (str): The language key used to retrieve the appropriate prompt text.
Returns:
- video_list (list): A list of video file paths that match the specified dimension.
- prompt_dict_ls (list): A list of dictionaries, each containing a prompt and its corresponding video list.
The function reads the JSON file to extract video information. It filters the prompts based on the specified
dimension and compiles a list of video paths and associated prompts in the specified language.
Notes:
- The JSON file is expected to contain a list of dictionaries with keys 'dimension', 'video_list', and language-based prompts.
- The function assumes that the 'video_list' key in the JSON can either be a list or a single string value.
"""
video_list = []
prompt_dict_ls = []
full_prompt_list = load_json(json_dir)
for prompt_dict in full_prompt_list:
if dimension in prompt_dict['dimension'] and 'video_list' in prompt_dict:
prompt = prompt_dict[f'prompt_{lang}']
cur_video_list = prompt_dict['video_list'] if isinstance(prompt_dict['video_list'], list) else [prompt_dict['video_list']]
video_list += cur_video_list
if 'auxiliary_info' in prompt_dict and dimension in prompt_dict['auxiliary_info']:
prompt_dict_ls += [{'prompt': prompt, 'video_list': cur_video_list, 'auxiliary_info': prompt_dict['auxiliary_info'][dimension]}]
else:
prompt_dict_ls += [{'prompt': prompt, 'video_list': cur_video_list}]
return video_list, prompt_dict_ls
def init_submodules(dimension_list, local=False, read_frame=False):
submodules_dict = {}
if local:
logger.info("\x1b[32m[Local Mode]\x1b[0m Working in local mode, please make sure that the pre-trained model has been fully downloaded.")
for dimension in dimension_list:
os.makedirs(CACHE_DIR, exist_ok=True)
if dimension == 'background_consistency':
# read_frame = False
if local:
vit_b_path = f'{CACHE_DIR}/clip_model/ViT-B-32.pt'
if not os.path.isfile(vit_b_path):
wget_command = ['wget', 'https://openaipublic.azureedge.net/clip/models/40d365715913c9da98579312b702a82c18be219cc2a73407c4526f58eba950af/ViT-B-32.pt', '-P', os.path.dirname(vit_b_path)]
subprocess.run(wget_command, check=True)
else:
vit_b_path = 'ViT-B/32'
submodules_dict[dimension] = [vit_b_path, read_frame]
elif dimension == 'human_action':
umt_path = f'{CACHE_DIR}/umt_model/l16_ptk710_ftk710_ftk400_f16_res224.pth'
if not os.path.isfile(umt_path):
wget_command = ['wget', 'https://pjlab-gvm-data.oss-cn-shanghai.aliyuncs.com/umt/single_modality/l16_ptk710_ftk710_ftk400_f16_res224.pth', '-P', os.path.dirname(umt_path)]
subprocess.run(wget_command, check=True)
submodules_dict[dimension] = [umt_path,]
elif dimension == 'temporal_flickering':
submodules_dict[dimension] = []
elif dimension == 'motion_smoothness':
CUR_DIR = os.path.dirname(os.path.abspath(__file__))
submodules_dict[dimension] = {
'config': f'{CUR_DIR}/third_party/amt/cfgs/AMT-S.yaml',
'ckpt': f'{CACHE_DIR}/amt_model/amt-s.pth'
}
details = submodules_dict[dimension]
# Check if the file exists, if not, download it with wget
if not os.path.isfile(details['ckpt']):
print(f"File {details['ckpt']} does not exist. Downloading...")
wget_command = ['wget', '-P', os.path.dirname(details['ckpt']),
'https://huggingface.co./lalala125/AMT/resolve/main/amt-s.pth']
subprocess.run(wget_command, check=True)
elif dimension == 'dynamic_degree':
submodules_dict[dimension] = {
'model': f'{CACHE_DIR}/raft_model/models/raft-things.pth'
}
details = submodules_dict[dimension]
if not os.path.isfile(details['model']):
# raise NotImplementedError
print(f"File {details['model']} does not exist. Downloading...")
wget_command = ['wget', '-P', f'{CACHE_DIR}/raft_model/', 'https://dl.dropboxusercontent.com/s/4j4z58wuv8o0mfz/models.zip']
unzip_command = ['unzip', '-d', f'{CACHE_DIR}/raft_model/', f'{CACHE_DIR}/raft_model/models.zip']
remove_command = ['rm', '-r', f'{CACHE_DIR}/raft_model/models.zip']
try:
subprocess.run(wget_command, check=True)
subprocess.run(unzip_command, check=True)
subprocess.run(remove_command, check=True)
except subprocess.CalledProcessError as err:
print(f"Error during downloading RAFT model: {err}")
# Assign the DINO model path for subject consistency dimension
elif dimension == 'subject_consistency':
if local:
submodules_dict[dimension] = {
'repo_or_dir': f'{CACHE_DIR}/dino_model/facebookresearch_dino_main/',
'path': f'{CACHE_DIR}/dino_model/dino_vitbase16_pretrain.pth',
'model': 'dino_vitb16',
'source': 'local',
'read_frame': read_frame
}
details = submodules_dict[dimension]
# Check if the file exists, if not, download it with wget
if not os.path.isdir(details['repo_or_dir']):
print(f"Directory {details['repo_or_dir']} does not exist. Cloning repository...")
subprocess.run(['git', 'clone', 'https://github.com/facebookresearch/dino', details['repo_or_dir']], check=True)
if not os.path.isfile(details['path']):
print(f"File {details['path']} does not exist. Downloading...")
wget_command = ['wget', '-P', os.path.dirname(details['path']),
'https://dl.fbaipublicfiles.com/dino/dino_vitbase16_pretrain/dino_vitbase16_pretrain.pth']
subprocess.run(wget_command, check=True)
else:
submodules_dict[dimension] = {
'repo_or_dir':'facebookresearch/dino:main',
'source':'github',
'model': 'dino_vitb16',
'read_frame': read_frame
}
elif dimension == 'aesthetic_quality':
aes_path = f'{CACHE_DIR}/aesthetic_model/emb_reader'
if local:
vit_l_path = f'{CACHE_DIR}/clip_model/ViT-L-14.pt'
if not os.path.isfile(vit_l_path):
wget_command = ['wget' ,'https://openaipublic.azureedge.net/clip/models/b8cca3fd41ae0c99ba7e8951adf17d267cdb84cd88be6f7c2e0eca1737a03836/ViT-L-14.pt', '-P', os.path.dirname(vit_l_path)]
subprocess.run(wget_command, check=True)
else:
vit_l_path = 'ViT-L/14'
submodules_dict[dimension] = [vit_l_path, aes_path]
elif dimension == 'imaging_quality':
musiq_spaq_path = f'{CACHE_DIR}/pyiqa_model/musiq_spaq_ckpt-358bb6af.pth'
if not os.path.isfile(musiq_spaq_path):
wget_command = ['wget', 'https://github.com/chaofengc/IQA-PyTorch/releases/download/v0.1-weights/musiq_spaq_ckpt-358bb6af.pth', '-P', os.path.dirname(musiq_spaq_path)]
subprocess.run(wget_command, check=True)
submodules_dict[dimension] = {'model_path': musiq_spaq_path}
elif dimension in ["object_class", "multiple_objects", "color", "spatial_relationship" ]:
submodules_dict[dimension] = {
"model_weight": f'{CACHE_DIR}/grit_model/grit_b_densecap_objectdet.pth'
}
if not os.path.exists(submodules_dict[dimension]['model_weight']):
wget_command = ['wget', 'https://datarelease.blob.core.windows.net/grit/models/grit_b_densecap_objectdet.pth', '-P', os.path.dirname(submodules_dict[dimension]["model_weight"])]
subprocess.run(wget_command, check=True)
elif dimension == 'scene':
submodules_dict[dimension] = {
"pretrained": f'{CACHE_DIR}/caption_model/tag2text_swin_14m.pth',
"image_size":384,
"vit":"swin_b"
}
if not os.path.exists(submodules_dict[dimension]['pretrained']):
wget_command = ['wget', 'https://huggingface.co./spaces/xinyu1205/recognize-anything/resolve/main/tag2text_swin_14m.pth', '-P', os.path.dirname(submodules_dict[dimension]["pretrained"])]
subprocess.run(wget_command, check=True)
elif dimension == 'appearance_style':
if local:
submodules_dict[dimension] = {"name": f'{CACHE_DIR}/clip_model/ViT-B-32.pt'}
if not os.path.isfile(submodules_dict[dimension]["name"]):
wget_command = ['wget', 'https://openaipublic.azureedge.net/clip/models/40d365715913c9da98579312b702a82c18be219cc2a73407c4526f58eba950af/ViT-B-32.pt', '-P', os.path.dirname(submodules_dict[dimension]["name"])]
subprocess.run(wget_command, check=True)
else:
submodules_dict[dimension] = {"name": 'ViT-B/32'}
elif dimension in ["temporal_style", "overall_consistency"]:
submodules_dict[dimension] = {
"pretrain": f'{CACHE_DIR}/ViCLIP/ViClip-InternVid-10M-FLT.pth',
}
if not os.path.exists(submodules_dict[dimension]['pretrain']):
wget_command = ['wget', 'https://pjlab-gvm-data.oss-cn-shanghai.aliyuncs.com/internvideo/viclip/ViClip-InternVid-10M-FLT.pth', '-P', os.path.dirname(submodules_dict[dimension]["pretrain"])]
subprocess.run(wget_command, check=True)
return submodules_dict
def save_json(data, path, indent=4):
with open(path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=indent)
def load_json(path):
"""
Load a JSON file from the given file path.
Parameters:
- file_path (str): The path to the JSON file.
Returns:
- data (dict or list): The data loaded from the JSON file, which could be a dictionary or a list.
"""
with open(path, 'r', encoding='utf-8') as f:
return json.load(f)
|