emflow-artifact / artifact / evaluation.py
evaluation.py
Raw
import argparse
from pathlib import Path
import numpy as np
import os
import pandas as pd
import scipy.signal as signal
import glob
import tqdm
import pickle
import time
import warnings

from control_flows import Control_Flow

def parse_args() -> argparse.Namespace:
    """Parse and return command line arguments."""
    parser = argparse.ArgumentParser(
        description="Runs evaluation on a given path."
    )

    # benign file path
    parser.add_argument(
        "--benign-path", "-p",
        type=Path,
        required=True,
        help="Path to where the benign data is stored.",
    )

    # list of file paths (at least one)
    parser.add_argument(
        "--attacks-path", "-a",
        nargs="+",
        type=Path,
        required=True,
        help="One or more paths to Attack Files.",
    )

    # bscore
    parser.add_argument(
        "-b", "--bscore",
        type=float,
        required=True,
        help="A floating‑point number.",
    )

    # which program
    parser.add_argument(
        "-c", "--program",
        type=str,
        choices=["checksum", "syringe-arduino", "syringe-pico", "servo-arduino", "servo-pico", "soldering", "home-iot", "distance"],
        required=True,
        help="Which program to evaluate",
    )

    return parser.parse_args()

def get_block_accuracies(basicBlocks, clfs, drs, test_full, test_labels):
    ''' Returns the Test accuracy of the basic blocks for each classifier.'''
    timings = []
    accuracies = []
    block_accuracies = {}
    for block in basicBlocks:
        clf = clfs[block]
        dr = drs[block]

        start = time.time()
        test_X = dr.transform(test_full[block])

        preds = clf.predict(test_X)
        end = time.time()
        timings.append((end-start)/test_labels[block].shape[0])

        count = 0
        for i,pred in enumerate(preds):
            if pred == test_labels[block][i]:
                count += 1
        block_accuracies[block] = count/test_labels[block].shape[0]
        accuracies.append(count/test_labels[block].shape[0]*100)
    return block_accuracies

def get_and_process_raw_data(file, block, features):
    '''This takes a string which points to a file to get and preprocess the data as the passed block'''
    width = 50
    widths = np.arange(1, width+1)

    #make a arrays to hold all of the values we will be reading
    test = np.zeros(shape=(features[block].shape[0]))

    #read data
    data = pd.read_csv(file,header=0)
    test_raw = np.asarray(data['CH2'])
    
    cwt_data = signal.cwt(test_raw.flatten(), signal.ricker, widths)
    for i,feat in enumerate(features[block]):
        x = feat[0]
        y = feat[1]
        test[i] = cwt_data[x][y]
    return test.reshape(1, -1)

def predict(block, data, clfs, drs, block_accuracies):
    '''This runs the prediction and returns if 1 raises an error if 0. Returns the prediction value multiplied by the test accuracy'''
    
    pred = int(clfs[block].predict(drs[block].transform(data))[0])
    return (pred * block_accuracies[block])

def run_evaluation(path, basicBlocks, control_flow, b_score_threshold, clfs, drs, features, block_accuracies, desc=None):
    '''Runs evaluation. Returns the TPR and FNR.'''
    total_runs = 300
    runs = glob.glob(f'{path}split{list(basicBlocks.keys())[0]}/validation/*.csv')
    runs = sorted([i.split('/')[-1][0:-6] for i in runs])

    incorrect_cnt = 0
    correct_cnt = 0

    acc = 0
    for run in tqdm.tqdm(runs[0:total_runs],desc=desc):
        try:
            run_history = 0

            block_num = 0
            for block in control_flow:
                if block[0:2] == '0x':
                    try:
                        data = get_and_process_raw_data(file=f'{path}split{block}/validation/{run}_{block_num}.csv', block=block, features=features)
                        run_history += predict(block, data, clfs, drs, block_accuracies)
                        block_num += 1
                    except:
                        print(f'{path}split{block}/validation/{run}_{block_num}.csv')

            b_score = run_history/(block_num)
            if b_score > b_score_threshold:
                correct_cnt += 1
            else:
                incorrect_cnt += 1
            

        except KeyboardInterrupt:
            break

    incorrect_pct = incorrect_cnt / (incorrect_cnt + correct_cnt)
    correct_pct = correct_cnt / (incorrect_cnt + correct_cnt)
    
    return (incorrect_pct, correct_pct)
        

def main():
    # Ignore all DeprecationWarnings
    warnings.filterwarnings("ignore", category=DeprecationWarning)

    args = parse_args()

    path_to_benign = os.path.join(args.benign_path, '')
    path_to_attacks = [os.path.join(attack, '') for attack in args.attacks_path]
    b_score_threshold = float(args.bscore)

    if not os.path.isdir(path_to_benign):
        raise FileNotFoundError(path_to_benign)
    for attack in path_to_attacks:
        if not os.path.isdir(attack):
            raise FileNotFoundError(attack)


    ###### Get the required control flow and blocks #####
    match args.program:
        case "checksum":
            control_flow = Control_Flow.CSUMEX()
        case "syringe-arduino":
            control_flow = Control_Flow.Syringe()
        case "syringe-pico":
            control_flow = Control_Flow.Syringe_Pico()
        case "servo-arduino":
            control_flow = Control_Flow.Servo()
        case "servo-pico":
            control_flow = Control_Flow.Servo_Pico()
        case "soldering":
            control_flow = Control_Flow.Soldering()
        case "home-iot":
            control_flow = Control_Flow.Coffee()
        case "distance":
            control_flow = Control_Flow.Distance_Sensor()
        case _:
            raise ValueError(f'{args.program} is not a recognized program. Pick from ["checksum", "syringe-arduino", "syringe-pico", "servo-arduino", "servo-pico", "soldering", "home-iot", "distance"],')
    basicBlocks = control_flow.get_basicBlocks()

    ######### read in and models here ###########

    with open(f"{path_to_benign}clfs_drs/clfs.pkl", "rb") as f:
        clfs = pickle.load(f)
    with open(f"{path_to_benign}clfs_drs/drs.pkl", "rb") as f:
        drs = pickle.load(f)

    ########  load in the dataset ##############

    print('Loading in Dataset')
    with open(f"{path_to_benign}processed_data/test_X", "rb") as f:
        test_full = pickle.load(f)
    with open(f"{path_to_benign}processed_data/test_Y", "rb") as f:
        test_labels = pickle.load(f)
    print('Dataset Loaded')

    ########  load in the features ##############

    with open(f"{path_to_benign}block_features/features", "rb") as f:
        features = pickle.load(f)
    print('Features Loaded')

    ############################################

    ########  get block accuracies ##############
    block_accuracies = get_block_accuracies(basicBlocks, clfs, drs, test_full, test_labels)


    ####### Run Evaluation ####################
    print("\nResults:\n")
    # first benign
    FPR,TNR = run_evaluation(path_to_benign, basicBlocks, control_flow, b_score_threshold, clfs, drs, features, block_accuracies, 'Benign')
    print(f'\nTNR: {TNR*100:.2f}%\nFPR: {FPR*100:.2f}%\n')

    #next attacks
    for attack in path_to_attacks:
        TPR,FNR = run_evaluation(attack, basicBlocks, control_flow, b_score_threshold, clfs, drs, features, block_accuracies, attack.split(os.sep)[-2])
        print(f'\nTPR: {TPR*100:.2f}%\nFNR: {FNR*100:.2f}%\n')

if __name__ == "__main__":
    main()