emflow-artifact / artifact / evaluation.ipynb
evaluation.ipynb
Raw
import kagglehub
from kagglehub import KaggleDatasetAdapter
import kaggle

dataset_slug = "emflow2026/em-data-coffee"

# Download as a zip file
kaggle.api.dataset_download_files(dataset_slug, path="data/", unzip=True, quiet=False)

import numpy as np
import os
import math
import pandas as pd
import matplotlib.pyplot as plt
import scipy.signal as signal
from sklearn.utils import shuffle
from findpeaks import findpeaks
import glob
import tqdm
import pickle
import random
import time

import utils

temp = "./data/COFFEE/"

#path from where to find the dataset (training and testing)
path_to_data = temp
#path from where to find the trained classifiers
path_to_clfs = temp
#path from where to find the features
path_to_features = temp

from control_flows import Control_Flow
control_flow = Control_Flow.Coffee()
basicBlocks = control_flow.get_basicBlocks()

#the data heading after the / before the number
path_to_benign = temp
path_to_attacks = ["./data/COFFEE-ADVERSERIAL/3_clock_insert/", "./data/COFFEE-ADVERSERIAL/5_clock_insert/"]
mcu = 'arduino'
clock_freq = 16e6 #16 for arduino, 32e6 for pico
# clock_freq = 32e6

b_score_threshold = 0.784616

#load in the dataset

print('Loading in Dataset')
# with open(f"{path_to_data}processed_data/train_X", "rb") as f:
#     train_full = pickle.load(f)
# with open(f"{path_to_data}processed_data/train_Y", "rb") as f:
#     train_labels = pickle.load(f)
with open(f"{path_to_data}processed_data/test_X", "rb") as f:
    test_full = pickle.load(f)
with open(f"{path_to_data}processed_data/test_Y", "rb") as f:
    test_labels = pickle.load(f)
print('Dataset Loaded')

#load in the features
with open(f"{path_to_features}block_features/features", "rb") as f:
    features = pickle.load(f)
print('Features Loaded')
# read in and models here
with open(f"{path_to_clfs}clfs_drs/clfs.pkl", "rb") as f:
    clfs = pickle.load(f)
with open(f"{path_to_clfs}clfs_drs/drs.pkl", "rb") as f:
    drs = pickle.load(f)

timings = []
accuracies = []
block_accuracies = {}
for block in basicBlocks:
    clf = clfs[block]
    dr = drs[block]

    start = time.time()
    test_X = dr.transform(test_full[block])

    preds = clf.predict(test_X)
    end = time.time()
    timings.append((end-start)/test_labels[block].shape[0])

    count = 0
    for i,pred in enumerate(preds):
        if pred == test_labels[block][i]:
            count += 1
    print(f'{block} test accuracy: {round(count/test_labels[block].shape[0]*100, 2)}%')
    block_accuracies[block] = count/test_labels[block].shape[0]
    accuracies.append(count/test_labels[block].shape[0]*100)
print(f'Average Single Run Inference time {np.mean(timings)}')
print(f'Avg Acc : {np.mean(accuracies)}')
def get_and_process_raw_data(file, block):
    '''This takes a string which points to a file to get and preprocess the data as the passed block'''
    width = 50
    widths = np.arange(1, width+1)

    #make a arrays to hold all of the values we will be reading
    test = np.zeros(shape=(features[block].shape[0]))

    #read data
    data = pd.read_csv(file,header=0)
    test_raw = np.asarray(data['CH2'])
    
    cwt_data = signal.cwt(test_raw.flatten(), signal.ricker, widths)
    for i,feat in enumerate(features[block]):
        x = feat[0]
        y = feat[1]
        test[i] = cwt_data[x][y]
    return test.reshape(1, -1)

def predict(block, data):
    '''This runs the prediction and returns if 1 raises an error if 0. Returns the prediction value multiplied by the test accuracy'''
    
    pred = int(clfs[block].predict(drs[block].transform(data))[0])
    return (pred * block_accuracies[block])
def run_evaluation(path, desc=None):
    '''Runs evaluation. Returns the TPR and FNR.'''
    total_runs = 300
    runs = glob.glob(f'{path}split{list(basicBlocks.keys())[0]}/validation/*.csv')
    runs = sorted([i.split('/')[-1][0:-6] for i in runs])

    incorrect_cnt = 0
    correct_cnt = 0

    acc = 0
    for run in tqdm.tqdm(runs[0:total_runs],desc=desc):
        try:
            run_history = 0

            block_num = 0
            for block in control_flow:
                if block[0:2] == '0x':
                    try:
                        data = get_and_process_raw_data(file=f'{path}split{block}/validation/{run}_{block_num}.csv', block=block)
                        run_history += predict(block, data)
                        block_num += 1
                    except:
                        print(f'{path}split{block}/validation/{run}_{block_num}.csv')

            b_score = run_history/(block_num)
            if b_score > b_score_threshold:
                correct_cnt += 1
            else:
                incorrect_cnt += 1
            

        except KeyboardInterrupt:
            break

    incorrect_pct = incorrect_cnt / (incorrect_cnt + correct_cnt)
    correct_pct = correct_cnt / (incorrect_cnt + correct_cnt)
    
    return (incorrect_pct, correct_pct)
    print(f'Incorrect Count: {incorrect_cnt}')
    print(f'Correct Count: {correct_cnt}')
        
FPR,TNR = run_evaluation(path_to_benign,'Benign')
print(f'TNR: {TNR*100:.2f}%\nFPR: {FPR*100:.2f}%')
for attack in path_to_attacks:
    TPR,FNR = run_evaluation(attack, attack.split(os.sep)[-2])
    print(f'TPR: {TPR*100:.2f}%\nFNR: {FNR*100:.2f}%\n')