|
from os.path import join as pjoin |
|
import torch |
|
from torch.utils import data |
|
import numpy as np |
|
from tqdm import tqdm |
|
from torch.utils.data._utils.collate import default_collate |
|
import random |
|
import codecs as cs |
|
|
|
|
|
def collate_fn(batch): |
|
batch.sort(key=lambda x: x[3], reverse=True) |
|
return default_collate(batch) |
|
|
|
class MotionDataset(data.Dataset): |
|
def __init__(self, opt, mean, std, split_file): |
|
self.opt = opt |
|
joints_num = opt.joints_num |
|
|
|
self.data = [] |
|
self.lengths = [] |
|
id_list = [] |
|
with open(split_file, 'r') as f: |
|
for line in f.readlines(): |
|
id_list.append(line.strip()) |
|
|
|
for name in tqdm(id_list): |
|
try: |
|
motion = np.load(pjoin(opt.motion_dir, name + '.npy')) |
|
if motion.shape[0] < opt.window_size: |
|
continue |
|
self.lengths.append(motion.shape[0] - opt.window_size) |
|
self.data.append(motion) |
|
except Exception as e: |
|
|
|
print(e) |
|
pass |
|
|
|
self.cumsum = np.cumsum([0] + self.lengths) |
|
|
|
if opt.is_train: |
|
|
|
std[0:1] = std[0:1] / opt.feat_bias |
|
|
|
std[1:3] = std[1:3] / opt.feat_bias |
|
|
|
std[3:4] = std[3:4] / opt.feat_bias |
|
|
|
std[4: 4 + (joints_num - 1) * 3] = std[4: 4 + (joints_num - 1) * 3] / 1.0 |
|
|
|
std[4 + (joints_num - 1) * 3: 4 + (joints_num - 1) * 9] = std[4 + (joints_num - 1) * 3: 4 + ( |
|
joints_num - 1) * 9] / 1.0 |
|
|
|
std[4 + (joints_num - 1) * 9: 4 + (joints_num - 1) * 9 + joints_num * 3] = std[ |
|
4 + (joints_num - 1) * 9: 4 + ( |
|
joints_num - 1) * 9 + joints_num * 3] / 1.0 |
|
|
|
std[4 + (joints_num - 1) * 9 + joints_num * 3:] = std[ |
|
4 + ( |
|
joints_num - 1) * 9 + joints_num * 3:] / opt.feat_bias |
|
|
|
assert 4 + (joints_num - 1) * 9 + joints_num * 3 + 4 == mean.shape[-1] |
|
np.save(pjoin(opt.meta_dir, 'mean.npy'), mean) |
|
np.save(pjoin(opt.meta_dir, 'std.npy'), std) |
|
|
|
self.mean = mean |
|
self.std = std |
|
print("Total number of motions {}, snippets {}".format(len(self.data), self.cumsum[-1])) |
|
|
|
def inv_transform(self, data): |
|
return data * self.std + self.mean |
|
|
|
def __len__(self): |
|
return self.cumsum[-1] |
|
|
|
def __getitem__(self, item): |
|
if item != 0: |
|
motion_id = np.searchsorted(self.cumsum, item) - 1 |
|
idx = item - self.cumsum[motion_id] - 1 |
|
else: |
|
motion_id = 0 |
|
idx = 0 |
|
motion = self.data[motion_id][idx:idx + self.opt.window_size] |
|
"Z Normalization" |
|
motion = (motion - self.mean) / self.std |
|
|
|
return motion |
|
|
|
|
|
class Text2MotionDatasetEval(data.Dataset): |
|
def __init__(self, opt, mean, std, split_file, w_vectorizer): |
|
self.opt = opt |
|
self.w_vectorizer = w_vectorizer |
|
self.max_length = 20 |
|
self.pointer = 0 |
|
self.max_motion_length = opt.max_motion_length |
|
min_motion_len = 40 if self.opt.dataset_name =='t2m' else 24 |
|
|
|
data_dict = {} |
|
id_list = [] |
|
with cs.open(split_file, 'r') as f: |
|
for line in f.readlines(): |
|
id_list.append(line.strip()) |
|
|
|
|
|
new_name_list = [] |
|
length_list = [] |
|
for name in tqdm(id_list): |
|
try: |
|
motion = np.load(pjoin(opt.motion_dir, name + '.npy')) |
|
if (len(motion)) < min_motion_len or (len(motion) >= 200): |
|
continue |
|
text_data = [] |
|
flag = False |
|
with cs.open(pjoin(opt.text_dir, name + '.txt')) as f: |
|
for line in f.readlines(): |
|
text_dict = {} |
|
line_split = line.strip().split('#') |
|
caption = line_split[0] |
|
tokens = line_split[1].split(' ') |
|
f_tag = float(line_split[2]) |
|
to_tag = float(line_split[3]) |
|
f_tag = 0.0 if np.isnan(f_tag) else f_tag |
|
to_tag = 0.0 if np.isnan(to_tag) else to_tag |
|
|
|
text_dict['caption'] = caption |
|
text_dict['tokens'] = tokens |
|
if f_tag == 0.0 and to_tag == 0.0: |
|
flag = True |
|
text_data.append(text_dict) |
|
else: |
|
try: |
|
n_motion = motion[int(f_tag*20) : int(to_tag*20)] |
|
if (len(n_motion)) < min_motion_len or (len(n_motion) >= 200): |
|
continue |
|
new_name = random.choice('ABCDEFGHIJKLMNOPQRSTUVW') + '_' + name |
|
while new_name in data_dict: |
|
new_name = random.choice('ABCDEFGHIJKLMNOPQRSTUVW') + '_' + name |
|
data_dict[new_name] = {'motion': n_motion, |
|
'length': len(n_motion), |
|
'text':[text_dict]} |
|
new_name_list.append(new_name) |
|
length_list.append(len(n_motion)) |
|
except: |
|
print(line_split) |
|
print(line_split[2], line_split[3], f_tag, to_tag, name) |
|
|
|
|
|
if flag: |
|
data_dict[name] = {'motion': motion, |
|
'length': len(motion), |
|
'text': text_data} |
|
new_name_list.append(name) |
|
length_list.append(len(motion)) |
|
except: |
|
pass |
|
|
|
name_list, length_list = zip(*sorted(zip(new_name_list, length_list), key=lambda x: x[1])) |
|
|
|
self.mean = mean |
|
self.std = std |
|
self.length_arr = np.array(length_list) |
|
self.data_dict = data_dict |
|
self.name_list = name_list |
|
self.reset_max_len(self.max_length) |
|
|
|
def reset_max_len(self, length): |
|
assert length <= self.max_motion_length |
|
self.pointer = np.searchsorted(self.length_arr, length) |
|
print("Pointer Pointing at %d"%self.pointer) |
|
self.max_length = length |
|
|
|
def inv_transform(self, data): |
|
return data * self.std + self.mean |
|
|
|
def __len__(self): |
|
return len(self.data_dict) - self.pointer |
|
|
|
def __getitem__(self, item): |
|
idx = self.pointer + item |
|
data = self.data_dict[self.name_list[idx]] |
|
motion, m_length, text_list = data['motion'], data['length'], data['text'] |
|
|
|
text_data = random.choice(text_list) |
|
caption, tokens = text_data['caption'], text_data['tokens'] |
|
|
|
if len(tokens) < self.opt.max_text_len: |
|
|
|
tokens = ['sos/OTHER'] + tokens + ['eos/OTHER'] |
|
sent_len = len(tokens) |
|
tokens = tokens + ['unk/OTHER'] * (self.opt.max_text_len + 2 - sent_len) |
|
else: |
|
|
|
tokens = tokens[:self.opt.max_text_len] |
|
tokens = ['sos/OTHER'] + tokens + ['eos/OTHER'] |
|
sent_len = len(tokens) |
|
pos_one_hots = [] |
|
word_embeddings = [] |
|
for token in tokens: |
|
word_emb, pos_oh = self.w_vectorizer[token] |
|
pos_one_hots.append(pos_oh[None, :]) |
|
word_embeddings.append(word_emb[None, :]) |
|
pos_one_hots = np.concatenate(pos_one_hots, axis=0) |
|
word_embeddings = np.concatenate(word_embeddings, axis=0) |
|
|
|
if self.opt.unit_length < 10: |
|
coin2 = np.random.choice(['single', 'single', 'double']) |
|
else: |
|
coin2 = 'single' |
|
|
|
if coin2 == 'double': |
|
m_length = (m_length // self.opt.unit_length - 1) * self.opt.unit_length |
|
elif coin2 == 'single': |
|
m_length = (m_length // self.opt.unit_length) * self.opt.unit_length |
|
idx = random.randint(0, len(motion) - m_length) |
|
motion = motion[idx:idx+m_length] |
|
|
|
"Z Normalization" |
|
motion = (motion - self.mean) / self.std |
|
|
|
if m_length < self.max_motion_length: |
|
motion = np.concatenate([motion, |
|
np.zeros((self.max_motion_length - m_length, motion.shape[1])) |
|
], axis=0) |
|
|
|
|
|
return word_embeddings, pos_one_hots, caption, sent_len, motion, m_length, '_'.join(tokens) |
|
|
|
|
|
class Text2MotionDataset(data.Dataset): |
|
def __init__(self, opt, mean, std, split_file): |
|
self.opt = opt |
|
self.max_length = 20 |
|
self.pointer = 0 |
|
self.max_motion_length = opt.max_motion_length |
|
min_motion_len = 40 if self.opt.dataset_name =='t2m' else 24 |
|
|
|
data_dict = {} |
|
id_list = [] |
|
with cs.open(split_file, 'r') as f: |
|
for line in f.readlines(): |
|
id_list.append(line.strip()) |
|
|
|
|
|
new_name_list = [] |
|
length_list = [] |
|
for name in tqdm(id_list): |
|
try: |
|
motion = np.load(pjoin(opt.motion_dir, name + '.npy')) |
|
if (len(motion)) < min_motion_len or (len(motion) >= 200): |
|
continue |
|
text_data = [] |
|
flag = False |
|
with cs.open(pjoin(opt.text_dir, name + '.txt')) as f: |
|
for line in f.readlines(): |
|
text_dict = {} |
|
line_split = line.strip().split('#') |
|
|
|
caption = line_split[0] |
|
tokens = line_split[1].split(' ') |
|
f_tag = float(line_split[2]) |
|
to_tag = float(line_split[3]) |
|
f_tag = 0.0 if np.isnan(f_tag) else f_tag |
|
to_tag = 0.0 if np.isnan(to_tag) else to_tag |
|
|
|
text_dict['caption'] = caption |
|
text_dict['tokens'] = tokens |
|
if f_tag == 0.0 and to_tag == 0.0: |
|
flag = True |
|
text_data.append(text_dict) |
|
else: |
|
try: |
|
n_motion = motion[int(f_tag*20) : int(to_tag*20)] |
|
if (len(n_motion)) < min_motion_len or (len(n_motion) >= 200): |
|
continue |
|
new_name = random.choice('ABCDEFGHIJKLMNOPQRSTUVW') + '_' + name |
|
while new_name in data_dict: |
|
new_name = random.choice('ABCDEFGHIJKLMNOPQRSTUVW') + '_' + name |
|
data_dict[new_name] = {'motion': n_motion, |
|
'length': len(n_motion), |
|
'text':[text_dict]} |
|
new_name_list.append(new_name) |
|
length_list.append(len(n_motion)) |
|
except: |
|
print(line_split) |
|
print(line_split[2], line_split[3], f_tag, to_tag, name) |
|
|
|
|
|
if flag: |
|
data_dict[name] = {'motion': motion, |
|
'length': len(motion), |
|
'text': text_data} |
|
new_name_list.append(name) |
|
length_list.append(len(motion)) |
|
except Exception as e: |
|
|
|
pass |
|
|
|
|
|
name_list, length_list = new_name_list, length_list |
|
|
|
self.mean = mean |
|
self.std = std |
|
self.length_arr = np.array(length_list) |
|
self.data_dict = data_dict |
|
self.name_list = name_list |
|
|
|
def inv_transform(self, data): |
|
return data * self.std + self.mean |
|
|
|
def __len__(self): |
|
return len(self.data_dict) - self.pointer |
|
|
|
def __getitem__(self, item): |
|
idx = self.pointer + item |
|
data = self.data_dict[self.name_list[idx]] |
|
motion, m_length, text_list = data['motion'], data['length'], data['text'] |
|
|
|
text_data = random.choice(text_list) |
|
caption, tokens = text_data['caption'], text_data['tokens'] |
|
|
|
if self.opt.unit_length < 10: |
|
coin2 = np.random.choice(['single', 'single', 'double']) |
|
else: |
|
coin2 = 'single' |
|
|
|
if coin2 == 'double': |
|
m_length = (m_length // self.opt.unit_length - 1) * self.opt.unit_length |
|
elif coin2 == 'single': |
|
m_length = (m_length // self.opt.unit_length) * self.opt.unit_length |
|
idx = random.randint(0, len(motion) - m_length) |
|
motion = motion[idx:idx+m_length] |
|
|
|
"Z Normalization" |
|
motion = (motion - self.mean) / self.std |
|
|
|
if m_length < self.max_motion_length: |
|
motion = np.concatenate([motion, |
|
np.zeros((self.max_motion_length - m_length, motion.shape[1])) |
|
], axis=0) |
|
|
|
|
|
return caption, motion, m_length |
|
|
|
def reset_min_len(self, length): |
|
assert length <= self.max_motion_length |
|
self.pointer = np.searchsorted(self.length_arr, length) |
|
print("Pointer Pointing at %d" % self.pointer) |