Spaces:
Runtime error
Runtime error
import numpy as np | |
import pandas as pd | |
from loading.loadpickledataset import LoadPickleDataSet | |
from preprocessing.augmentation.gaussiannoise import GaussianNoise | |
from preprocessing.augmentation.imurotation import IMURotation | |
from preprocessing.filter_imu import FilterIMU | |
from preprocessing.filter_opensim import FilterOpenSim | |
from preprocessing.remove_outlier import remove_outlier | |
from preprocessing.resample import Resample | |
from preprocessing.segmentation.fixwindowsegmentation import FixWindowSegmentation | |
from preprocessing.segmentation.gaitcyclesegmentation import GaitCycleSegmentation | |
from preprocessing.segmentation.zeropaddingsegmentation import ZeroPaddingSegmentation | |
class DataSet: | |
def __init__(self, config, load_dataset=True): | |
self.config = config | |
self.x = [] | |
self.y = [] | |
self.labels = [] | |
self.selected_trial_type = config['selected_trial_type'] | |
self.selected_activity_label = config['selected_activity_label'] | |
self.segmentation_method = config['segmentation_method'] | |
if self.config['gc_dataset']: | |
self.segmentation_method = 'zeropadding' | |
self.resample = config['resample'] | |
self.n_sample = len(self.y) | |
if load_dataset: | |
self.load_dataset() | |
self.train_subjects = config['train_subjects'] | |
self.test_subjects = config['test_subjects'] | |
self.train_activity = config['train_activity'] | |
self.test_activity = config['test_activity'] | |
# self.winsize = 128 | |
self.train_dataset = {} | |
self.test_dataset = {} | |
def load_dataset(self): | |
getdata_handler = LoadPickleDataSet(self.config) | |
x, y, labels = getdata_handler.run_get_dataset() | |
self.x, self.y, self.labels = self.run_activity_based_filter(x, y, labels) | |
self._preprocess() | |
def _preprocess(self): | |
self.x, self.y, self.labels = remove_outlier(self.x, self.y, self.labels) | |
if self.resample: | |
self.x, self.y, self.labels = self.run_resample_signal(self.x, self.y, self.labels) | |
if self.config['opensim_filter']: | |
filteropensim_handler = FilterOpenSim(self.y, lowcut=6, fs=100, order=2) | |
self.y = filteropensim_handler.run_lowpass_filter() | |
if self.config['imu_filter']: | |
filterimu_handler = FilterIMU(self.x, lowcut=10, fs=100, order=2) | |
self.x = filterimu_handler.run_lowpass_filter() | |
def run_resample_signal(self, x, y, labels): | |
resample_handler = Resample(x, y, labels, 200, 100) | |
x, y, labels = resample_handler._run_resample() | |
return x, y, labels | |
def run_segmentation(self, x, y, labels): | |
if self.segmentation_method == 'fixedwindow': | |
segmentation_handler = FixWindowSegmentation(x, y, labels, winsize=self.config['target_padding_length'], overlap=0.5, start_over=True) | |
self.x, self.y, self.labels = segmentation_handler._run_segmentation() | |
elif self.segmentation_method == 'zeropadding': | |
segmentation_handler = ZeroPaddingSegmentation(x, y, labels, target_padding_length=self.config['target_padding_length'], start_over=True) | |
self.x, self.y, self.labels = segmentation_handler._run_segmentation() | |
elif self.segmentation_method == 'gaitcycle': | |
segmentation_handler = GaitCycleSegmentation(x, y, labels, winsize=128, overlap=0.5, start_over=True) | |
self.x, self.y, self.labels = segmentation_handler._run_segmentation() | |
if self.config['opensim_filter']: | |
filteropensim_handler = FilterOpenSim(self.y, lowcut=6, fs=100, order=2) | |
self.y = filteropensim_handler.run_lowpass_filter() | |
if self.config['rotation']: | |
imu_rotation_handler = IMURotation(knom=10) | |
self.x, self.y, self.labels = imu_rotation_handler.run_rotation(self.x.copy(), self.y.copy(), self.labels.copy()) | |
if self.config['gaussian_noise']: | |
gaussian_noise_handler = GaussianNoise(0, .05) | |
self.x, self.y, self.labels = gaussian_noise_handler.run_add_noise(self.x, self.y, self.labels) | |
del x, y, labels | |
return self.x, self.y, self.labels | |
def run_activity_based_filter(self, x, y, label): | |
''' | |
:return: updated x, y, and labels which contains only the selected labels (activity section) | |
''' | |
updated_x = [] | |
update_y = [] | |
updated_label = [] | |
s = 0 | |
for ll, xx, yy, in zip(label, x, y): | |
# print(ll['subject'][0]) | |
# print(ll['trialNum'][0]) | |
if self.config['dataset_name']=='camargo' and ll['trialType'].isin(self.selected_trial_type).all() and self.selected_activity_label == ['all_idle']: | |
l_temp = ll[ll['trialType'].isin(self.selected_trial_type)] | |
l_temp_index = l_temp.index.values | |
xx_temp = xx[l_temp_index] | |
yy_temp = yy[l_temp_index] | |
updated_x.append(xx_temp) | |
update_y.append(yy_temp) | |
updated_label.append(l_temp) | |
elif self.config['dataset_name']=='camargo' and ll['trialType'].isin(self.selected_trial_type).all() and self.selected_activity_label == ['all']: | |
update_selected_activity_label = list(ll['Label'].unique()) | |
update_selected_activity_label = [i for i in update_selected_activity_label if i not in ['idle', 'stand']] | |
l_temp = ll[(ll['trialType'].isin(self.selected_trial_type)) & (ll['Label'].isin(update_selected_activity_label))] | |
l_temp_index = l_temp.index.values | |
xx_temp = xx[l_temp_index] | |
yy_temp = yy[l_temp_index] | |
updated_x.append(xx_temp) | |
update_y.append(yy_temp) | |
updated_label.append(l_temp) | |
elif self.config['dataset_name'] == 'camargo' and ll['trialType'].isin(self.selected_trial_type).all() and self.selected_activity_label == ['all_split']: | |
ll_temp = ll.copy() | |
ll_temp['trialType2'] =ll_temp['Label'] | |
if ll['trialType'][0] =='levelground': | |
# get the turn index if it's there | |
turn1_indx = ll_temp[ll_temp['Label'] == 'turn1'].index.values | |
turn2_indx = ll_temp[ll_temp['Label'] == 'turn2'].index.values | |
# check which turn is turn 1 | |
if turn1_indx[0]<turn2_indx[0]: | |
pass | |
else: | |
turn2_indx_temp = turn1_indx | |
turn1_indx = turn2_indx | |
turn2_indx = turn2_indx_temp | |
# devide into two segments | |
seg1 = ll_temp.iloc[0:turn1_indx[-1]+1] | |
seg2 = ll_temp.iloc[turn2_indx[0]:] | |
seg1_trialType2 = seg1['trialType2'].replace({'idle': 'idle', 'stand': 'idle', 'turn1': 'idle', 'turn2': 'idle', | |
'stand-walk':'levelground1', 'walk':'levelground1', | |
'walk-stand': 'levelground1'}) | |
seg2_trialType2 = seg2['trialType2'].replace({'idle': 'idle', 'stand': 'idle', 'turn1': 'idle','turn2': 'idle', | |
'stand-walk':'levelground2', 'walk':'levelground2', | |
'walk-stand': 'levelground2'}) | |
ll_temp['trialType2'] = pd.concat([seg1_trialType2, seg2_trialType2]) | |
ll = ll_temp | |
elif ll['trialType'][0] =='ramp': | |
ll_temp['trialType2'] = ll_temp['trialType2'].replace({'idle': 'idle', | |
'walk-rampascent': 'rampascent', 'rampascent':'rampascent','rampascent-walk': 'rampascent', | |
'walk-rampdescent': 'rampdescent', 'rampdescent':'rampdescent','rampdescent-walk': 'rampdescent'}) | |
ll = ll_temp | |
elif ll['trialType'][0] == 'stair': | |
ll_temp['trialType2'] = ll_temp['trialType2'].replace({'idle': 'idle', | |
'walk-stairascent': 'stairascent', 'stairascent':'stairascent','stairascent-walk': 'stairascent', | |
'walk-stairdescent': 'stairdescent', 'stairdescent':'stairdescent','stairdescent-walk': 'stairdescent'}) | |
ll = ll_temp | |
update_selected_activity_label = list(ll['trialType2'].unique()) | |
# remove stand, idle, turn1, turn2 samples | |
update_selected_activity_label = [i for i in update_selected_activity_label if | |
i not in ['idle']] | |
for activity_label in update_selected_activity_label: | |
# if trial type == levelground ->save stand-walk and walk into one trial and walk-stand into another trial. all samples would be continues | |
# if ramp or stair--> save trial for ascent and descent individually | |
if isinstance(activity_label, str): | |
l_temp = ll[(ll['trialType'].isin(self.selected_trial_type)) & (ll['trialType2']==activity_label)] | |
l_temp_index = l_temp.index.values | |
xx_temp = xx[l_temp_index] | |
yy_temp = yy[l_temp_index] | |
updated_x.append(xx_temp) | |
update_y.append(yy_temp) | |
updated_label.append(l_temp) | |
if len(xx_temp)==0: | |
print(i) | |
elif self.config['dataset_name']=='camargo': | |
l_temp = ll[(ll['trialType'].isin(self.selected_trial_type)) & (ll['Label'].isin(self.selected_activity_label))] | |
l_temp_index = l_temp.index.values | |
xx_temp = xx[l_temp_index] | |
yy_temp = yy[l_temp_index] | |
updated_x.append(xx_temp) | |
update_y.append(yy_temp) | |
updated_label.append(l_temp) | |
elif self.config['dataset_name']=='kiha': | |
l_temp = ll[(ll['trialType'].isin(self.selected_trial_type))] | |
l_temp_index = l_temp.index.values | |
xx_temp = xx[l_temp_index] | |
yy_temp = yy[l_temp_index] | |
updated_x.append(xx_temp) | |
update_y.append(yy_temp) | |
updated_label.append(l_temp) | |
# else: | |
# continue | |
return updated_x, update_y, updated_label | |
def concatenate_data(self): | |
self.labels = pd.concat(self.labels, axis=0, ignore_index = True) | |
self.x = np.concatenate(self.x, axis=0) | |
self.y = np.concatenate(self.y, axis=0) | |
def run_dataset_split_loop(self): | |
train_labels = [] | |
test_labels = [] | |
train_x = [] | |
train_y = [] | |
test_x = [] | |
test_y = [] | |
for t, trial in enumerate(self.labels): | |
if all(trial['subject'].isin(self.train_subjects)) and all(trial['trialType2'].isin(self.train_activity)): | |
train_labels.append(trial) | |
train_x.append(self.x[t]) | |
train_y.append(self.y[t]) | |
elif all(trial['subject'].isin(self.test_subjects)) and all(trial['trialType2'].isin(self.test_activity)): | |
test_labels.append(trial) | |
test_x.append(self.x[t]) | |
test_y.append(self.y[t]) | |
self.train_dataset['x'] = train_x | |
self.train_dataset['y'] = train_y | |
self.train_dataset['labels'] = train_labels | |
self.test_dataset['x'] = test_x | |
self.test_dataset['y'] = test_y | |
self.test_dataset['labels'] = test_labels | |
return self.train_dataset, self.test_dataset | |
def run_dataset_split(self): | |
if set(self.test_subjects).issubset(self.train_subjects): | |
train_labels = self.labels[~self.labels['subject'].isin(self.test_subjects)] | |
test_labels = self.labels[(self.labels['subjects'].isin(self.test_subjects))] | |
else: | |
train_labels = self.labels[self.labels['subject'].isin(self.train_subjects)] | |
test_labels = self.labels[(self.labels['subject'].isin(self.test_subjects))] | |
print(train_labels['subject'].unique()) | |
print(test_labels['subject'].unique()) | |
train_index = train_labels.index.values | |
test_index = test_labels.index.values | |
print('training length', len(train_index)) | |
print('test length', len(test_index)) | |
train_x = self.x[train_index] | |
train_y = self.y[train_index] | |
# self.train_dataset['x'] = train_x.reshape([int(train_x.shape[0]/self.config['target_padding_length']), self.config['target_padding_length'], train_x.shape[1]]) | |
# self.train_dataset['y'] = train_y.reshape([int(train_y.shape[0]/self.config['target_padding_length']), self.config['target_padding_length'], train_y.shape[1]]) | |
self.train_dataset['x'] = train_x | |
self.train_dataset['y'] = train_y | |
self.train_dataset['labels'] = train_labels.reset_index(drop=True) | |
test_x = self.x[test_index] | |
test_y = self.y[test_index] | |
# self.test_dataset['x'] = test_x.reshape([int(test_x.shape[0]/self.config['target_padding_length']), self.config['target_padding_length'], test_x.shape[1]]) | |
# self.test_dataset['y'] = test_y.reshape([int(test_y.shape[0]/self.config['target_padding_length']), self.config['target_padding_length'], test_y.shape[1]]) | |
self.test_dataset['x'] = test_x | |
self.test_dataset['y'] = test_y | |
self.test_dataset['labels'] = test_labels.reset_index(drop=True) | |
del train_labels, test_labels, train_x, train_y, test_x, test_y | |
return self.train_dataset, self.test_dataset | |