import os
import csv
import torch
import random
import numpy as np
from torch.autograd import Variable
from model.tf_model import subsequent_mask
from torch.utils.data import Dataset

class Batch:
    "Object for holding a batch of data with mask during training."

    def __init__(self, src, trg=None, ys=None, pad=0):
        self.src = src
        self.ys = ys
        self.src_len = src[:, 0, :]
        self.src_mask = (self.src_len != pad).unsqueeze(-2)
        if trg is not None:
            # ------ for EEG (3-D batches: batch x channel x time) ------
            self.trg = trg[:, :, :-1]
            self.trg_len = trg[:, 0, :]
            self.trg_x = self.trg_len[:, :-1]
            self.trg_y = self.trg_len[:, 1:]
            # ------ for NLP (2-D token batches) ------
            # self.trg = trg[:, :-1]
            # self.trg_x = trg[:, :-1]
            # self.trg_y = trg[:, 1:]
            self.trg_mask = self.make_std_mask(self.trg_x, pad)
            self.ntokens = (self.trg_y != pad).data.sum()

    @staticmethod
    def make_std_mask(tgt, pad):
        "Create a mask to hide padding and future words."
        tgt_mask = (tgt != pad).unsqueeze(-2)
        tgt_mask = tgt_mask & Variable(
            subsequent_mask(tgt.size(-1)).type_as(tgt_mask.data))
        return tgt_mask
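
# Shape sketch for make_std_mask (an assumption: subsequent_mask from
# model.tf_model follows the Annotated-Transformer convention of returning a
# (1, size, size) lower-triangular boolean mask):
#   tgt:      (batch, L)      shifted target sequence
#   pad mask: (batch, 1, L)   True where the position is not padding
#   causal:   (1, L, L)       True where position j <= i
#   result:   (batch, L, L)   True only for visible, non-padded positions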

def data_gen(V, batch, nbatches):
    "Generate random data for a src-tgt copy task."
    # Kept from the NLP copy-task setup: it yields 2-D token batches, which match
    # the commented-out NLP branch of Batch rather than the 3-D EEG branch.
    for i in range(nbatches):
        data = torch.from_numpy(np.random.randint(1, V, size=(batch, 10)))
        data[:, 0] = 1
        src = Variable(data, requires_grad=False)
        tgt = Variable(data, requires_grad=False)
        yield Batch(src, tgt, 0)

def data_load(train_loader):
    "Wrap each (attr, target, ys) triple from the loader into a Batch on the GPU."
    print("data_load:", len(train_loader))
    for i, (attr, target, ys) in enumerate(train_loader):
        # src = Variable(attr, requires_grad=False)
        # tgt = Variable(target, requires_grad=False)
        # print("data_load1:", attr.shape)
        # print("data_load2:", target.shape)
        src = attr.cuda()
        tgt = target.cuda()
        ys = ys.cuda()
        yield Batch(src, tgt, ys, 0)

class preDataset(Dataset):
    "Pre-training dataset over the ./MetaPreTrain recordings."

    def __init__(self, mode, train_len=870300, block_num=1):
        self.sample_rate = 256
        self.lenth = 870300  # hard-coded total; the train_len argument is not used
        self.lenthtest = 3600
        self.lenthval = 3500
        self.mode = mode
        self.block_num = block_num

    def __len__(self):
        if self.mode == 2:
            return self.lenthval
        elif self.mode == 1:
            return self.lenthtest
        else:
            return self.lenth

    def __getitem__(self, idx):
        """Return one (attr, target, ys) sample for the given index."""
        data_mode = ["Brain", "ChannelNoise", "Eye", "Heart", "LineNoise", "Muscle", "Other"]
        ## step 1: locate idx within the concatenated datasets
        dataset = ["Hyperscanning_navigation", "Hyperscanning_slapjack", "Lane_keeping"]
        if idx < 249816:
            now_idx = idx
            dataloc = 0
        elif idx < 506365:
            now_idx = idx - 249816
            dataloc = 1
        else:
            now_idx = idx - 506365
            dataloc = 2
        ## step 2: read the data pair
        folder_name = './MetaPreTrain/' + dataset[dataloc] + '/3_ICA/'
        allFileList = os.listdir(folder_name)
        file_name1 = folder_name + allFileList[now_idx]
        #print("preDataset1: ", allFileList[now_idx])
        ## build the file name of the segment 4 seconds later
        string_array = allFileList[now_idx]
        parts = string_array.split('_')
        numeric_part = parts[-1].split('.')[0]
        new_numeric_part = str(int(numeric_part) + 4)
        parts[-1] = new_numeric_part + '.csv'
        string_array = '_'.join(parts)
        file_name2 = folder_name + string_array
        #print("preDataset2: ", string_array)
        try:
            data_nosie = self.read_train_data(file_name1)
            data_clean = self.read_train_data(file_name2)
        except Exception:
            # fall back to the noisy segment when the +4 s file is missing or unreadable
            data_nosie = self.read_train_data(file_name1)
            data_clean = data_nosie
        ## step 3: signal normalize (z-score with the noisy segment's statistics)
        max_num = np.max(data_nosie)
        data_avg = np.average(data_nosie)
        data_std = np.std(data_nosie)
        if int(data_std) != 0:
            target = np.array((data_clean - data_avg) / data_std).astype(np.float64)
            attr = np.array((data_nosie - data_avg) / data_std).astype(np.float64)
        else:
            target = np.array(data_clean - data_avg).astype(np.float64)
            attr = np.array(data_nosie - data_avg).astype(np.float64)
        ## step 4: deep copy & return
        #target = target.copy()
        target = torch.FloatTensor(target)
        #attr = attr.copy()
        attr = torch.FloatTensor(attr)
        ys = torch.ones(30, 1023).fill_(1).type_as(attr)
        return attr, target, ys

    def read_train_data(self, file_name):
        with open(file_name, 'r', newline='') as f:
            lines = csv.reader(f)
            data = []
            for line in lines:
                data.append(line)
        new_data = np.array(data).astype(np.float64)
        return new_data
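
# Note on the pre-training pairs (an inference from the code above, not stated in
# the source): each sample pairs a segment (file_name1) with the segment recorded
# 4 seconds later (file_name2) as its target, and the fixed ys tensor of shape
# (30, 1023) suggests 30-channel recordings of 1024 time points per segment
# (4 s at the 256 Hz sample rate), shortened by one step for decoding.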

class myDataset(Dataset):
    "Dataset over the ./Real_EEG train/val/test splits."

    def __init__(self, mode, train_len=0, block_num=1):
        self.sample_rate = 256
        self.lenth = train_len
        self.lenthtest = 3600
        self.lenthval = 3500
        self.mode = mode
        self.block_num = block_num

    def __len__(self):
        if self.mode == 2:
            return self.lenthval
        elif self.mode == 1:
            return self.lenthtest
        else:
            return self.lenth

    def __getitem__(self, idx):
        """Return one (attr, target, ys) sample from the ./Real_EEG split selected by mode."""
        data_mode = ["Brain", "ChannelNoise", "Eye", "Heart", "LineNoise", "Muscle", "Other"]
        # mode 2 -> validation split, mode 1 -> test split, anything else -> training split
        if self.mode == 2:
            split_dir = './Real_EEG/val/'
        elif self.mode == 1:
            split_dir = './Real_EEG/test/'
        else:
            split_dir = './Real_EEG/train/'
        allFileList = os.listdir(split_dir + 'Brain/')
        file_name = split_dir + 'Brain/' + allFileList[idx]
        #print("dataloader: ", file_name)
        data_clean = self.read_train_data(file_name)
        # try up to 7 randomly drawn categories; fall back to the clean data if no matching file exists
        data_nosie = data_clean
        for i in range(7):
            file_name = split_dir + data_mode[random.randint(0, 6)] + '/' + allFileList[idx]
            if os.path.isfile(file_name):
                data_nosie = self.read_train_data(file_name)
                break
        #print(file_name)
        #print("data_set", noise.shape)
        max_num = np.max(data_nosie)
        data_avg = np.average(data_nosie)
        data_std = np.std(data_nosie)
        #max_num = 100
        #print("max_num: ", max_num)
        #target = np.array(data / max_num).astype(np.float)
        if int(data_std) != 0:
            target = np.array((data_clean - data_avg) / data_std).astype(np.float64)
            attr = np.array((data_nosie - data_avg) / data_std).astype(np.float64)
        else:
            target = np.array(data_clean - data_avg).astype(np.float64)
            attr = np.array(data_nosie - data_avg).astype(np.float64)
        ## step 4: deep copy & return
        # target = target.copy()
        target = torch.FloatTensor(target)
        # attr = attr.copy()
        attr = torch.FloatTensor(attr)
        ys = torch.ones(30, 1023).fill_(1).type_as(attr)
        return attr, target, ys

    def read_train_data(self, file_name):
        with open(file_name, 'r', newline='') as f:
            lines = csv.reader(f)
            data = []
            for line in lines:
                data.append(line)
        new_data = np.array(data).astype(np.float64)
        ''' for training 19 channels
        row = np.array([0,1,2,3,4,5,6,12,13,14,15,16,22,23,24,25,26,27,29])
        new_data = []
        for i in range(19):
            #print(i, row[i])
            #print(data[row[i]].shape)
            new_data.append(data[row[i]])
        new_data = np.array(new_data).astype(np.float)
        '''
        # data = data.T
        return new_data
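
# Minimal usage sketch (assumptions, not part of the original source: the
# ./Real_EEG directory layout exists on disk, a CUDA device is available for
# data_load, and the batch size / train_len values below are illustrative).
if __name__ == "__main__":
    from torch.utils.data import DataLoader

    train_set = myDataset(mode=0, train_len=100)  # mode 0 selects the training split
    train_loader = DataLoader(train_set, batch_size=4, shuffle=True)

    # data_load wraps each (attr, target, ys) triple into a Batch on the GPU.
    for batch in data_load(train_loader):
        print(batch.src.shape, batch.trg.shape, batch.ntokens)
        break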