import numpy as np
import pandas as pd

from loading.loadpickledataset import LoadPickleDataSet
from preprocessing.augmentation.gaussiannoise import GaussianNoise
from preprocessing.augmentation.imurotation import IMURotation
from preprocessing.filter_imu import FilterIMU
from preprocessing.filter_opensim import FilterOpenSim
from preprocessing.remove_outlier import remove_outlier
from preprocessing.resample import Resample
from preprocessing.segmentation.fixwindowsegmentation import FixWindowSegmentation
from preprocessing.segmentation.gaitcyclesegmentation import GaitCycleSegmentation
from preprocessing.segmentation.zeropaddingsegmentation import ZeroPaddingSegmentation


class DataSet:
    def __init__(self, config, load_dataset=True):
        self.config = config
        self.x = []
        self.y = []
        self.labels = []
        self.selected_trial_type = config['selected_trial_type']
        self.selected_activity_label = config['selected_activity_label']
        self.segmentation_method = config['segmentation_method']
        if self.config['gc_dataset']:
            self.segmentation_method = 'zeropadding'
        self.resample = config['resample']
        if load_dataset:
            self.load_dataset()
        self.n_sample = len(self.y)  # number of loaded trials (0 when load_dataset=False)
        self.train_subjects = config['train_subjects']
        self.test_subjects = config['test_subjects']
        self.train_activity = config['train_activity']
        self.test_activity = config['test_activity']
        # self.winsize = 128
        self.train_dataset = {}
        self.test_dataset = {}

    def load_dataset(self):
        """Load the pickled dataset, keep only the selected activities, and preprocess."""
        getdata_handler = LoadPickleDataSet(self.config)
        x, y, labels = getdata_handler.run_get_dataset()
        self.x, self.y, self.labels = self.run_activity_based_filter(x, y, labels)
        self._preprocess()

    def _preprocess(self):
        """Remove outliers, optionally resample, and low-pass filter the OpenSim/IMU signals."""
        self.x, self.y, self.labels = remove_outlier(self.x, self.y, self.labels)
        if self.resample:
            self.x, self.y, self.labels = self.run_resample_signal(self.x, self.y, self.labels)
        if self.config['opensim_filter']:
            # 6 Hz low-pass filter on the OpenSim targets
            filteropensim_handler = FilterOpenSim(self.y, lowcut=6, fs=100, order=2)
            self.y = filteropensim_handler.run_lowpass_filter()
        if self.config['imu_filter']:
            # 10 Hz low-pass filter on the IMU channels
            filterimu_handler = FilterIMU(self.x, lowcut=10, fs=100, order=2)
            self.x = filterimu_handler.run_lowpass_filter()

    def run_resample_signal(self, x, y, labels):
        # Resample handler is constructed with 200 and 100, presumably the
        # original and target sampling rates in Hz.
        resample_handler = Resample(x, y, labels, 200, 100)
        x, y, labels = resample_handler._run_resample()
        return x, y, labels

    def run_segmentation(self, x, y, labels):
        """Segment the trials with the configured method and apply optional augmentation."""
        if self.segmentation_method == 'fixedwindow':
            segmentation_handler = FixWindowSegmentation(x, y, labels,
                                                         winsize=self.config['target_padding_length'],
                                                         overlap=0.5, start_over=True)
            self.x, self.y, self.labels = segmentation_handler._run_segmentation()
        elif self.segmentation_method == 'zeropadding':
            segmentation_handler = ZeroPaddingSegmentation(x, y, labels,
                                                           target_padding_length=self.config['target_padding_length'],
                                                           start_over=True)
            self.x, self.y, self.labels = segmentation_handler._run_segmentation()
        elif self.segmentation_method == 'gaitcycle':
            segmentation_handler = GaitCycleSegmentation(x, y, labels, winsize=128, overlap=0.5, start_over=True)
            self.x, self.y, self.labels = segmentation_handler._run_segmentation()
        if self.config['opensim_filter']:
            filteropensim_handler = FilterOpenSim(self.y, lowcut=6, fs=100, order=2)
            self.y = filteropensim_handler.run_lowpass_filter()
        if self.config['rotation']:
            imu_rotation_handler = IMURotation(knom=10)
            self.x, self.y, self.labels = imu_rotation_handler.run_rotation(self.x.copy(), self.y.copy(),
                                                                            self.labels.copy())
        if self.config['gaussian_noise']:
            gaussian_noise_handler = GaussianNoise(0, 0.05)
            self.x, self.y, self.labels = gaussian_noise_handler.run_add_noise(self.x, self.y, self.labels)
        del x, y, labels
        return self.x, self.y, self.labels

    def run_activity_based_filter(self, x, y, label):
        '''
        :return: updated x, y, and labels that contain only the selected trial types and activity labels
        '''
        updated_x = []
        updated_y = []
        updated_label = []
        s = 0
        for ll, xx, yy in zip(label, x, y):
            # print(ll['subject'][0])
            # print(ll['trialNum'][0])
            if self.config['dataset_name'] == 'camargo' and ll['trialType'].isin(self.selected_trial_type).all() \
                    and self.selected_activity_label == ['all_idle']:
                l_temp = ll[ll['trialType'].isin(self.selected_trial_type)]
                l_temp_index = l_temp.index.values
                xx_temp = xx[l_temp_index]
                yy_temp = yy[l_temp_index]
                updated_x.append(xx_temp)
                updated_y.append(yy_temp)
                updated_label.append(l_temp)
            elif self.config['dataset_name'] == 'camargo' and ll['trialType'].isin(self.selected_trial_type).all() \
                    and self.selected_activity_label == ['all']:
                # keep every activity label except idle and stand
                update_selected_activity_label = list(ll['Label'].unique())
                update_selected_activity_label = [i for i in update_selected_activity_label
                                                  if i not in ['idle', 'stand']]
                l_temp = ll[(ll['trialType'].isin(self.selected_trial_type)) &
                            (ll['Label'].isin(update_selected_activity_label))]
                l_temp_index = l_temp.index.values
                xx_temp = xx[l_temp_index]
                yy_temp = yy[l_temp_index]
                updated_x.append(xx_temp)
                updated_y.append(yy_temp)
                updated_label.append(l_temp)
            elif self.config['dataset_name'] == 'camargo' and ll['trialType'].isin(self.selected_trial_type).all() \
                    and self.selected_activity_label == ['all_split']:
                ll_temp = ll.copy()
                ll_temp['trialType2'] = ll_temp['Label']
                if ll['trialType'][0] == 'levelground':
                    # get the turn index if it's there
                    turn1_indx = ll_temp[ll_temp['Label'] == 'turn1'].index.values
                    turn2_indx = ll_temp[ll_temp['Label'] == 'turn2'].index.values
                    # check which turn is turn 1
                    # if turn1_indx[0] ...  (the turn-ordering check and the relabelling it drives are
                    # truncated in the source)
                # if levelground --> save stand-walk and walk into one trial and walk-stand into another
                # trial; all samples would be continuous
                # if ramp or stair --> save trial for ascent and descent individually
                # NOTE: the loop header below is reconstructed; the iterated collection is truncated in the source
                for i, activity_label in enumerate(ll_temp['trialType2'].unique()):
                    if isinstance(activity_label, str):
                        l_temp = ll[(ll['trialType'].isin(self.selected_trial_type)) &
                                    (ll['trialType2'] == activity_label)]
                        l_temp_index = l_temp.index.values
                        xx_temp = xx[l_temp_index]
                        yy_temp = yy[l_temp_index]
                        updated_x.append(xx_temp)
                        updated_y.append(yy_temp)
                        updated_label.append(l_temp)
                        if len(xx_temp) == 0:
                            print(i)
            elif self.config['dataset_name'] == 'camargo':
                l_temp = ll[(ll['trialType'].isin(self.selected_trial_type)) &
                            (ll['Label'].isin(self.selected_activity_label))]
                l_temp_index = l_temp.index.values
                xx_temp = xx[l_temp_index]
                yy_temp = yy[l_temp_index]
                updated_x.append(xx_temp)
                updated_y.append(yy_temp)
                updated_label.append(l_temp)
            elif self.config['dataset_name'] == 'kiha':
                l_temp = ll[ll['trialType'].isin(self.selected_trial_type)]
                l_temp_index = l_temp.index.values
                xx_temp = xx[l_temp_index]
                yy_temp = yy[l_temp_index]
                updated_x.append(xx_temp)
                updated_y.append(yy_temp)
                updated_label.append(l_temp)
            # else:
            #     continue
        return updated_x, updated_y, updated_label

    def concatenate_data(self):
        """Concatenate the per-trial lists into a single label DataFrame and single x/y arrays."""
        self.labels = pd.concat(self.labels, axis=0, ignore_index=True)
        self.x = np.concatenate(self.x, axis=0)
        self.y = np.concatenate(self.y, axis=0)

    def run_dataset_split_loop(self):
        """Trial-wise split: assign whole trials to train/test by subject and activity."""
        train_labels = []
        test_labels = []
        train_x = []
        train_y = []
        test_x = []
        test_y = []
        for t, trial in enumerate(self.labels):
            if all(trial['subject'].isin(self.train_subjects)) and all(trial['trialType2'].isin(self.train_activity)):
                train_labels.append(trial)
                train_x.append(self.x[t])
                train_y.append(self.y[t])
            elif all(trial['subject'].isin(self.test_subjects)) and all(trial['trialType2'].isin(self.test_activity)):
                test_labels.append(trial)
                test_x.append(self.x[t])
                test_y.append(self.y[t])
        self.train_dataset['x'] = train_x
        self.train_dataset['y'] = train_y
        self.train_dataset['labels'] = train_labels
        self.test_dataset['x'] = test_x
        self.test_dataset['y'] = test_y
        self.test_dataset['labels'] = test_labels
        return self.train_dataset, self.test_dataset

    def run_dataset_split(self):
        """Subject-wise split on the concatenated data (see concatenate_data)."""
        if set(self.test_subjects).issubset(self.train_subjects):
            # test subjects also appear in the training list: hold them out of training entirely
            train_labels = self.labels[~self.labels['subject'].isin(self.test_subjects)]
            test_labels = self.labels[self.labels['subject'].isin(self.test_subjects)]
        else:
            train_labels = self.labels[self.labels['subject'].isin(self.train_subjects)]
            test_labels = self.labels[self.labels['subject'].isin(self.test_subjects)]
        print(train_labels['subject'].unique())
        print(test_labels['subject'].unique())
        train_index = train_labels.index.values
        test_index = test_labels.index.values
        print('training length', len(train_index))
        print('test length', len(test_index))
        train_x = self.x[train_index]
        train_y = self.y[train_index]
        # self.train_dataset['x'] = train_x.reshape([int(train_x.shape[0] / self.config['target_padding_length']),
        #                                            self.config['target_padding_length'], train_x.shape[1]])
        # self.train_dataset['y'] = train_y.reshape([int(train_y.shape[0] / self.config['target_padding_length']),
        #                                            self.config['target_padding_length'], train_y.shape[1]])
        self.train_dataset['x'] = train_x
        self.train_dataset['y'] = train_y
        self.train_dataset['labels'] = train_labels.reset_index(drop=True)
        test_x = self.x[test_index]
        test_y = self.y[test_index]
        # self.test_dataset['x'] = test_x.reshape([int(test_x.shape[0] / self.config['target_padding_length']),
        #                                          self.config['target_padding_length'], test_x.shape[1]])
        # self.test_dataset['y'] = test_y.reshape([int(test_y.shape[0] / self.config['target_padding_length']),
        #                                          self.config['target_padding_length'], test_y.shape[1]])
        self.test_dataset['x'] = test_x
        self.test_dataset['y'] = test_y
        self.test_dataset['labels'] = test_labels.reset_index(drop=True)
        del train_labels, test_labels, train_x, train_y, test_x, test_y
        return self.train_dataset, self.test_dataset
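

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only). The config keys mirror the ones
# read by DataSet above, but the values, subject IDs, and activity names are
# hypothetical placeholders, and the pipeline order shown here
# (segment -> concatenate -> split) is an assumption based on the methods
# this class exposes, not a documented workflow.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    example_config = {
        'dataset_name': 'camargo',
        'selected_trial_type': ['treadmill'],
        'selected_activity_label': ['walk'],
        'segmentation_method': 'fixedwindow',
        'gc_dataset': False,
        'resample': False,
        'opensim_filter': True,
        'imu_filter': True,
        'rotation': False,
        'gaussian_noise': False,
        'target_padding_length': 128,
        'train_subjects': ['AB06', 'AB07'],
        'test_subjects': ['AB08'],
        'train_activity': ['walk'],
        'test_activity': ['walk'],
    }
    dataset = DataSet(example_config)                                # load, filter by activity, preprocess
    dataset.run_segmentation(dataset.x, dataset.y, dataset.labels)   # window each trial
    dataset.concatenate_data()                                       # stack windows across trials
    train_set, test_set = dataset.run_dataset_split()                # subject-wise split
    print(len(train_set['x']), len(test_set['x']))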