import numpy as np
import os
import math
import pandas as pd
import matplotlib.pyplot as plt
import scipy.signal as signal
from scipy.fft import fft
from findpeaks import findpeaks
import glob
import tqdm
import pickle
import random
from sklearn.utils import shuffle
from sklearn.decomposition import PCA
import warnings
import utils
path_to_data = "./data/TRAINING-EXAMPLE/Soldering/"
data_heading = 'em_data'
mcu = 'arduino'
use_filter = True
do_feature_selection = True
clock_freq = 16e6 #16e6 for arduino, 32e6 for pico
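#Pipeline overview:
#  1. split each oscilloscope capture into per-basic-block traces using the CH3 crop signal
#  2. build "_not" traces (negative examples) by slicing with a different block's length
#  3. select CWT peak features that are stable for a block and absent from its negatives
#  4. train one PCA + MLP detector per basic block
#The CSVs are assumed to be oscilloscope exports whose first rows are a preamble
#(hence header=19 when reading) with per-channel columns such as CH2/CH3.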
from control_flows import Control_Flow
control_flow = Control_Flow.Soldering()
basicBlocks = control_flow.get_basicBlocks()
col_heading = 'CH2'
crop_high_voltage = 1.99
start = 0
stop = 1000000
example = pd.read_csv(f'{path_to_data}{data_heading}20.csv', header=19)[start:stop]
#quick visual sanity check of one capture: CH2 (EM data) and CH3 (crop marker)
plt.plot(example['CH2'], label='CH2 (data)')
plt.plot(example['CH3'], label='CH3 (crop)')
plt.gca().set_ylim([-4, 8])
plt.legend()
plt.show()
def pull_block(block_list, actual_block):
    """Return a random block from block_list that is not actual_block."""
    choice = random.choice(block_list)
    while choice == actual_block:
        choice = random.choice(block_list)
    return choice
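#e.g. pull_block(['0x4f0', '0x534'], '0x4f0') always returns '0x534'; note that this
#loops forever if block_list contains no block other than actual_block.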
# Split into blocks and store in specific directories
# channel 2 is the data signal
# channel 3 is the cropping signal
#make appropriate directories if needed
for block in basicBlocks:
try:
os.mkdir(f'{path_to_data}split{block}')
os.mkdir(f'{path_to_data}split{block}/train')
os.mkdir(f'{path_to_data}split{block}/test')
os.mkdir(f'{path_to_data}split{block}/validation')
except FileExistsError:
#the directory already exists
pass
#get all the csv files from the data directory
csv_files = glob.glob(f'{path_to_data}*.csv')
#Arduino clock ~16 MHz; the oscilloscope sampled at 2.5 GS/s (0.4 ns per point)
if mcu=='arduino':
    #~63 ns per clock cycle (empirically calibrated) / 0.4 ns per sample ≈ 158 samples
    data_points_in_a_clock_cycle = round((6.3e-8)/(4e-10))
    crop_position = 2.2e5
elif mcu=='pico':
#clockFreq of Underclocked Pico = 32 MHz
data_points_in_a_clock_cycle= round((1/(0.032e9)/(4e-10)))
crop_position = 0
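#sanity check of the samples-per-cycle math: at 2.5 GS/s each sample spans 0.4 ns, so one
#6.3e-8 s Arduino cycle (a calibrated value near the nominal 1/16e6 = 62.5 ns) covers about
#6.3e-8/4e-10 ≈ 158 samples, and one 32 MHz pico cycle covers about 3.125e-8/4e-10 ≈ 78.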
for file_idx, file in enumerate(tqdm.tqdm(csv_files)):
    #70/15/15 split into train/test/validation sets by file order
    if file_idx/len(csv_files) < 0.7:
        train_or_test = 'train'
    elif file_idx/len(csv_files) < 0.85:
        train_or_test = 'test'
    else:
        train_or_test = 'validation'
data = pd.read_csv(file, header=19)
crop_position = 0
cropLevel = data['CH3'][crop_position]
#find the first crop
while cropLevel<crop_high_voltage and crop_position<1000000:
cropLevel = data['CH3'][crop_position]
crop_position += 1
    if crop_position < 1000000:  #only proceed if a crop pulse was actually found
        block_num = 0
        #move to the other side of the crop pulse
        while cropLevel > 2:
            crop_position += 1
            cropLevel = data['CH3'][crop_position]
#skip to the end of the NOP sled
crop_position += 30*data_points_in_a_clock_cycle
#--------------------------------------------------------------------
for block in control_flow:
cycles = control_flow[block]
clockCycles = cycles[0] * data_points_in_a_clock_cycle
if block[0:2] == '0x':
data['CH2'][int(crop_position):int(crop_position+clockCycles)].to_csv(f'{path_to_data}split{block}/{train_or_test}/{file[file.find(data_heading)+len(data_heading):-4]}_{block_num}.csv')
block_num += 1
crop_position += clockCycles
            if crop_position >= len(data['CH2']):
                print(f"ERROR: ran past the end of the trace in {file}")
else:
print(f"{file} errored")
#------------- Not Blocks -----------------------------------------
### Split into blocks and store in specific directories
#channel 2 is the data signal
#channel 3 is the cropping signal
#make appropriate directories if needed
for block in basicBlocks:
try:
os.mkdir(f'{path_to_data}split{block}_not')
os.mkdir(f'{path_to_data}split{block}_not/train')
os.mkdir(f'{path_to_data}split{block}_not/test')
os.mkdir(f'{path_to_data}split{block}_not/validation')
except FileExistsError:
#the directory already exists
pass
#get all the csv files from the data directory
csv_files = glob.glob(f'{path_to_data}*.csv')
#Arduino clock ~16 MHz; the oscilloscope sampled at 2.5 GS/s (0.4 ns per point)
if mcu=='arduino':
    data_points_in_a_clock_cycle = round((6.3e-8)/(4e-10))
    crop_position = 2.2e5
elif mcu=='rpi':
    #clockFreq of Rpi = 150 MHz
data_points_in_a_clock_cycle= round((1/(0.150e9)/(4e-10)))
crop_position = 40000
for file_idx, file in enumerate(tqdm.tqdm(csv_files)):
    #70/15/15 split into train/test/validation sets by file order
    if file_idx/len(csv_files) < 0.7:
        train_or_test = 'train'
    elif file_idx/len(csv_files) < 0.85:
        train_or_test = 'test'
    else:
        train_or_test = 'validation'
data = pd.read_csv(file, header=19)
crop_position = 0
cropLevel = data['CH3'][crop_position]
random_pull_arr = list(basicBlocks.keys())
#find the first crop
while cropLevel<crop_high_voltage and crop_position<1000000:
cropLevel = data['CH3'][crop_position]
crop_position += 1
    if crop_position < 1000000:  #only proceed if a crop pulse was actually found
        block_num = 0
        #move to the other side of the crop
        while cropLevel > 2:
            crop_position += 1
            cropLevel = data['CH3'][crop_position]
#skip to the end of the NOP sled
crop_position += 30*data_points_in_a_clock_cycle
for block in control_flow:
rand_block = pull_block(random_pull_arr, block)
clockCycles = basicBlocks[rand_block][0] * data_points_in_a_clock_cycle
if block[0:2] == '0x':
data['CH2'][int(crop_position):int(crop_position+clockCycles)].to_csv(f'{path_to_data}split{rand_block}_not/{train_or_test}/{file[file.find(data_heading)+len(data_heading):-4]}_{block_num}.csv')
block_num += 1
cycles = control_flow[block]
clockCycles = cycles[0] * data_points_in_a_clock_cycle
crop_position += clockCycles
            if crop_position >= len(data['CH2']):
                print(f"ERROR: ran past the end of the trace in {file}")
## perform feature selection, on train only
block_data = {}
not_block_data = {}
signature_traces = {}
not_signature_traces = {}
for block in basicBlocks:
#grab all the csv files
csv_files = glob.glob(f'{path_to_data}split{block}/train/*')
    #preallocate an array to hold every trace; assumes all files share the same length
    length = len(pd.read_csv(csv_files[7],header=0))
block_data[block] = np.zeros(shape=(len(csv_files),length,))
    if use_filter:
        #build the comb filter once per block instead of once per file
        comb_filter = utils.createCombFilter(mcu=mcu)
    i = 0
    #iterate over a copy so removing bad files does not skip the next entry
    for file in tqdm.tqdm(list(csv_files), desc=f"Feature Selecting {block}"):
        data = pd.read_csv(file, header=0)
        if use_filter:
            with warnings.catch_warnings():
                #apply the filter; silence the FutureWarning for now (TODO: fix properly)
                warnings.simplefilter(action='ignore', category=FutureWarning)
                data = utils.applyFilterSingleSignal(comb_filter, data[col_heading], mcu)
            block_data[block][i] = np.asarray(data)
        else:
            try:
                block_data[block][i] = np.asarray(data[col_heading])
            except ValueError as e:
                print(file)
                print(e)
                csv_files.remove(file)  #drop malformed files from the list
                i = i-1
        i += 1
    not_csv_files = glob.glob(f'{path_to_data}split{block}_not/train/*')
    #preallocate an array to hold every trace; assumes all files share the same length
    length = len(pd.read_csv(not_csv_files[7],header=0))
    not_block_data[block] = np.zeros(shape=(len(not_csv_files),length,))
    if use_filter:
        comb_filter = utils.createCombFilter(mcu=mcu)
    i = 0
    for file in tqdm.tqdm(list(not_csv_files), desc=f"Feature Selecting not_{block}"):
        try:
            data = pd.read_csv(file, header=0)
            if use_filter:
                with warnings.catch_warnings():
                    #apply the filter; silence the FutureWarning for now (TODO: fix properly)
                    warnings.simplefilter(action='ignore', category=FutureWarning)
                    data = utils.applyFilterSingleSignal(comb_filter, data[col_heading], mcu)
                not_block_data[block][i] = np.asarray(data)
            else:
                try:
                    not_block_data[block][i] = np.asarray(data[col_heading])
                except ValueError as e:
                    print(file)
                    print(e)
                    not_csv_files.remove(file)  #drop from the not-block list, not csv_files
                    i = i-1
            i += 1
        except TypeError:
            print(f'{file} is bad, investigate')
#divide files into four batches
how_many_batches = 4
len_of_batch = len(csv_files)//how_many_batches
    #average the traces across each batch to form four "signature traces" per block
    temp = []
    for batch_num in range(how_many_batches):
        batch = block_data[block][batch_num*len_of_batch:(batch_num+1)*len_of_batch]
        temp.append(batch.mean(axis=0))  #point-wise mean over the batch's traces
    signature_traces[block] = np.asarray(temp)
    #divide files into four batches
    not_len_of_batch = len(not_csv_files)//how_many_batches
    #average the traces across each batch to form four "signature traces" per block
    temp = []
    for batch_num in range(how_many_batches):
        batch = not_block_data[block][batch_num*not_len_of_batch:(batch_num+1)*not_len_of_batch]
        temp.append(batch.mean(axis=0))
    not_signature_traces[block] = np.asarray(temp)
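#averaging within a batch suppresses per-trace noise; keeping four independent batch
#averages (instead of one global mean) lets the consistency check below discard peaks
#that do not reappear in every batch, i.e. artifacts of one particular subset of traces.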
#take the CWT of each of our signature traces
width = 50
widths = np.arange(1, width+1)
cwt_data = {}
for block in basicBlocks:
    cwt_data[block] = np.zeros(shape=(how_many_batches, width, signature_traces[block].shape[1]))
#for each of the signature traces take the cwt and store it
for i in range(signature_traces[block].shape[0]):
cwt_data[block][i] = signal.cwt(signature_traces[block][i], signal.ricker, widths)
#take the CWT of each of our signature traces
not_cwt_data = {}
for block in basicBlocks:
    not_cwt_data[block] = np.zeros(shape=(how_many_batches, width, not_signature_traces[block].shape[1]))
#for each of the signature traces take the cwt and store it
for i in range(not_signature_traces[block].shape[0]):
not_cwt_data[block][i] = signal.cwt(not_signature_traces[block][i], signal.ricker, widths)
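#note: scipy.signal.cwt and signal.ricker are deprecated and removed in recent SciPy
#releases; a rough drop-in sketch using PyWavelets' Mexican-hat wavelet would be:
#  import pywt
#  coeffs, _ = pywt.cwt(signature_traces[block][i], widths, 'mexh')
#(the two implementations scale amplitudes slightly differently, so re-run feature
#selection rather than mixing their outputs)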
#find the peaks in each of the blocks
peaks = {}
for block in basicBlocks:
#find the peaks in each cwt
num_data_pts = signature_traces[block].shape[1]
peaks[block] = np.full((how_many_batches, width, num_data_pts), False)
fp = findpeaks(method='mask', scale=True, denoise='fastnl', window=3, togray=True, imsize=(num_data_pts,50),verbose=0)
    for batch in range(how_many_batches):
        #detect peaks in this batch's CWT
        results = fp.peaks2d(cwt_data[block][batch],'mask')
        peaks[block][batch] = (np.asarray(results['Xdetect']) == True)
#find the peaks in each of the blocks
not_peaks = {}
for block in basicBlocks:
#find the peaks in each cwt
num_data_pts = not_signature_traces[block].shape[1]
not_peaks[block] = np.full((how_many_batches, width, num_data_pts), False)
fp = findpeaks(method='mask', scale=True, denoise='fastnl', window=3, togray=True, imsize=(num_data_pts,50),verbose=0)
    for batch in range(how_many_batches):
        #detect peaks in this batch's CWT
        results = fp.peaks2d(not_cwt_data[block][batch],'mask')
        not_peaks[block][batch] = (np.asarray(results['Xdetect']) == True)
#for each block, keep only the peaks that appear in every one of its signature traces
nonvarying_peaks = {}
for block in basicBlocks:
    nonvarying_peaks[block] = np.all(peaks[block], axis=0)
#likewise for the not-block peaks
not_nonvarying_peaks = {}
for block in basicBlocks:
    not_nonvarying_peaks[block] = np.all(not_peaks[block], axis=0)
#now take the set difference: stable block peaks that are absent from the not-block set
features = {}
for block in basicBlocks:
    #True where a peak is in the block's nonvarying set and not in the not-block's
    features[block] = nonvarying_peaks[block] & ~not_nonvarying_peaks[block]
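#features[block] is now a boolean (scale, time) mask over the CWT: True where a peak is
#stable across all of the block's signature traces AND absent from its not-block traces,
#i.e. a point expected to discriminate "this block executed" from "something else did".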
#save each feature as its (scale, time) index in the CWT for ease later
features_location = {}
for block in basicBlocks:
    features_location[block] = np.argwhere(features[block])
    print(f'{block} -> {features_location[block].shape[0]} features')
print('Saving Selected Features')
#save the selected feature locations
try:
    os.mkdir(f'{path_to_data}block_features')
    print('Directory Created')
except FileExistsError:
    #the directory already exists
    pass
with open(f'{path_to_data}block_features/features', 'wb') as filehandler:
    pickle.dump(features_location, filehandler)
#read in the selected features, start here if feature selection is already done
with open(f"{path_to_data}block_features/features", "rb") as f:
features = pickle.load(f)
print('Features Loaded.')
for block in basicBlocks:
print(f'{block} -> {features[block].shape[0]} features')
train_raw = {}
for block in tqdm.tqdm(basicBlocks):
#grab all the csv files
csv_files = glob.glob(f'{path_to_data}split{block}/train/*')
    #preallocate an array to hold every trace; assumes all files share the same length
    length = len(pd.read_csv(csv_files[7],header=0))
train_raw[block] = np.zeros(shape=(len(csv_files),length,))
for i, file in enumerate(csv_files):
data = pd.read_csv(file,header=0)
train_raw[block][i] = data[col_heading]
test_raw = {}
for block in tqdm.tqdm(basicBlocks):
#grab all the csv files
csv_files = glob.glob(f'{path_to_data}split{block}/test/*')
    #preallocate an array to hold every trace; assumes all files share the same length
    length = len(pd.read_csv(csv_files[0],header=0))
test_raw[block] = np.zeros(shape=(len(csv_files),length,))
for i, file in enumerate(csv_files):
data = pd.read_csv(file,header=0)
test_raw[block][i] = data[col_heading]
width = 50
widths = np.arange(1, width+1)
train = {}
for block in basicBlocks:
train[block] = np.zeros(shape=(train_raw[block].shape[0],features[block].shape[0]))
for idx in tqdm.tqdm(range(train_raw[block].shape[0]), desc=f"Pulling out train {block} features:"):
cwt_data = signal.cwt(train_raw[block][idx].flatten(), signal.ricker, widths)
for i,feat in enumerate(features[block]):
x = feat[0]
y = feat[1]
train[block][idx][i] = cwt_data[x][y]
test = {}
for block in basicBlocks:
test[block] = np.zeros(shape=(test_raw[block].shape[0],features[block].shape[0]))
for idx in tqdm.tqdm(range(test_raw[block].shape[0]), desc=f"Pulling out test {block} features:"):
cwt_data = signal.cwt(test_raw[block][idx].flatten(), signal.ricker, widths)
for i,feat in enumerate(features[block]):
x = feat[0]
y = feat[1]
test[block][idx][i] = cwt_data[x][y]
#save the extracted train/test feature matrices for quicker access later
try:
    os.mkdir(f'{path_to_data}temp')
    print('Directory Created')
except FileExistsError:
    #the directory already exists
    pass
print('Saving train for redundancy')
with open(f'{path_to_data}temp/train', 'wb') as filehandler:
    pickle.dump(train, filehandler)
print('Saving test for redundancy')
with open(f'{path_to_data}temp/test', 'wb') as filehandler:
    pickle.dump(test, filehandler)
print('Done!')
#read in the selected features, start here if feature selection is already done
with open(f"{path_to_data}block_features/features", "rb") as f:
features = pickle.load(f)
print('Features Loaded.')
train_raw_not = {}
for block in tqdm.tqdm(basicBlocks):
#grab all the csv files
csv_files = glob.glob(f'{path_to_data}split{block}_not/train/*')
    #preallocate an array to hold every trace; assumes all files share the same length
    length = len(pd.read_csv(csv_files[0],header=0))
train_raw_not[block] = np.zeros(shape=(len(csv_files),length,))
for i, file in enumerate(csv_files):
data = pd.read_csv(file,header=0)
train_raw_not[block][i] = data[col_heading]
test_raw_not = {}
for block in basicBlocks:
#grab all the csv files
csv_files = glob.glob(f'{path_to_data}split{block}_not/test/*')
    #preallocate an array to hold every trace; assumes all files share the same length
    length = len(pd.read_csv(csv_files[0],header=0))
test_raw_not[block] = np.zeros(shape=(len(csv_files),length,))
for i, file in enumerate(csv_files):
data = pd.read_csv(file,header=0)
test_raw_not[block][i] = data[col_heading]
width = 50
widths = np.arange(1, width+1)
train_not = {}
for block in basicBlocks:
train_not[block] = np.zeros(shape=(train_raw_not[block].shape[0],features[block].shape[0]))
for idx in tqdm.tqdm(range(train_raw_not[block].shape[0]), desc=f"Pulling out train_not {block} features:"):
cwt_data = signal.cwt(train_raw_not[block][idx].flatten(), signal.ricker, widths)
for i,feat in enumerate(features[block]):
x = feat[0]
y = feat[1]
train_not[block][idx][i] = cwt_data[x][y]
test_not = {}
for block in basicBlocks:
test_not[block] = np.zeros(shape=(test_raw_not[block].shape[0],features[block].shape[0]))
for idx in tqdm.tqdm(range(test_raw_not[block].shape[0]), desc=f"Pulling out test_not {block} features:"):
cwt_data = signal.cwt(test_raw_not[block][idx].flatten(), signal.ricker, widths)
for i,feat in enumerate(features[block]):
x = feat[0]
y = feat[1]
test_not[block][idx][i] = cwt_data[x][y]
#save the extracted train_not/test_not feature matrices for quicker access later
try:
    os.mkdir(f'{path_to_data}temp')
    print('Directory Created')
except FileExistsError:
    #the directory already exists
    pass
print('Saving train_not for redundancy')
with open(f'{path_to_data}temp/train_not', 'wb') as filehandler:
    pickle.dump(train_not, filehandler)
print('Saving test_not for redundancy')
with open(f'{path_to_data}temp/test_not', 'wb') as filehandler:
    pickle.dump(test_not, filehandler)
print('Done!')
print('Loading in train')
with open(f"{path_to_data}temp/train", "rb") as f:
train = pickle.load(f)
print('Loading in train not')
with open(f"{path_to_data}temp/train_not", "rb") as f:
train_not = pickle.load(f)
print('Loading in test')
with open(f"{path_to_data}temp/test", "rb") as f:
test = pickle.load(f)
print('Loading in test not')
with open(f"{path_to_data}temp/test_not", "rb") as f:
test_not = pickle.load(f)
print('Done!')
for block in basicBlocks:
print(f'Saving {block} train for redundancy')
filehandler = open(f'{path_to_data}temp/train_{block}', 'wb')
pickle.dump(train[block], filehandler)
filehandler.close()
print('Done!')
for block in basicBlocks:
print(f'Saving {block} train_not for redundancy')
filehandler = open(f'{path_to_data}temp/train_not_{block}', 'wb')
pickle.dump(train_not[block], filehandler)
filehandler.close()
print('Done!')
for block in basicBlocks:
print(f'Saving {block} test for redundancy')
filehandler = open(f'{path_to_data}temp/test_{block}', 'wb')
pickle.dump(test[block], filehandler)
filehandler.close()
print('Done!')
for block in basicBlocks:
print(f'Saving {block} test_not for redundancy')
filehandler = open(f'{path_to_data}temp/test_not_{block}', 'wb')
pickle.dump(test_not[block], filehandler)
filehandler.close()
print('Done!')
#save train, test, train_not, test_not for quicker access in the future, after combining into one dataset
print('Saving Train, Test, Train_Not, Test_Not')
do_balance = False
### First combine all of the data into one dataset and then shuffle it up
train_full = {}
test_full = {}
train_labels = {}
test_labels = {}
for block in tqdm.tqdm(basicBlocks, desc='Combining Data into one Dataset'):
#build a training set
    if do_balance:
        #balanced path: subsample so block and not-block traces contribute equally
        #(assumes the train/train_not dicts loaded above; only the train set is balanced)
        how_many_samples = min(train[block].shape[0], train_not[block].shape[0])
        num_rows_train = train[block].shape[0]
random_row_indices_train = np.random.choice(num_rows_train, size=how_many_samples, replace=False)
num_rows_train_not = train_not[block].shape[0]
random_row_indices_train_not = np.random.choice(num_rows_train_not, size=how_many_samples, replace=False)
train_full[block] = np.append(train[block][random_row_indices_train], train_not[block][random_row_indices_train_not], axis=0)
else:
#load in train
print('Loading in train')
with open(f"{path_to_data}temp/train_{block}", "rb") as f:
train = pickle.load(f)
print('Loading in train not')
with open(f"{path_to_data}temp/train_not_{block}", "rb") as f:
train_not = pickle.load(f)
#build train set
train_full[block] = np.append(train, train_not, axis=0)
train_shape = train.shape[0]
train_not_shape = train_not.shape[0]
#get rid of train
train = {}
train_not = {}
#load in test
print('Loading in test')
with open(f"{path_to_data}temp/test_{block}", "rb") as f:
test = pickle.load(f)
print('Loading in test not')
with open(f"{path_to_data}temp/test_not_{block}", "rb") as f:
test_not = pickle.load(f)
#build test set
test_full[block] = np.append(test, test_not, axis=0)
test_shape = test.shape[0]
test_not_shape = test_not.shape[0]
#empty test
test = {}
test_not = {}
    #label 1 = trace from the genuine block, 0 = trace from a different block
if do_balance:
temp_1 = np.full(how_many_samples, 1)
temp_0 = np.full(how_many_samples, 0)
else:
temp_1 = np.full(train_shape, 1)
temp_0 = np.full(train_not_shape, 0)
train_labels[block] = np.concatenate((temp_1,temp_0))
temp_1 = np.full(test_shape, 1)
temp_0 = np.full(test_not_shape, 0)
test_labels[block] = np.concatenate((temp_1,temp_0))
#shuffle
train_full[block], train_labels[block] = shuffle(train_full[block], train_labels[block], random_state=0)
test_full[block], test_labels[block] = shuffle(test_full[block], test_labels[block], random_state=0)
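    #shuffling X and Y together with the same random_state keeps every feature row aligned
    #with its label, and the fixed seed makes the ordering reproducible across runs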
#save the combined dataset
try:
    os.mkdir(f'{path_to_data}processed_data')
    print('Directory Created')
except FileExistsError:
    #the directory already exists
    pass
print('Saving Dataset')
filehandler = open(f'{path_to_data}processed_data/train_X', 'wb')
pickle.dump(train_full, filehandler)
filehandler.close()
print('Saved Train_X')
filehandler = open(f'{path_to_data}processed_data/train_Y', 'wb')
pickle.dump(train_labels, filehandler)
filehandler.close()
print('Saved Train_Y')
filehandler = open(f'{path_to_data}processed_data/test_X', 'wb')
pickle.dump(test_full, filehandler)
filehandler.close()
print('Saved Test_X')
filehandler = open(f'{path_to_data}processed_data/test_Y', 'wb')
pickle.dump(test_labels, filehandler)
filehandler.close()
print('Saved Test_Y')
print('Dataset Saved')
#load in the dataset if needed
print('Loading in Dataset')
with open(f"{path_to_data}processed_data/train_X", "rb") as f:
train_full = pickle.load(f)
with open(f"{path_to_data}processed_data/train_Y", "rb") as f:
train_labels = pickle.load(f)
with open(f"{path_to_data}processed_data/test_X", "rb") as f:
test_full = pickle.load(f)
with open(f"{path_to_data}processed_data/test_Y", "rb") as f:
test_labels = pickle.load(f)
print('Dataset Loaded')
n_components_pico_syringe = {'0x384':15,'0x388':15,'0x38e':25,'0x400':25,'0x392':25,'0x394':35,'0x3a4':25,'0xnops':35}
n_components_arduino_syringe = {'0x544':45 , '0x54c':65, '0x1be':55 , '0x1e4':55 , '0x1ec':55 , '0x216':45 , '0x20a':55, '0x556':35 , '0x557':55 , '0x206':35 , '0x568':55, '0xnops':65}
n_components_arduino_coffee = {'0x836':35,'0x86c':35,'0x88c':35,'0x8ba':45,'0x7c8':45,'0x7d0':45, '0xnops':35}
n_components_arduino_distance = {'0x6e4':35,'0x6ea':35, '0x6eb':35, '0x6f5':25, "0x31e":25, '0xnops':45}
n_components_arduino_soldering = {'0x4f0':35, '0x534':45, "0x538":65, '0x556':55, '0x6ef6':45, '0x68f2':75, '0x695c':55, '0x68f6':55, '0x6f00':45, '0x58e':35, '0x59e':35, '0x5bf':45, '0x5ea':45, '0x5e6':45, '0x5ee':35, '0xnops':55}
n_components_arduino_servo = {'0x792':35,'0x16f0':35,'0x14a':35,'0x15e':45,'0xnops':35}
n_components = n_comps = n_components_arduino_soldering
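#these per-block component counts were presumably tuned per firmware image; note that the
#training loop below hard-codes PCA(n_components=30) and only the commented-out line uses
#n_comps[block], so the tuned values are not applied unless that line is swapped back in.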
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.neural_network import MLPClassifier
from sklearn.svm import SVC
import time
clfs = {}
drs = {}
timing = []
for block in basicBlocks:
#make classifier and PCA
clf = MLPClassifier(activation='relu',hidden_layer_sizes=(150,50),warm_start=True,solver='adam',early_stopping=True)
# dr = PCA(n_components=n_comps[block])
dr = PCA(n_components=30)
    if not do_feature_selection:
        #no selected features: fall back to the full flattened CWT of every trace
        width = 50
        widths = np.arange(1, width+1)
        cwt_train = np.zeros((train_full[block].shape[0],width*train_full[block].shape[1]))
        cwt_test = np.zeros((test_full[block].shape[0],width*test_full[block].shape[1]))
for i,item in tqdm.tqdm(enumerate(train_full[block])):
cwt_train[i] = signal.cwt(item, signal.ricker, widths).flatten()
for i,item in tqdm.tqdm(enumerate(test_full[block])):
cwt_test[i] = signal.cwt(item, signal.ricker, widths).flatten()
train_full[block] = cwt_train
test_full[block] = cwt_test
    #fit the dimensionality reducer and transform the data
dr.fit(train_full[block],train_labels[block])
start = time.time()
train_X = dr.transform(train_full[block])
test_X = dr.transform(test_full[block])
#train the classifier
clf.fit(train_X, train_labels[block])
end = time.time()
timing.append(end-start)
#place the clf and dimensionality reducer in a dict to save after
clfs[block] = clf
drs[block] = dr
    train_preds = clf.predict(train_X)
    preds = clf.predict(test_X)
    train_acc = (train_preds == train_labels[block]).mean()
    print(f'{block} train accuracy: {round(train_acc*100, 2)}%')
    test_acc = (preds == test_labels[block]).mean()
    print(f'{block} test accuracy: {round(test_acc*100, 2)}%')
    print('\n')
    #free the large per-block matrices before moving to the next block
    train_full[block] = np.array([])
    test_full[block] = np.array([])
##if you want to save the dr or the clf run this
#save the classifiers and dimensionality reducers
try:
    os.mkdir(f'{path_to_data}clfs_drs/')
    print(f'Directory Created\n{path_to_data}clfs_drs/')
except FileExistsError:
    #the directory already exists
    print(f'Saving to\n{path_to_data}clfs_drs/')
with open(f'{path_to_data}clfs_drs/clfs.pkl', 'wb') as filehandler:
    pickle.dump(clfs, filehandler)
with open(f'{path_to_data}clfs_drs/drs.pkl', 'wb') as filehandler:
    pickle.dump(drs, filehandler)
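#a minimal inference sketch (assumes a new trace has already been cropped to one block and
#reduced to the same selected CWT features used in training; `feature_vec` is hypothetical):
#  with open(f'{path_to_data}clfs_drs/clfs.pkl', 'rb') as f:
#      clfs = pickle.load(f)
#  with open(f'{path_to_data}clfs_drs/drs.pkl', 'rb') as f:
#      drs = pickle.load(f)
#  pred = clfs[block].predict(drs[block].transform(feature_vec.reshape(1, -1)))
#  #1 -> the trace matches this basic block, 0 -> it does not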