"""Evaluate per-basic-block classifiers on benign and attack runs.

For every run, each basic block in the selected program's control flow is
classified from the continuous wavelet transform of the ``CH2`` column of its
CSV trace. Each prediction is weighted by that block's test accuracy and the
weighted predictions are averaged into a "b-score", which is compared against
the threshold given on the command line.
"""

import argparse
import glob
import os
import pickle
import time  # noqa: F401  (retained: no longer used here, kept so the import set is unchanged)
import warnings
from pathlib import Path

import numpy as np
import pandas as pd
import scipy.signal as signal
import tqdm

from control_flows import Control_Flow

# Maps each ``--program`` choice to the name of the ``Control_Flow`` factory
# that builds it. The name is resolved with ``getattr`` only for the selected
# program, so the other factories are never touched.
_PROGRAM_FACTORIES = {
    "checksum": "CSUMEX",
    "syringe-arduino": "Syringe",
    "syringe-pico": "Syringe_Pico",
    "servo-arduino": "Servo",
    "servo-pico": "Servo_Pico",
    "soldering": "Soldering",
    "home-iot": "Coffee",
    "distance": "Distance_Sensor",
}


def parse_args() -> argparse.Namespace:
    """Parse and return command line arguments."""
    parser = argparse.ArgumentParser(
        description="Runs evaluation on a given path."
    )

    # benign file path
    parser.add_argument(
        "--benign-path", "-p",
        type=Path,
        required=True,
        help="Path to where the benign data is stored.",
    )

    # list of file paths (at least one)
    parser.add_argument(
        "--attacks-path", "-a",
        nargs="+",
        type=Path,
        required=True,
        help="One or more paths to Attack Files.",
    )

    # bscore
    parser.add_argument(
        "-b", "--bscore",
        type=float,
        required=True,
        help="A floating‑point number.",
    )

    # which program
    parser.add_argument(
        "-c", "--program",
        type=str,
        choices=list(_PROGRAM_FACTORIES),
        required=True,
        help="Which program to evaluate",
    )

    return parser.parse_args()


def get_block_accuracies(basicBlocks, clfs, drs, test_full, test_labels):
    """Return the test accuracy of each basic block's classifier.

    Args:
        basicBlocks: Iterable of block identifiers.
        clfs: Mapping block -> classifier exposing ``predict``.
        drs: Mapping block -> transformer exposing ``transform``.
        test_full: Mapping block -> test samples for that block.
        test_labels: Mapping block -> expected labels (indexable, with a
            ``shape`` attribute).

    Returns:
        dict mapping each block to the fraction of test samples whose
        prediction equals the expected label.
    """
    block_accuracies = {}
    for block in basicBlocks:
        labels = test_labels[block]
        preds = clfs[block].predict(drs[block].transform(test_full[block]))
        hits = sum(1 for i, pred in enumerate(preds) if pred == labels[i])
        block_accuracies[block] = hits / labels.shape[0]
    return block_accuracies


def get_and_process_raw_data(file, block, features, width=50):
    """Read one CSV trace and extract the CWT features selected for ``block``.

    Args:
        file: Path of the CSV file; its ``CH2`` column is used as the signal.
        block: Block identifier used to look up ``features``.
        features: Mapping block -> sequence of index pairs; for each entry,
            element 0 indexes the CWT width row and element 1 the sample.
        width: Largest wavelet width; widths ``1..width`` are evaluated.
            Presumably this must match the value used when the features were
            selected -- confirm before changing it.

    Returns:
        A ``(1, n_features)`` array holding the selected CWT coefficients.
    """
    widths = np.arange(1, width + 1)

    # read data
    trace = np.asarray(pd.read_csv(file, header=0)['CH2'])

    # NOTE(review): scipy.signal.cwt and scipy.signal.ricker are deprecated
    # and, per the SciPy release notes, removed in newer SciPy releases;
    # confirm the pinned SciPy version still provides them.
    cwt_data = signal.cwt(trace.flatten(), signal.ricker, widths)

    block_features = features[block]
    selected = np.zeros(shape=(block_features.shape[0]))
    for i, feat in enumerate(block_features):
        selected[i] = cwt_data[feat[0]][feat[1]]

    return selected.reshape(1, -1)


def predict(block, data, clfs, drs, block_accuracies):
    """Classify one sample and weight the result by the block's accuracy.

    Returns:
        The integer prediction for ``data`` multiplied by
        ``block_accuracies[block]`` (so a prediction of 0 contributes 0).
    """
    pred = int(clfs[block].predict(drs[block].transform(data))[0])
    return pred * block_accuracies[block]


def _run_name(csv_path):
    """Strip the directory and the trailing ``_<block index>.csv`` from a path."""
    stem = os.path.splitext(os.path.basename(csv_path))[0]
    return stem.rsplit('_', 1)[0]


