EMG-Pattern-Recognition / Real_Time_EMG_Clasification.py
Real_Time_EMG_Clasification.py
Raw
import argparse

import sys, time
import numpy as np

import matplotlib

matplotlib.use('QT5Agg')

# Import local packages
sys.path.insert(0, '../Lab1/Packages&Wheels')
from mite.inputs.MyoArmband import MyoArmband

import itertools

import scipy.io as sio
import numpy as np

from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.metrics import confusion_matrix as sk_confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.model_selection import train_test_split

from scipy.signal import butter, lfilter

import matplotlib as mpl

mpl.use('TkAgg')
import matplotlib.pyplot as plt
from mpl_toolkits import axes_grid1
import scipy.io
import statistics

# filename = 'Lab3_EMG.mat'
# data = scipy.io.loadmat(filename)  # dictionary of everything in mat
# data['train']
fs = 200.0
lowcut = 20.0
highcut = 90.0


def butter_bandpass(lowcut, highcut, fs, order=5):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    return b, a


def bandpass_filter(data, f_low, f_high, fs):
    y = None
    # Impliment a bandpass filter
    b, a = butter_bandpass(f_low, f_high, fs, order=5)
    y = lfilter(b, a, data)
    return y


def calc_mav(x):
    mav = []
    results = []
    # freq4 = bandpass_filter(x, lowcut, highcut, fs)
    abs_v = np.mean(abs(x))
    mav.append(abs_v)
    return mav


def calc_var(x):  # variance
    # var = None
    # Number of observations
    n = len(x)
    # Mean of the data
    mean = sum(x) / n
    # Square deviations
    deviations = [(y - mean) ** 2 for y in x]
    # Variance
    var = sum(deviations) / n
    return var


def calc_wl(x):  #
    # wl = None
    n = len(x)
    wl = 0
    for k in range(1, n - 1):
        wl = wl + (abs(x[k] - x[k - 1]))
    ###
    return wl


def calc_zc(x):
    n = len(x)
    # Square deviations
    zc = 0
    for k in range(1, n - 1):
        if ((x[k] > 0).all() and (x[k + 1] < 0).all() or (x[k] < 0).all()) and (x[k + 1] > 0).all() and (abs(
                x[k] - x[k + 1]) >= 0.01).all():
            zc = zc + 1
            ###
    return zc


def calc_ssc(x):
    ssc = None
    n = len(x)
    # Square deviations
    ssc = 0
    for k in range(1, n - 1):
        if ((x[k] > x[k - 1]).all() or (x[k] > x[k + 1]).all() or (x[k] < x[k - 1]).all()) and (
                (x[k] < x[k + 1]).all()) and \
                ((abs(x[k] - x[k + 1]) >= 0.01).all()) or ((abs(x[k] - x[k - 1]) >= 0.01).all()):
            ssc = ssc + 1
    ###
    return ssc


def extract_features(window_data):
    # Top leve; function for calculating TD5 features.
    # Input-- #samples x 8 matrix
    # Output-- 1x40 feature vector
    feature_data = []

    ### 
    for i in range(0, 8):
        data = window_data[:, i]
        feature_data.append(calc_mav(data))
        feature_data.append(calc_wl(data))
        feature_data.append(calc_var(data))
        feature_data.append(calc_ssc(data))
        feature_data.append(calc_zc(data))
    return np.hstack(feature_data)
    # return feature_data


def LinearDiscriminantAnalysis(X, y):
    # Return trained LDA model
    model = None
    model = LDA()
    model.fit(X, y)

    return model


