import argparse
from pathlib import Path
import numpy as np
import os
import pandas as pd
import scipy.signal as signal
import glob
import tqdm
import pickle
import time
import warnings
from control_flows import Control_Flow
def parse_args() -> argparse.Namespace:
    """Build the command-line interface and parse the process arguments.

    All four options are required:

    * ``--benign-path`` / ``-p``: directory holding the benign data.
    * ``--attacks-path`` / ``-a``: one or more attack directories.
    * ``-b`` / ``--bscore``: a float (``main`` uses it as the B-score
      threshold).
    * ``-c`` / ``--program``: name of the program to evaluate, restricted
      to the fixed list of choices below.

    Returns:
        The ``argparse.Namespace`` with ``benign_path``, ``attacks_path``,
        ``bscore`` and ``program`` attributes.
    """
    known_programs = [
        "checksum",
        "syringe-arduino",
        "syringe-pico",
        "servo-arduino",
        "servo-pico",
        "soldering",
        "home-iot",
        "distance",
    ]

    cli = argparse.ArgumentParser(description="Runs evaluation on a given path.")

    # Single directory containing the benign data.
    cli.add_argument(
        "--benign-path", "-p",
        type=Path,
        required=True,
        help="Path to where the benign data is stored.",
    )
    # One or more attack directories (nargs="+" enforces at least one).
    cli.add_argument(
        "--attacks-path", "-a",
        nargs="+",
        type=Path,
        required=True,
        help="One or more paths to Attack Files.",
    )
    # B-score value.
    cli.add_argument(
        "-b", "--bscore",
        type=float,
        required=True,
        help="A floating‑point number.",
    )
    # Program selector.
    cli.add_argument(
        "-c", "--program",
        type=str,
        choices=known_programs,
        required=True,
        help="Which program to evaluate",
    )

    return cli.parse_args()
def get_block_accuracies(basicBlocks, clfs, drs, test_full, test_labels):
    """Return the test accuracy of each basic block's classifier.

    For every block in ``basicBlocks`` the block's test samples are passed
    through ``drs[block].transform`` and then ``clfs[block].predict``; the
    predictions are compared element by element with ``test_labels[block]``.

    Args:
        basicBlocks: iterable of block identifiers (iterating a dict yields
            its keys).
        clfs: mapping block -> fitted classifier exposing ``predict``.
        drs: mapping block -> fitted transformer exposing ``transform``.
        test_full: mapping block -> test samples for that block.
        test_labels: mapping block -> array of expected labels; its
            ``shape[0]`` is used as the number of test samples.

    Returns:
        dict mapping each block to the fraction (0.0 - 1.0) of test samples
        whose prediction equals the expected label.

    Raises:
        ZeroDivisionError: if a block has no test samples.
    """
    block_accuracies = {}
    for block in basicBlocks:
        labels = test_labels[block]
        preds = clfs[block].predict(drs[block].transform(test_full[block]))
        # Count predictions that agree with the label at the same index.
        hits = sum(1 for i, pred in enumerate(preds) if pred == labels[i])
        block_accuracies[block] = hits / labels.shape[0]
    return block_accuracies
def get_and_process_raw_data(file, block, features):
    """Read one CSV trace and extract the feature vector for ``block``.

    The ``CH2`` column of the CSV is transformed with a continuous wavelet
    transform (Ricker wavelet, widths 1..50).  Each entry of
    ``features[block]`` is indexed as ``(feat[0], feat[1])`` into the
    resulting coefficient matrix, and the selected coefficients are returned
    in that order.

    Args:
        file: path of the CSV file; it must have a header row and a ``CH2``
            column.
        block: key into ``features`` selecting which coefficients to keep.
        features: mapping block -> array of index pairs into the CWT output.

    Returns:
        ``numpy.ndarray`` of shape ``(1, len(features[block]))``.
    """
    # NOTE(review): signal.cwt / signal.ricker are deprecated in recent SciPy
    # releases - confirm the SciPy version this project pins.
    max_width = 50
    scales = np.arange(1, max_width + 1)

    selected = features[block]
    feature_row = np.zeros(shape=(selected.shape[0]))

    frame = pd.read_csv(file, header=0)
    trace = np.asarray(frame['CH2'])
    coeffs = signal.cwt(trace.flatten(), signal.ricker, scales)

    # Pick out only the coefficients listed for this block.
    for slot, feat in enumerate(selected):
        feature_row[slot] = coeffs[feat[0]][feat[1]]

    return feature_row.reshape(1, -1)
