'''Defines a generic data formatter for CGM data sets.'''
import sys
import warnings
import numpy as np
import pandas as pd
import sklearn.preprocessing
import data_formatter.types as types
import data_formatter.utils as utils
DataTypes = types.DataTypes
InputTypes = types.InputTypes
dict_data_type = {'categorical': DataTypes.CATEGORICAL,
'real_valued': DataTypes.REAL_VALUED,
'date': DataTypes.DATE}
dict_input_type = {'target': InputTypes.TARGET,
'observed_input': InputTypes.OBSERVED_INPUT,
'known_input': InputTypes.KNOWN_INPUT,
'static_input': InputTypes.STATIC_INPUT,
'id': InputTypes.ID,
'time': InputTypes.TIME}
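
# A minimal sketch of one entry in the config's `column_definition` list
# (the key strings follow the two dicts above; the column name 'gl' is only
# an illustrative assumption, not a required name):
#   {'name': 'gl', 'data_type': 'real_valued', 'input_type': 'target'}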
class DataFormatter:
    """Defines and formats data."""

    def __init__(self, cnf):
        """Initialises the formatter from a config dictionary."""
        # load parameters from the config file
        self.params = cnf
        # optional path of a progress log (None means print to stdout only);
        # reshuffle() appends to this file when it is set
        self.study_file = None
# load column definition
print('-'*32)
print('Loading column definition...')
self.__process_column_definition()
# check that column definition is valid
print('Checking column definition...')
self.__check_column_definition()
# load data
# check if data table has index col: -1 if not, index >= 0 if yes
print('Loading data...')
self.params['index_col'] = False if self.params['index_col'] == -1 else self.params['index_col']
# read data table
self.data = pd.read_csv(self.params['data_csv_path'], index_col=self.params['index_col'])
# drop columns / rows
print('Dropping columns / rows...')
self.__drop()
# check NA values
print('Checking for NA values...')
self.__check_nan()
# set data types in DataFrame to match column definition
print('Setting data types...')
self.__set_data_types()
        # drop columns / rows a second time, after the type conversion above
print('Dropping columns / rows...')
self.__drop()
# encode
print('Encoding data...')
self._encoding_params = self.params['encoding_params']
self.__encode()
# interpolate
print('Interpolating data...')
self._interpolation_params = self.params['interpolation_params']
self._interpolation_params['interval_length'] = self.params['observation_interval']
self.__interpolate()
# split data
print('Splitting data...')
self._split_params = self.params['split_params']
self._split_params['max_length_input'] = self.params['max_length_input']
self.__split_data()
# scale
print('Scaling data...')
self._scaling_params = self.params['scaling_params']
self.__scale()
print('Data formatting complete.')
print('-'*32)
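
    # For reference, __init__ reads at least these keys from `cnf`; the list
    # is recovered from the code above, not an authoritative schema:
    #   data_csv_path, index_col, column_definition, drop, encoding_params,
    #   interpolation_params, observation_interval, split_params,
    #   max_length_input, length_pred, scaling_params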
    def __process_column_definition(self):
        """Builds (name, DataTypes, InputTypes) tuples from the config's column definition."""
self._column_definition = []
for col in self.params['column_definition']:
self._column_definition.append((col['name'],
dict_data_type[col['data_type']],
dict_input_type[col['input_type']]))
def __check_column_definition(self):
# check that there is unique ID column
assert len([col for col in self._column_definition if col[2] == InputTypes.ID]) == 1, 'There must be exactly one ID column.'
# check that there is unique time column
assert len([col for col in self._column_definition if col[2] == InputTypes.TIME]) == 1, 'There must be exactly one time column.'
# check that there is at least one target column
assert len([col for col in self._column_definition if col[2] == InputTypes.TARGET]) >= 1, 'There must be at least one target column.'
    def __set_data_types(self):
        # cast each column to the pandas dtype implied by its DataTypes entry
for col in self._column_definition:
if col[1] == DataTypes.DATE:
self.data[col[0]] = pd.to_datetime(self.data[col[0]])
if col[1] == DataTypes.CATEGORICAL:
self.data[col[0]] = self.data[col[0]].astype('category')
if col[1] == DataTypes.REAL_VALUED:
self.data[col[0]] = self.data[col[0]].astype(np.float32)
def __check_nan(self):
# delete rows where target, time, or id are na
self.data = self.data.dropna(subset=[col[0]
for col in self._column_definition
if col[2] in [InputTypes.TARGET, InputTypes.TIME, InputTypes.ID]])
# assert that there are no na values in the data
        assert self.data.isna().sum().sum() == 0, 'There are NA values in the data even after dropping rows with a missing target, time, or id.'
def __drop(self):
# drop columns that are not in the column definition
self.data = self.data[[col[0] for col in self._column_definition]]
# drop rows based on conditions set in the formatter
if self.params['drop'] is not None:
if self.params['drop']['rows'] is not None:
# drop row at indices in the list self.params['drop']['rows']
self.data = self.data.drop(self.params['drop']['rows'])
self.data = self.data.reset_index(drop=True)
if self.params['drop']['columns'] is not None:
for col in self.params['drop']['columns'].keys():
# drop rows where specified columns have values in the list self.params['drop']['columns'][col]
self.data = self.data.loc[~self.data[col].isin(self.params['drop']['columns'][col])].copy()
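
    # Shape of the `drop` config consumed above (the values shown are
    # illustrative assumptions):
    #   {'rows': [10, 11], 'columns': {'quality': ['poor']}}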
    def __interpolate(self):
        """Interpolates the series via utils.interpolate, using the configured observation interval."""
self.data, self._column_definition = utils.interpolate(self.data,
self._column_definition,
**self._interpolation_params)
    def __split_data(self):
        """Splits the data into train / val / test (and OOD test) segments."""
if self.params['split_params']['test_percent_subjects'] == 0 or \
self.params['split_params']['length_segment'] == 0:
print('\tNo splitting performed since test_percent_subjects or length_segment is 0.')
self.train_idx, self.val_idx, self.test_idx, self.test_idx_ood = None, None, None, None
self.train_data, self.val_data, self.test_data = self.data, None, None
else:
assert self.params['split_params']['length_segment'] > self.params['length_pred'], \
'length_segment for test / val must be greater than length_pred.'
self.train_idx, self.val_idx, self.test_idx, self.test_idx_ood = utils.split(self.data,
self._column_definition,
**self._split_params)
self.train_data, self.val_data, self.test_data = self.data.iloc[self.train_idx], \
self.data.iloc[self.val_idx], \
self.data.iloc[self.test_idx + self.test_idx_ood]
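
    # split_params keys consumed in this file: test_percent_subjects,
    # length_segment, random_state (overwritten by reshuffle), plus the
    # max_length_input injected in __init__; any remaining keys pass
    # through unchanged to utils.split.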
    def __encode(self):
        """Encodes categorical columns via utils.encode and stores the fitted encoders."""
self.data, self._column_definition, self.encoders = utils.encode(self.data,
self._column_definition,
**self._encoding_params)
    def __scale(self):
        """Scales the train / val / test splits via utils.scale and stores the fitted scalers."""
self.train_data, self.val_data, self.test_data, self.scalers = utils.scale(self.train_data,
self.val_data,
self.test_data,
self._column_definition,
**self.params['scaling_params'])
    def reshuffle(self, seed):
        """Re-splits and re-scales the data under a new random seed."""
        # redirect prints to the study file, if one has been configured
        stdout = sys.stdout
        f = None
        if self.study_file is not None:
            f = open(self.study_file, 'a')
            sys.stdout = f
self.params['split_params']['random_state'] = seed
# split data
self.train_idx, self.val_idx, self.test_idx, self.test_idx_ood = utils.split(self.data,
self._column_definition,
**self._split_params)
self.train_data, self.val_data, self.test_data = self.data.iloc[self.train_idx], \
self.data.iloc[self.val_idx], \
self.data.iloc[self.test_idx+self.test_idx_ood]
# re-scale data
self.train_data, self.val_data, self.test_data, self.scalers = utils.scale(self.train_data,
self.val_data,
self.test_data,
self._column_definition,
**self.params['scaling_params'])
        sys.stdout = stdout
        if f is not None:
            f.close()
    def get_column(self, column_name):
        """Returns the column name(s) playing the given role: 'time', 'id', 'sid', 'target', 'future_covs', 'static_covs', or 'dynamic_covs'."""
if column_name == 'time':
return [col[0] for col in self._column_definition if col[2] == InputTypes.TIME][0]
elif column_name == 'id':
return [col[0] for col in self._column_definition if col[2] == InputTypes.ID][0]
elif column_name == 'sid':
return [col[0] for col in self._column_definition if col[2] == InputTypes.SID][0]
elif column_name == 'target':
return [col[0] for col in self._column_definition if col[2] == InputTypes.TARGET]
elif column_name == 'future_covs':
future_covs = [col[0] for col in self._column_definition if col[2] == InputTypes.KNOWN_INPUT]
return future_covs if len(future_covs) > 0 else None
elif column_name == 'static_covs':
static_covs = [col[0] for col in self._column_definition if col[2] == InputTypes.STATIC_INPUT]
return static_covs if len(static_covs) > 0 else None
elif column_name == 'dynamic_covs':
dynamic_covs = [col[0] for col in self._column_definition if col[2] == InputTypes.OBSERVED_INPUT]
return dynamic_covs if len(dynamic_covs) > 0 else None
else:
raise ValueError('Column {} not found.'.format(column_name))
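
if __name__ == '__main__':
    # A minimal usage sketch, not a shipped entry point. Every value below
    # (file path, column names, parameter dicts) is an illustrative
    # assumption; real configs come from the project's config files, and the
    # empty parameter dicts presume the utils.* helpers accept keyword-only
    # defaults.
    import os
    example_cnf = {
        'data_csv_path': 'data/example.csv',      # hypothetical CSV
        'index_col': -1,                          # no index column in the CSV
        'observation_interval': '5min',           # assumed sampling interval
        'max_length_input': 192,
        'length_pred': 12,
        'drop': None,
        'column_definition': [
            {'name': 'id', 'data_type': 'categorical', 'input_type': 'id'},
            {'name': 'time', 'data_type': 'date', 'input_type': 'time'},
            {'name': 'gl', 'data_type': 'real_valued', 'input_type': 'target'},
        ],
        'encoding_params': {},
        'interpolation_params': {},
        'split_params': {'test_percent_subjects': 0.1,
                         'length_segment': 240,
                         'random_state': 0},
        'scaling_params': {},
    }
    # only run the pipeline if the hypothetical CSV actually exists
    if os.path.exists(example_cnf['data_csv_path']):
        formatter = DataFormatter(example_cnf)
        print('id column:', formatter.get_column('id'))
        print('target column(s):', formatter.get_column('target'))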