def confusion_matrix(ytest, yhat, labels=[], cmap='viridis', ax=None, show=True):
    """
	Computes (and displays) a confusion matrix given true and predicted classification labels

	Parameters
	----------
	ytest : numpy.ndarray (n_samples,)
			The true labels
	yhat : numpy.ndarray (n_samples,)
			The predicted label
	labels : iterable
			The class labels
	show : bool
			A flag determining whether we should plot the confusion matrix (True) or not (False)
	Returns
	-------
	numpy.ndarray
			The confusion matrix numerical values [n_classes x n_classes]

	"""

    def add_colorbar(im, aspect=20, pad_fraction=0.5, **kwargs):
        """Add a vertical color bar to an image plot."""
        divider = axes_grid1.make_axes_locatable(im.axes)
        width = axes_grid1.axes_size.AxesY(im.axes, aspect=1. / aspect)
        pad = axes_grid1.axes_size.Fraction(pad_fraction, width)
        current_ax = plt.gca()
        cax = divider.append_axes("right", size=width, pad=pad)
        plt.sca(current_ax)
        return im.axes.figure.colorbar(im, cax=cax, **kwargs)

    cm = sk_confusion_matrix(ytest, yhat)
    cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

    if ax is None:
        fig = plt.figure()
        ax = fig.add_subplot(111)

    try:
        plt.set_cmap(cmap)
    except ValueError:
        cmap = 'viridis'

    im = ax.imshow(cm, interpolation='nearest', vmin=0.0, vmax=1.0, cmap=cmap)
    add_colorbar(im)

    if len(labels):
        tick_marks = np.arange(len(labels))
        plt.xticks(tick_marks, labels, rotation=45)
        plt.yticks(tick_marks, labels)

    thresh = 0.5  # cm.max() / 2.
    colors = mpl.cm.get_cmap(cmap)
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        r, g, b, _ = colors(cm[i, j])
        br = np.sqrt(r * r * 0.241 + g * g * 0.691 + b * b * 0.068)
        plt.text(j, i, format(cm[i, j], '.2f'),
                 horizontalalignment="center",
                 verticalalignment='center',
                 color="black" if br > thresh else "white")

    plt.ylabel('Actual')
    plt.xlabel('Predicted')

    ax.set_ylim(cm.shape[0] - 0.5, -0.5)
    plt.tight_layout()
    if show: plt.show(block=True)

    return cm, ax


##########################
## Paramters
##########################

CLASSES = ['rest', 'power', 'open', 'pronate', 'supinate']
NUM_CLASSES = 5
EMG_WINDOW_SIZE = 100
EMG_WINDOW_STEP = 5
F_LOW = 20.0
F_HIGH = 90.0
FS = 200.0


##################################
##Processing function
##################################
def LDA_Train():
    # import training data
    training_file = 'train.mat'
    training_data = sio.loadmat(training_file)['train'][0]

    # Compute training features and labels
    print("Extracting features...")
    num_trials = len(training_data)
    X = []
    Y = []

    for i in range(NUM_CLASSES):
        class_data = []
        for j in range(num_trials):
            raw_data = training_data[j][CLASSES[i]][0][0]

            ## Filter Data
            filter_data = bandpass_filter(raw_data, F_LOW, F_HIGH, FS)

            num_samples = filter_data.shape[0]
            idx = 0
            while (idx + EMG_WINDOW_SIZE < num_samples):
                # Grab Window Data

                window = filter_data[idx:idx + EMG_WINDOW_SIZE, :]

                # Extract Features
                features = extract_features(window)

                class_data.append(np.hstack(features))

                idx = idx + EMG_WINDOW_STEP
        X.append(np.vstack(class_data))
        Y.append(i * np.ones((X[-1].shape[0],)))

    X = np.vstack(X)
    Y = np.hstack(Y)
    print("Done")

    ## Split data into train / test sets
    print('Computing train/test split...', end='', flush=True)
    Xtrain, Xtest, ytrain, ytest = train_test_split(X, Y, test_size=0.33)
    print('Done!')

    ## Train Classifier

    mdl = LinearDiscriminantAnalysis(Xtrain, ytrain)
    return mdl


DEVICE = ['myo', 'sense'][0]

if __name__ == '__main__':
    # helper function to clean commandline input

    print("Training model...")
    mdl = LDA_Train()
    print("model trained")

    num_electrodes = 8

    print("Initializing Myo EMG...", flush=True)
    myo = MyoArmband(mac='eb:33:40:96:ce:a5')
    print('Done!')

    print('Starting data collection...', end='\n', flush=True)
    myo.run()

    MAX_TIME = 60
    emg_data = []
    t0 = time.perf_counter()
    count = 0
    while (time.perf_counter() - t0) < MAX_TIME:
        data = myo.state
        if data is not None:
            emg_data.append(data[:num_electrodes])
            count = count + 1
            if count == EMG_WINDOW_SIZE:
                stack_data = np.vstack(emg_data)
                window_filter = bandpass_filter(stack_data, F_LOW, F_HIGH, FS)
                features = extract_features(window_filter)
                features = np.transpose(features.reshape(-1, 1))
                yhat = mdl.predict(features)

                # print(yhat)
                print(CLASSES[int(yhat)])
                count = 0
                emg_data = []

    # train[trial][cue] = np.vstack(emg_data)
    # print(train[trial][cue].shape)

    myo.stop()
    myo.close()

    # print('Saving data...', end='', flush=True)
    # savemat('train.mat', mdict={'train': train})
    print('Done!')