def predict(block, data, clfs, drs, block_accuracies):
    """Classify one sample for ``block`` and weight it by the block accuracy.

    ``data`` is passed through ``drs[block].transform`` and then
    ``clfs[block].predict``; the first prediction is cast to ``int`` and
    multiplied by ``block_accuracies[block]``.

    Returns:
        ``int(prediction) * block_accuracies[block]`` - i.e. 0 when the
        classifier predicts 0, and the block's accuracy when it predicts 1.
    """
    reduced = drs[block].transform(data)
    label = int(clfs[block].predict(reduced)[0])
    weight = block_accuracies[block]
    return label * weight
def run_evaluation(path, basicBlocks, control_flow, b_score_threshold, clfs, drs, features, block_accuracies, desc=None, total_runs=300):
    """Score the validation runs under ``path`` against a B-score threshold.

    Run names are discovered from the CSV files in the validation directory
    of the first block in ``basicBlocks``.  For each run, every entry of
    ``control_flow`` that starts with ``'0x'`` is looked up as
    ``{path}split{block}/validation/{run}_{index}.csv``, where ``index``
    counts the blocks successfully evaluated so far in that run.  The run's
    B-score is the mean of the accuracy-weighted predictions; a run counts
    as "correct" when the score is strictly greater than
    ``b_score_threshold`` and as "incorrect" otherwise.

    Args:
        path: dataset directory, ending with a path separator.
        basicBlocks: mapping whose first key selects the directory used to
            discover run names.
        control_flow: iterable of block identifiers in execution order.
        b_score_threshold: score a run must exceed to count as correct.
        clfs: mapping block -> classifier (see ``predict``).
        drs: mapping block -> transformer (see ``predict``).
        features: mapping block -> feature indices
            (see ``get_and_process_raw_data``).
        block_accuracies: mapping block -> accuracy used as the weight.
        desc: label shown on the progress bar.
        total_runs: maximum number of runs to evaluate (default 300).

    Returns:
        ``(incorrect_pct, correct_pct)`` as fractions of the evaluated runs.
        ``main`` reads this pair as (FPR, TNR) for benign data and as
        (TPR, FNR) for attack data.

    Raises:
        FileNotFoundError: if no validation CSV file is found for the first
            block.
        ValueError: if no run could be evaluated (every run was skipped or
            the loop was interrupted before the first result).
    """
    first_block = list(basicBlocks.keys())[0]
    pattern = f'{path}split{first_block}/validation/*.csv'
    # A file is named "<run>_<index>.csv".  Strip the suffix at the last
    # underscore so multi-digit indices work, and de-duplicate because the
    # same block may have several files per run.
    runs = sorted({os.path.basename(name).rsplit('_', 1)[0] for name in glob.glob(pattern)})
    if not runs:
        raise FileNotFoundError(f'No validation CSV files match {pattern}')

    incorrect_cnt = 0
    correct_cnt = 0
    for run in tqdm.tqdm(runs[0:total_runs], desc=desc):
        try:
            run_history = 0
            block_num = 0
            for block in control_flow:
                if block[0:2] != '0x':
                    continue
                trace_file = f'{path}split{block}/validation/{run}_{block_num}.csv'
                try:
                    data = get_and_process_raw_data(file=trace_file, block=block, features=features)
                    run_history += predict(block, data, clfs, drs, block_accuracies)
                    block_num += 1
                except Exception:
                    # Best effort: report the trace that could not be used
                    # and keep going.  KeyboardInterrupt is deliberately not
                    # caught here so Ctrl-C reaches the handler below.
                    print(trace_file)
            if block_num == 0:
                # Nothing was scored for this run; a B-score is undefined.
                print(f'Skipping run {run}: no block trace could be evaluated')
                continue
            b_score = run_history / block_num
            if b_score > b_score_threshold:
                correct_cnt += 1
            else:
                incorrect_cnt += 1
        except KeyboardInterrupt:
            # Stop early and report on the runs evaluated so far.
            break

    evaluated = incorrect_cnt + correct_cnt
    if evaluated == 0:
        raise ValueError(f'No runs could be evaluated under {path}')
    return (incorrect_cnt / evaluated, correct_cnt / evaluated)