def run_evaluation(path, basicBlocks, control_flow, b_score_threshold, clfs,
                   drs, features, block_accuracies, desc=None, total_runs=300):
    """Score every run under ``path`` and report how many pass the threshold.

    Run names are discovered from the CSV files in the validation directory
    of the first basic block. For each run, every control-flow entry starting
    with ``'0x'`` is loaded from
    ``<path>/split<block>/validation/<run>_<n>.csv``, where ``n`` counts the
    blocks successfully evaluated so far in that run.

    Args:
        path: Directory containing the ``split<block>`` folders.
        basicBlocks: Mapping whose first key names the block used to
            discover runs.
        control_flow: Iterable of control-flow entries.
        b_score_threshold: A run counts as "correct" when its b-score is
            strictly greater than this value.
        clfs, drs, features, block_accuracies: Per-block lookups passed on
            to ``get_and_process_raw_data`` and ``predict``.
        desc: Label shown on the progress bar.
        total_runs: Maximum number of runs to evaluate.

    Returns:
        ``(incorrect_pct, correct_pct)`` as fractions of the evaluated runs.
        For benign data this is (FPR, TNR); for attack data it is (TPR, FNR).

    Raises:
        RuntimeError: If no run could be evaluated.
    """
    first_block = list(basicBlocks.keys())[0]
    pattern = os.path.join(path, f'split{first_block}', 'validation', '*.csv')
    # A set removes the duplicates that appear when the first block occurs
    # more than once per run, so each run is evaluated only once.
    runs = sorted({_run_name(found) for found in glob.glob(pattern)})

    incorrect_cnt = 0
    correct_cnt = 0

    for run in tqdm.tqdm(runs[:total_runs], desc=desc):
        try:
            run_history = 0
            block_num = 0
            for block in control_flow:
                if block[0:2] != '0x':
                    continue
                trace_file = os.path.join(
                    path, f'split{block}', 'validation',
                    f'{run}_{block_num}.csv',
                )
                try:
                    data = get_and_process_raw_data(
                        file=trace_file, block=block, features=features
                    )
                    run_history += predict(
                        block, data, clfs, drs, block_accuracies
                    )
                except Exception as exc:
                    # Best effort: report the trace and carry on. Catching
                    # Exception (not everything) lets Ctrl-C reach the
                    # handler below.
                    print(f'{trace_file}: {exc!r}')
                else:
                    block_num += 1

            if block_num == 0:
                # Nothing was scored, so there is no b-score to compare.
                print(f'Skipping run {run}: no block trace could be evaluated')
                continue

            b_score = run_history / block_num
            if b_score > b_score_threshold:
                correct_cnt += 1
            else:
                incorrect_cnt += 1
        except KeyboardInterrupt:
            # Stop early and report on the runs completed so far.
            break

    evaluated = incorrect_cnt + correct_cnt
    if evaluated == 0:
        raise RuntimeError(f'No runs could be evaluated under {path!r}')

    return (incorrect_cnt / evaluated, correct_cnt / evaluated)


def _load_pickle(file_path):
    """Load and return the pickled object stored at ``file_path``."""
    # SECURITY: pickle.load can execute arbitrary code; only point this
    # script at data directories you trust.
    with open(file_path, "rb") as f:
        return pickle.load(f)


def main() -> None:
    """Load models and data, then print TNR/FPR and per-attack TPR/FNR."""
    # Ignore all DeprecationWarnings
    warnings.filterwarnings("ignore", category=DeprecationWarning)

    args = parse_args()

    # Joining with '' guarantees a trailing separator on each directory.
    path_to_benign = os.path.join(args.benign_path, '')
    path_to_attacks = [os.path.join(attack, '') for attack in args.attacks_path]
    b_score_threshold = float(args.bscore)

    for directory in (path_to_benign, *path_to_attacks):
        if not os.path.isdir(directory):
            raise FileNotFoundError(directory)

    ###### Get the required control flow and blocks #####
    factory_name = _PROGRAM_FACTORIES.get(args.program)
    if factory_name is None:
        raise ValueError(
            f'{args.program} is not a recognized program. '
            f'Pick from {list(_PROGRAM_FACTORIES)}'
        )
    control_flow = getattr(Control_Flow, factory_name)()
    basicBlocks = control_flow.get_basicBlocks()

    ######### read in the models here ###########
    clfs = _load_pickle(os.path.join(path_to_benign, 'clfs_drs', 'clfs.pkl'))
    drs = _load_pickle(os.path.join(path_to_benign, 'clfs_drs', 'drs.pkl'))

    ######## load in the dataset ##############
    print('Loading in Dataset')
    test_full = _load_pickle(
        os.path.join(path_to_benign, 'processed_data', 'test_X')
    )
    test_labels = _load_pickle(
        os.path.join(path_to_benign, 'processed_data', 'test_Y')
    )
    print('Dataset Loaded')

    ######## load in the features ##############
    features = _load_pickle(
        os.path.join(path_to_benign, 'block_features', 'features')
    )
    print('Features Loaded')

    ######## get block accuracies ##############
    block_accuracies = get_block_accuracies(
        basicBlocks, clfs, drs, test_full, test_labels
    )

    ####### Run Evaluation ####################
    print("\nResults:\n")

    # first benign
    FPR, TNR = run_evaluation(
        path_to_benign, basicBlocks, control_flow, b_score_threshold,
        clfs, drs, features, block_accuracies, 'Benign',
    )
    print(f'\nTNR: {TNR*100:.2f}%\nFPR: {FPR*100:.2f}%\n')

    # next attacks
    for attack in path_to_attacks:
        # Label the progress bar with the attack directory's own name.
        attack_name = os.path.basename(os.path.normpath(attack))
        TPR, FNR = run_evaluation(
            attack, basicBlocks, control_flow, b_score_threshold,
            clfs, drs, features, block_accuracies, attack_name,
        )
        print(f'\nTPR: {TPR*100:.2f}%\nFNR: {FNR*100:.2f}%\n')


if __name__ == "__main__":
    main()