import sys, time
import itertools

import numpy as np
import scipy.io as sio
from scipy.signal import butter, lfilter

from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.metrics import confusion_matrix as sk_confusion_matrix
from sklearn.model_selection import train_test_split

import matplotlib as mpl
mpl.use('TkAgg')
import matplotlib.pyplot as plt
from mpl_toolkits import axes_grid1

# Import local packages
sys.path.insert(0, '../Lab1/Packages&Wheels')
from mite.inputs.MyoArmband import MyoArmband

# filename = 'Lab3_EMG.mat'
# data = scipy.io.loadmat(filename)  # dictionary of everything in mat
# data['train']

fs = 200.0
lowcut = 20.0
highcut = 90.0


def butter_bandpass(lowcut, highcut, fs, order=5):
    # Design a Butterworth bandpass filter with cutoffs normalized to Nyquist
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    return b, a


def bandpass_filter(data, f_low, f_high, fs):
    # Implement a bandpass filter; filter along the time axis (axis 0) so that
    # data shaped (#samples x #channels) is filtered per channel
    b, a = butter_bandpass(f_low, f_high, fs, order=5)
    y = lfilter(b, a, data, axis=0)
    return y


def calc_mav(x):
    # Mean absolute value of the window
    return np.mean(np.abs(x))


def calc_var(x):
    # Variance of the window
    n = len(x)
    mean = sum(x) / n
    deviations = [(xi - mean) ** 2 for xi in x]
    var = sum(deviations) / n
    return var


def calc_wl(x):
    # Waveform length: cumulative absolute difference between adjacent samples
    n = len(x)
    wl = 0
    for k in range(1, n):
        wl = wl + abs(x[k] - x[k - 1])
    return wl


def calc_zc(x):
    # Zero crossings: sign changes between adjacent samples, with a small
    # amplitude threshold (0.01) to reject noise
    n = len(x)
    zc = 0
    for k in range(n - 1):
        if ((x[k] > 0 and x[k + 1] < 0) or (x[k] < 0 and x[k + 1] > 0)) and \
                abs(x[k] - x[k + 1]) >= 0.01:
            zc = zc + 1
    return zc


def calc_ssc(x):
    # Slope sign changes: local extrema whose neighboring slopes exceed a small
    # amplitude threshold (0.01)
    n = len(x)
    ssc = 0
    for k in range(1, n - 1):
        if ((x[k] > x[k - 1] and x[k] > x[k + 1]) or
                (x[k] < x[k - 1] and x[k] < x[k + 1])) and \
                (abs(x[k] - x[k + 1]) >= 0.01 or abs(x[k] - x[k - 1]) >= 0.01):
            ssc = ssc + 1
    return ssc
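
# -----------------------------------------------------------------------------
# Optional sanity check (not part of the lab pipeline and never called here):
# a minimal sketch showing what the TD feature calculators above return for a
# synthetic one-channel signal. The signal parameters (1 s of a 20 Hz sine plus
# low-amplitude noise) are illustrative assumptions, not values from lab data.
# -----------------------------------------------------------------------------
def _demo_feature_sanity_check():
    rng = np.random.default_rng(0)
    t = np.arange(0, 1.0, 1.0 / fs)                      # 1 s at fs = 200 Hz
    x = np.sin(2 * np.pi * 20.0 * t) + 0.05 * rng.standard_normal(t.shape)
    x = bandpass_filter(x, lowcut, highcut, fs)          # single-channel filter
    print('MAV:', calc_mav(x))
    print('WL :', calc_wl(x))
    print('VAR:', calc_var(x))
    print('ZC :', calc_zc(x))
    print('SSC:', calc_ssc(x))
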
def extract_features(window_data):
    # Top-level function for calculating TD5 features.
    # Input  -- (#samples x 8) window of EMG data
    # Output -- 1 x 40 feature vector (5 features per channel, 8 channels)
    feature_data = []
    for i in range(0, 8):
        data = window_data[:, i]
        feature_data.append(calc_mav(data))
        feature_data.append(calc_wl(data))
        feature_data.append(calc_var(data))
        feature_data.append(calc_ssc(data))
        feature_data.append(calc_zc(data))
    return np.hstack(feature_data)


def LinearDiscriminantAnalysis(X, y):
    # Fit and return a trained LDA model (thin wrapper around sklearn's LDA)
    model = LDA()
    model.fit(X, y)
    return model
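
# -----------------------------------------------------------------------------
# Illustrative sketch (not called by the lab script): how the
# LinearDiscriminantAnalysis() wrapper above is expected to be used. The random
# features stand in for real TD5 feature vectors (40 columns = 5 features x 8
# channels); the class structure here is synthetic, an assumption for the demo.
# -----------------------------------------------------------------------------
def _demo_lda_usage():
    rng = np.random.default_rng(1)
    n_classes = 5                                         # matches NUM_CLASSES below
    n_per_class, n_feat = 50, 40
    X = np.vstack([rng.normal(loc=c, size=(n_per_class, n_feat))
                   for c in range(n_classes)])            # (250, 40) feature matrix
    y = np.repeat(np.arange(n_classes), n_per_class)      # class labels 0..4
    Xtr, Xte, ytr, yte = train_test_split(X, y, test_size=0.33)
    mdl = LinearDiscriminantAnalysis(Xtr, ytr)            # fit the LDA model
    print('held-out accuracy:', mdl.score(Xte, yte))
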
def confusion_matrix(ytest, yhat, labels=[], cmap='viridis', ax=None, show=True):
    """
    Computes (and displays) a confusion matrix given true and predicted classification labels

    Parameters
    ----------
    ytest : numpy.ndarray (n_samples,)
        The true labels
    yhat : numpy.ndarray (n_samples,)
        The predicted labels
    labels : iterable
        The class labels
    show : bool
        A flag determining whether we should plot the confusion matrix (True) or not (False)

    Returns
    -------
    numpy.ndarray
        The confusion matrix numerical values [n_classes x n_classes]
    """
    def add_colorbar(im, aspect=20, pad_fraction=0.5, **kwargs):
        """Add a vertical color bar to an image plot."""
        divider = axes_grid1.make_axes_locatable(im.axes)
        width = axes_grid1.axes_size.AxesY(im.axes, aspect=1. / aspect)
        pad = axes_grid1.axes_size.Fraction(pad_fraction, width)
        current_ax = plt.gca()
        cax = divider.append_axes("right", size=width, pad=pad)
        plt.sca(current_ax)
        return im.axes.figure.colorbar(im, cax=cax, **kwargs)

    # Row-normalize so each row (true class) sums to 1
    cm = sk_confusion_matrix(ytest, yhat)
    cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

    if ax is None:
        fig = plt.figure()
        ax = fig.add_subplot(111)

    try:
        plt.set_cmap(cmap)
    except ValueError:
        cmap = 'viridis'

    im = ax.imshow(cm, interpolation='nearest', vmin=0.0, vmax=1.0, cmap=cmap)
    add_colorbar(im)

    if len(labels):
        tick_marks = np.arange(len(labels))
        plt.xticks(tick_marks, labels, rotation=45)
        plt.yticks(tick_marks, labels)

    # Choose black or white cell text based on background brightness
    thresh = 0.5  # cm.max() / 2.
    colors = mpl.cm.get_cmap(cmap)
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        r, g, b, _ = colors(cm[i, j])
        br = np.sqrt(r * r * 0.241 + g * g * 0.691 + b * b * 0.068)
        plt.text(j, i, format(cm[i, j], '.2f'),
                 horizontalalignment="center",
                 verticalalignment='center',
                 color="black" if br > thresh else "white")

    plt.ylabel('Actual')
    plt.xlabel('Predicted')
    ax.set_ylim(cm.shape[0] - 0.5, -0.5)
    plt.tight_layout()
    if show:
        plt.show(block=True)

    return cm, ax


##########################
## Parameters
##########################
CLASSES = ['rest', 'power', 'open', 'pronate', 'supinate']
NUM_CLASSES = 5
EMG_WINDOW_SIZE = 100
EMG_WINDOW_STEP = 5
F_LOW = 20.0
F_HIGH = 90.0
FS = 200.0


##################################
## Processing function
##################################
def LDA_Train():
    # Import training data
    training_file = 'train.mat'
    training_data = sio.loadmat(training_file)['train'][0]

    # Compute training features and labels
    print("Extracting features...")
    num_trials = len(training_data)
    X = []
    Y = []
    for i in range(NUM_CLASSES):
        class_data = []
        for j in range(num_trials):
            raw_data = training_data[j][CLASSES[i]][0][0]
            # Filter data
            filter_data = bandpass_filter(raw_data, F_LOW, F_HIGH, FS)
            num_samples = filter_data.shape[0]
            idx = 0
            # Slide a window across the trial and extract TD5 features
            while (idx + EMG_WINDOW_SIZE < num_samples):
                # Grab window data
                window = filter_data[idx:idx + EMG_WINDOW_SIZE, :]
                # Extract features
                features = extract_features(window)
                class_data.append(np.hstack(features))
                idx = idx + EMG_WINDOW_STEP
        X.append(np.vstack(class_data))
        Y.append(i * np.ones((X[-1].shape[0],)))
    X = np.vstack(X)
    Y = np.hstack(Y)
    print("Done")

    # Split data into train / test sets
    print('Computing train/test split...', end='', flush=True)
    Xtrain, Xtest, ytrain, ytest = train_test_split(X, Y, test_size=0.33)
    print('Done!')

    # Train classifier
    mdl = LinearDiscriminantAnalysis(Xtrain, ytrain)
    return mdl


DEVICE = ['myo', 'sense'][0]

if __name__ == '__main__':
    print("Training model...")
    mdl = LDA_Train()
    print("model trained")

    num_electrodes = 8
    print("Initializing Myo EMG...", flush=True)
    myo = MyoArmband(mac='eb:33:40:96:ce:a5')
    print('Done!')

    print('Starting data collection...', end='\n', flush=True)
    myo.run()
    MAX_TIME = 60
    emg_data = []
    t0 = time.perf_counter()
    count = 0
    while (time.perf_counter() - t0) < MAX_TIME:
        # Poll the armband for its latest EMG sample
        data = myo.state
        if data is not None:
            emg_data.append(data[:num_electrodes])
            count = count + 1
            if count == EMG_WINDOW_SIZE:
                # Filter the window, extract features, and classify
                stack_data = np.vstack(emg_data)
                window_filter = bandpass_filter(stack_data, F_LOW, F_HIGH, FS)
                features = extract_features(window_filter)
                features = features.reshape(1, -1)  # single-sample feature row
                yhat = mdl.predict(features)
                print(CLASSES[int(yhat[0])])
                count = 0
                emg_data = []
        # train[trial][cue] = np.vstack(emg_data)
        # print(train[trial][cue].shape)
    myo.stop()
    myo.close()

    # print('Saving data...', end='', flush=True)
    # savemat('train.mat', mdict={'train': train})
    print('Done!')
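
# -----------------------------------------------------------------------------
# Optional offline evaluation sketch (never executed above). It assumes you
# already have a trained model plus a held-out feature matrix and label vector,
# e.g. by having LDA_Train() also return the Xtest/ytest it computes; that
# change is an assumption, not something the script does as written. It shows
# the intended use of the confusion_matrix() plotting helper defined earlier.
# -----------------------------------------------------------------------------
def _demo_offline_evaluation(mdl, Xtest, ytest):
    yhat = mdl.predict(Xtest)                      # predicted class per window
    cm, _ = confusion_matrix(ytest, yhat, labels=CLASSES, show=True)
    return cm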