import codecs as cs
import json
import os
import pickle
import random
from os.path import join as pjoin

import numpy as np
import rich
import spacy
from rich.progress import track
from torch.utils import data


class Text2MotionDatasetCB(data.Dataset):
    """HumanML3D dataset of pre-computed motion codebook (VQ-VAE) token
    sequences paired with their text captions and instruction templates.

    Each item combines one motion-token sequence, one of its captions, and one
    instruction task, so the dataset length is the number of motion samples
    multiplied by the number of tasks.
    """

    def __init__(
        self,
        data_root,
        split,
        mean,
        std,
        max_motion_length=196,
        min_motion_length=20,
        unit_length=4,
        fps=20,
        tmpFile=True,
        tiny=False,
        debug=False,
        stage='lm_pretrain',
        code_path='VQVAE',
        task_path=None,
        std_text=False,
        **kwargs,
    ):
        self.tiny = tiny
        self.unit_length = unit_length

        self.mean = mean
        self.std = std

        # Token data is always read from the training split; the `split`
        # argument is overridden here.
        split = 'train'
        split_file = pjoin(data_root, split + '.txt')
        motion_dir = pjoin(data_root, code_path)
        text_dir = pjoin(data_root, 'texts')

        # Select the instruction template file for the current training stage.
        if task_path:
            instructions = task_path
        elif stage == 'lm_pretrain':
            instructions = pjoin(data_root, 'template_pretrain.json')
        elif stage in ['lm_instruct', 'lm_rl']:
            instructions = pjoin(data_root, 'template_instructions.json')
        else:
            raise NotImplementedError(f"stage {stage} not implemented")

        self.id_list = []
        with cs.open(split_file, 'r') as f:
            for line in f.readlines():
                self.id_list.append(line.strip())

        # In tiny/debug mode only the first ~100 samples are loaded and the
        # progress bar is skipped.
        if tiny or debug:
            enumerator = enumerate(self.id_list)
            maxdata = 100
            subset = '_tiny'
        else:
            enumerator = enumerate(
                track(
                    self.id_list,
                    f"Loading HumanML3D {split}",
                ))
            maxdata = 1e10
            subset = ''

        new_name_list = []
        data_dict = {}

        for i, name in enumerator:
            if len(new_name_list) > maxdata:
                break
            try:
                # Pre-computed motion codebook token sequences for this sample.
                m_token_list = np.load(pjoin(motion_dir, f'{name}.npy'))

                # Each annotation line has the form
                # "caption#word/POS tokens#start_seconds#end_seconds".
                with cs.open(pjoin(text_dir, name + '.txt')) as f:
                    text_data = []
                    flag = False
                    lines = f.readlines()

                    for line in lines:
                        try:
                            text_dict = {}
                            line_split = line.strip().split('#')
                            caption = line_split[0]
                            t_tokens = line_split[1].split(' ')
                            f_tag = float(line_split[2])
                            to_tag = float(line_split[3])
                            f_tag = 0.0 if np.isnan(f_tag) else f_tag
                            to_tag = 0.0 if np.isnan(to_tag) else to_tag

                            text_dict['caption'] = caption
                            text_dict['tokens'] = t_tokens
                            if f_tag == 0.0 and to_tag == 0.0:
                                # The caption describes the whole motion.
                                flag = True
                                text_data.append(text_dict)
                            else:
                                # The caption describes a sub-segment: crop the
                                # token sequences, converting seconds to token
                                # indices via fps / unit_length.
                                start = int(f_tag * fps / unit_length)
                                end = int(to_tag * fps / unit_length)
                                m_token_list_new = [
                                    tokens[start:end]
                                    for tokens in m_token_list
                                    if start < end
                                ]

                                if len(m_token_list_new) == 0:
                                    continue
                                new_name = '%s_%f_%f' % (name, f_tag, to_tag)

                                data_dict[new_name] = {
                                    'm_token_list': m_token_list_new,
                                    'text': [text_dict]
                                }
                                new_name_list.append(new_name)
                        except Exception:
                            # Skip malformed annotation lines.
                            pass

                if flag:
                    # At least one caption covers the full motion, so the
                    # uncropped token sequences are kept under the sample name.
                    data_dict[name] = {
                        'm_token_list': m_token_list,
                        'text': text_data
                    }
                    new_name_list.append(name)
            except Exception:
                # Skip samples whose motion or text files are missing/corrupt.
                pass

        if tmpFile:
            # Cache the parsed token data and the index of sample names under
            # data_root/tmp.
            os.makedirs(pjoin(data_root, 'tmp'), exist_ok=True)
            with open(pjoin(data_root, f'tmp/{split}{subset}_tokens_data.pkl'),
                      'wb') as file:
                pickle.dump(data_dict, file)
            with open(pjoin(data_root, f'tmp/{split}{subset}_tokens_index.pkl'),
                      'wb') as file:
                pickle.dump(new_name_list, file)

        self.data_dict = data_dict
        self.name_list = new_name_list
        self.nlp = spacy.load('en_core_web_sm')
        self.std_text = std_text

        # The instruction file is a nested mapping: task -> subtask -> template.
        with open(instructions, 'r') as f:
            self.instructions = json.load(f)
        self.tasks = []
        for task in self.instructions.keys():
            for subtask in self.instructions[task].keys():
                self.tasks.append(self.instructions[task][subtask])

    def __len__(self):
        return len(self.name_list) * len(self.tasks)

    def __getitem__(self, item):
        # The flat index enumerates every (motion sample, task) pair.
        data_idx = item % len(self.name_list)
        task_idx = item // len(self.name_list)

        data = self.data_dict[self.name_list[data_idx]]
        m_token_list, text_list = data['m_token_list'], data['text']

        m_tokens = random.choice(m_token_list)
        text_data = random.choice(text_list)
        caption = text_data['caption']
        if self.std_text:
            # Standardize the caption: keep alphabetic words only and
            # lemmatize nouns and verbs, except 'left' (likely to avoid
            # lemmatizing the direction word to 'leave').
            doc = self.nlp(caption)
            word_list = []
            pos_list = []
            for token in doc:
                word = token.text
                if not word.isalpha():
                    continue
                if (token.pos_ == 'NOUN'
                        or token.pos_ == 'VERB') and (word != 'left'):
                    word_list.append(token.lemma_)
                else:
                    word_list.append(word)
                pos_list.append(token.pos_)

            caption = ' '.join(word_list)

        # Token strings are 'word/POS', so splitting on '/' recovers the words.
        all_captions = [
            ' '.join([token.split('/')[0] for token in text_dic['tokens']])
            for text_dic in text_list
        ]

        # With probability 1/3, drop one token from either the end or the
        # start of the sequence as a light length augmentation.
        coin = np.random.choice([False, False, True])
        if coin:
            coin2 = np.random.choice([True, False])
            if coin2:
                m_tokens = m_tokens[:-1]
            else:
                m_tokens = m_tokens[1:]

        m_tokens_len = m_tokens.shape[0]

        tasks = self.tasks[task_idx]

        # Unused fields are returned as None placeholders to keep a
        # fixed-length output tuple.
        return caption, m_tokens, m_tokens_len, None, None, None, None, all_captions, tasks
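

# Example usage: a minimal sketch of constructing and indexing the dataset.
# The data_root path, the mean/std arrays, and the chosen stage below are
# placeholder assumptions; a HumanML3D-style layout is expected
# (<data_root>/train.txt, <data_root>/VQVAE/<id>.npy, <data_root>/texts/<id>.txt
# and the template JSON files), along with the spaCy 'en_core_web_sm' model.
if __name__ == '__main__':
    dataset = Text2MotionDatasetCB(
        data_root='datasets/humanml3d',  # hypothetical dataset root
        split='train',                   # forced to 'train' internally anyway
        mean=np.zeros(263),              # placeholder; stored but not used here
        std=np.ones(263),                # placeholder; stored but not used here
        stage='lm_pretrain',
        tiny=True,                       # quick check on ~100 samples
    )
    print(f'{len(dataset)} (sample, task) pairs')
    caption, m_tokens, m_tokens_len, *_, all_captions, tasks = dataset[0]
    print(caption, m_tokens_len, tasks)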