def main():
    """Command-line entry point: evaluate benign and attack datasets.

    Steps, in order:

    1. Parse the arguments and check that every given directory exists.
    2. Build the control flow for the selected program.
    3. Load the pickled classifiers, transformers, test set and block
       features from the benign directory.
    4. Compute per-block test accuracies.
    5. Evaluate the benign directory (printed as TNR / FPR) and then each
       attack directory (printed as TPR / FNR).

    Raises:
        FileNotFoundError: if the benign directory or any attack directory
            does not exist.
        ValueError: if the program name is not recognised.
    """
    # Ignore all DeprecationWarnings
    warnings.filterwarnings("ignore", category=DeprecationWarning)

    args = parse_args()
    # os.path.join(p, '') guarantees a trailing separator, which the
    # f-string paths below rely on.
    path_to_benign = os.path.join(args.benign_path, '')
    path_to_attacks = [os.path.join(attack, '') for attack in args.attacks_path]
    b_score_threshold = float(args.bscore)

    for directory in [path_to_benign, *path_to_attacks]:
        if not os.path.isdir(directory):
            raise FileNotFoundError(directory)

    ###### Get the required control flow and blocks #####
    # Program name -> name of the Control_Flow factory; resolved lazily so
    # only the selected factory is looked up.
    factory_names = {
        "checksum": "CSUMEX",
        "syringe-arduino": "Syringe",
        "syringe-pico": "Syringe_Pico",
        "servo-arduino": "Servo",
        "servo-pico": "Servo_Pico",
        "soldering": "Soldering",
        "home-iot": "Coffee",
        "distance": "Distance_Sensor",
    }
    if args.program not in factory_names:
        raise ValueError(f'{args.program} is not a recognized program. Pick from ["checksum", "syringe-arduino", "syringe-pico", "servo-arduino", "servo-pico", "soldering", "home-iot", "distance"],')
    control_flow = getattr(Control_Flow, factory_names[args.program])()
    basicBlocks = control_flow.get_basicBlocks()

    def load_pickle(file_path):
        # NOTE: pickle.load can execute arbitrary code; only point this
        # script at data directories you trust.
        with open(file_path, "rb") as handle:
            return pickle.load(handle)

    ######### read in and models here ###########
    clfs = load_pickle(f"{path_to_benign}clfs_drs/clfs.pkl")
    drs = load_pickle(f"{path_to_benign}clfs_drs/drs.pkl")

    ######## load in the dataset ##############
    print('Loading in Dataset')
    test_full = load_pickle(f"{path_to_benign}processed_data/test_X")
    test_labels = load_pickle(f"{path_to_benign}processed_data/test_Y")
    print('Dataset Loaded')

    ######## load in the features ##############
    features = load_pickle(f"{path_to_benign}block_features/features")
    print('Features Loaded')

    ######## get block accuracies ##############
    block_accuracies = get_block_accuracies(basicBlocks, clfs, drs, test_full, test_labels)

    ####### Run Evaluation ####################
    print("\nResults:\n")

    # Benign data first: runs scored at or below the threshold are false
    # positives.
    fpr, tnr = run_evaluation(
        path_to_benign, basicBlocks, control_flow, b_score_threshold,
        clfs, drs, features, block_accuracies, 'Benign',
    )
    print(f'\nTNR: {tnr*100:.2f}%\nFPR: {fpr*100:.2f}%\n')

    # Then every attack: runs scored at or below the threshold are true
    # positives.  The progress-bar label is the last directory name.
    for attack in path_to_attacks:
        tpr, fnr = run_evaluation(
            attack, basicBlocks, control_flow, b_score_threshold,
            clfs, drs, features, block_accuracies, attack.split(os.sep)[-2],
        )
        print(f'\nTPR: {tpr*100:.2f}%\nFNR: {fnr*100:.2f}%\n')
# Script entry point: run the evaluation only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()