## Standard libraries
import os
import json
import math
import random
import time
import numpy as np
import scipy.linalg
from PIL import Image, ImageFilter
#Libraries for getting dataset
import urllib.request
from urllib.error import HTTPError
import zipfile
## Progress bar
from tqdm.notebook import tqdm
## PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as data
import torch.optim as optim
# Torchvision
import torchvision
from torchvision.datasets import CIFAR10
from torchvision import transforms
from torchvision.utils import save_image
from torchvision.transforms import v2
# PyTorch Lightning
import pytorch_lightning as pl
from pytorch_lightning.callbacks import LearningRateMonitor, ModelCheckpoint
#local libs
from models import Generator
## Imports for plotting
import matplotlib.pyplot as plt
from IPython.display import set_matplotlib_formats
set_matplotlib_formats('svg', 'pdf') # For export
from matplotlib.colors import to_rgb
import matplotlib
matplotlib.rcParams['lines.linewidth'] = 2.0
import seaborn as sns
sns.set()
# Path to the folder where the datasets are/should be downloaded (e.g. MNIST)
DATASET_PATH = "./data"
# Path to the folder where the pretrained models are saved
CHECKPOINT_PATH = "./saved_models/tutorial10"
# Setting the seed for reproducibility (covers Python, NumPy, and PyTorch RNGs)
pl.seed_everything(42)
# Ensure that all operations are deterministic on GPU (if used) for reproducibility
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
# Fetching the device that will be used throughout this script (first GPU if available)
device = torch.device("cpu") if not torch.cuda.is_available() else torch.device("cuda:0")
#print("Using device", device)
# Github URL where the dataset is stored for this tutorial
base_url = "https://raw.githubusercontent.com/phlippe/saved_models/main/tutorial10/"
# Files to download, as (destination directory, file name) pairs
pretrained_files = [(DATASET_PATH, "TinyImageNet.zip"), (CHECKPOINT_PATH, "patches.zip")]
# Create dataset/checkpoint paths if they don't exist yet
os.makedirs(DATASET_PATH, exist_ok=True)
os.makedirs(CHECKPOINT_PATH, exist_ok=True)
# Per-channel mean and std from ImageNet, used to normalize inputs below
NORM_MEAN = np.array([0.485, 0.456, 0.406])
NORM_STD = np.array([0.229, 0.224, 0.225])
# No resizing and center crop necessary as images are already preprocessed.
plain_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=NORM_MEAN,
                         std=NORM_STD)
])
# Load dataset and create data loader
imagenet_path = os.path.join(DATASET_PATH, "TinyImageNet/")
assert os.path.isdir(imagenet_path), f"Could not find the ImageNet dataset at expected path \"{imagenet_path}\". " + \
                                     f"Please make sure to have downloaded the ImageNet dataset here, or change the {DATASET_PATH=} variable."
dataset = torchvision.datasets.ImageFolder(root=imagenet_path, transform=plain_transforms)
# shuffle=False so the "example batch" drawn in __main__ is reproducible across runs
data_loader = data.DataLoader(dataset, batch_size=32, shuffle=False, drop_last=False, num_workers=8)
# Load CNN architecture pretrained on ImageNet; TORCH_HOME redirects the weight cache
os.environ["TORCH_HOME"] = CHECKPOINT_PATH
pretrained_model = torchvision.models.resnet34(weights='IMAGENET1K_V1')
pretrained_model = pretrained_model.to(device)
# No gradients needed for the network itself (only inputs/patches are optimized)
pretrained_model.eval()
for p in pretrained_model.parameters():
    p.requires_grad = False
# Load label names to interpret the label numbers 0 to 999
with open(os.path.join(imagenet_path, "label_list.json"), "r") as f:
    label_names = json.load(f)
def get_label_index(lab_str):
    """Return the integer class index of class name `lab_str` in `label_names`.

    Raises AssertionError if the name is not a known class.
    """
    assert lab_str in label_names, f"Label \"{lab_str}\" not found. Check the spelling of the class."
    return label_names.index(lab_str)
# For each file, check whether it already exists. If not, try downloading it.
for dir_name, file_name in pretrained_files:
    file_path = os.path.join(dir_name, file_name)
    if not os.path.isfile(file_path):
        file_url = base_url + file_name
        print(f"Downloading {file_url}...")
        try:
            urllib.request.urlretrieve(file_url, file_path)
        except HTTPError as e:
            print("Something went wrong. Please try to download the file from the GDrive folder, or contact the author with the full output including the following error:\n", e)
        # BUG FIX: only unzip when the download actually produced a file.
        # Previously a failed download fell straight through to ZipFile and
        # raised FileNotFoundError, hiding the real (network) problem.
        if file_name.endswith(".zip") and os.path.isfile(file_path):
            print("Unzipping file...")
            with zipfile.ZipFile(file_path, 'r') as zip_ref:
                # Extract next to the archive (its containing directory).
                zip_ref.extractall(file_path.rsplit("/", 1)[0])
def eval_model(dataset_loader, img_func=None):
    """Evaluate the global `pretrained_model` on `dataset_loader` and print top-1/top-5 error.

    Args:
        dataset_loader: iterable of (images, labels) batches.
        img_func: optional callable (imgs, labels) -> imgs applied before the
            forward pass (e.g. an attack or a defense).

    Returns:
        (top-1 accuracy, top-5 accuracy) as Python floats.
    """
    top1_hits, top5_hits, num_seen = 0., 0., 0.
    for imgs, labels in tqdm(dataset_loader, desc="Validating..."):
        imgs, labels = imgs.to(device), labels.to(device)
        if img_func is not None:
            imgs = img_func(imgs, labels)
        with torch.no_grad():
            preds = pretrained_model(imgs)
        # Top-1: argmax equals the label; top-5: label appears among 5 best logits.
        top1_hits += (preds.argmax(dim=-1) == labels).sum()
        top5_hits += (preds.topk(5, dim=-1)[1] == labels[...,None]).any(dim=-1).sum()
        num_seen += preds.shape[0]
    acc = top1_hits.float().item() / num_seen
    top5 = top5_hits.float().item() / num_seen
    print(f"Top-1 error: {(100.0 * (1 - acc)):4.2f}%")
    print(f"Top-5 error: {(100.0 * (1 - top5)):4.2f}%")
    return acc, top5
def show_prediction(img, label, pred, K=5, adv_img=None, noise=None):
    """Print the model's top-K predictions for one image and score the true label.

    Args:
        img: the (unmodified) input image tensor (currently only used implicitly).
        label: integer index of the true class.
        pred: 1D logits or probabilities over the 1000 classes.
        K: number of top predictions to print/check.
        adv_img, noise: optional adversarial image / noise (only trigger a blank
            separator line; kept for interface compatibility).

    Returns:
        [val, hit] where `val` is the confidence (in %) assigned to the true
        label if it appears in the top 15 predictions (else 0), and `hit` is
        1.0 if the true label is among the top-K predictions, else 0.0.
    """
    if adv_img is not None:
        print("")
    # Convert logits to probabilities unless `pred` already sums to ~1.
    if abs(pred.sum().item() - 1.0) > 1e-4:
        pred = torch.softmax(pred, dim=-1)
    topk_vals, topk_idx = pred.topk(K, dim=-1)
    topk_vals, topk_idx = topk_vals.cpu().numpy(), topk_idx.cpu().numpy()
    top_vals, top_idx = pred.topk(15, dim=-1)
    top_vals, top_idx = top_vals.cpu().numpy(), top_idx.cpu().numpy()
    print(f"{label_names[label]}")
    # Confidence (%) the model puts on the true label, searched in the top 15.
    val = 0
    for i in range(15):
        if(label_names[top_idx[i]] == label_names[label]):
            val = top_vals[i] * 100
    print(f"{label_names[label]} | {val}")
    # BUG FIX: the original loop returned on its *first* iteration (both the
    # if- and else-branch returned), so only the top-1 prediction was ever
    # printed and checked even though K predictions were requested. Now all K
    # predictions are printed and the hit flag reflects top-K membership.
    hit = 0.0
    for i in range(K):
        print(f"{label_names[topk_idx[i]]} | {topk_vals[i] * 100}")
        if label_names[topk_idx[i]] == label_names[label]:
            hit = 1.0
    return [val, hit]
#FGSM for normal attack
def fast_gradient_sign_method(model, imgs, labels, epsilon=0.02):
    """Craft FGSM adversarial examples for a batch of images.

    Args:
        model: classifier producing logits.
        imgs: input batch (stays on its original device).
        labels: true labels for the batch.
        epsilon: per-pixel step size along the gradient sign.

    Returns:
        (fake_imgs, noise_grad): the perturbed images (detached) and the sign
        of the input gradient used to build them.
    """
    # Gradient-tracked copy of the inputs; the model runs on the global device.
    attack_imgs = imgs.clone().requires_grad_()
    log_probs = F.log_softmax(model(attack_imgs.to(device)), dim=-1)
    # Negative log-likelihood of the true class, summed over the batch.
    nll = -torch.gather(log_probs, 1, labels.to(device).unsqueeze(dim=-1))
    nll.sum().backward()
    # Move each pixel by epsilon in the direction that increases the loss.
    noise_grad = torch.sign(attack_imgs.grad.to(imgs.device))
    fake_imgs = (imgs + epsilon * noise_grad).detach()
    return fake_imgs, noise_grad
#patch attack methods
def place_patch(img, patch):
    """Write the (trainable) patch into each image of the batch at a random spot.

    NOTE: mutates `img` in place and also returns it — callers that need the
    original batch must pass a clone.

    Args:
        img: batch tensor of shape (B, C, H, W).
        patch: unconstrained patch tensor of shape (C, ph, pw); it is mapped to
            the normalized-image value range by `patch_forward` before pasting.
    """
    for i in range(img.shape[0]):
        # BUG FIX: randint's upper bound is exclusive, so the original
        # `high = size - patch - 1` both excluded the two right/bottom-most
        # valid positions and crashed (empty range) when the patch exactly
        # fills a dimension. `size - patch + 1` allows every valid offset.
        h_offset = np.random.randint(0, img.shape[2] - patch.shape[1] + 1)
        w_offset = np.random.randint(0, img.shape[3] - patch.shape[2] + 1)
        img[i, :, h_offset:h_offset + patch.shape[1], w_offset:w_offset + patch.shape[2]] = patch_forward(patch)
    return img
# ImageNet mean/std reshaped to (3, 1, 1) so they broadcast over (C, H, W) patches.
TENSOR_MEANS, TENSOR_STD = torch.FloatTensor(NORM_MEAN)[:,None,None], torch.FloatTensor(NORM_STD)[:,None,None]
def patch_forward(patch):
    """Map an unconstrained patch to the value range of normalized ImageNet images.

    tanh squashes values into [-1, 1]; the affine transform is algebraically
    (((tanh(p) + 1) / 2) - mean) / std, i.e. a pixel in [0, 1] normalized the
    same way as the dataset images.
    """
    # Map patch values from [-infty,infty] to ImageNet min and max
    patch = (torch.tanh(patch) + 1 - 2 * TENSOR_MEANS) / (2 * TENSOR_STD)
    return patch
def eval_patch(model, patch, val_loader, target_class):
    """Measure how often `patch` fools `model` into predicting `target_class`.

    Args:
        model: classifier to evaluate (switched to eval mode).
        patch: unconstrained patch tensor (mapped by `patch_forward` on paste).
        val_loader: loader of (images, labels) batches.
        target_class: class index the patch is trained to force.

    Returns:
        (acc, top5): fraction of non-target-class images whose top-1 / top-5
        prediction contains the target class, as 0-dim tensors.
    """
    model.eval()
    tp, tp_5, counter = 0., 0., 0.
    with torch.no_grad():
        for img, img_labels in tqdm(val_loader, desc="Validating...", leave=False):
            # For stability, place the patch at 4 random locations per image, and average the performance
            for _ in range(4):
                # BUG FIX: place_patch mutates its input in place, so without
                # the clone the four "independent" placements accumulated up to
                # four patches on the same image instead of one per evaluation.
                patch_img = place_patch(img.clone(), patch)
                patch_img = patch_img.to(device)
                img_labels = img_labels.to(device)
                pred = model(patch_img)
                # In the accuracy calculation, we need to exclude the images that are of our target class
                # as we would not "fool" the model into predicting those
                tp += torch.logical_and(pred.argmax(dim=-1) == target_class, img_labels != target_class).sum()
                tp_5 += torch.logical_and((pred.topk(5, dim=-1)[1] == target_class).any(dim=-1), img_labels != target_class).sum()
                counter += (img_labels != target_class).sum()
    acc = tp/counter
    top5 = tp_5/counter
    return acc, top5
def patch_attack(model, target_class, patch_size=64, num_epochs=5):
    """Train an adversarial patch that pushes `model` towards `target_class`.

    Args:
        model: classifier to fool (its weights are frozen elsewhere; only the
            patch is optimized).
        target_class: class index the patch should force.
        patch_size: int (square) or (h, w) tuple for the patch shape.
        num_epochs: passes over the training split.

    Returns:
        (patch data tensor, {"acc": top-1 fool rate, "top5": top-5 fool rate})
        evaluated on the held-out split.
    """
    # Leave a small set of images out to check generalization.
    # In most of our experiments, the performance on the hold-out data points
    # was as good as on the training set. Overfitting was hardly possible due
    # to the small size of the patches.
    # Generalized: 90/10 split instead of the hard-coded [4500, 500], which
    # silently assumed the dataset holds exactly 5000 images (identical split
    # sizes when it does).
    num_val = len(dataset) // 10
    train_set, val_set = torch.utils.data.random_split(dataset, [len(dataset) - num_val, num_val])
    train_loader = data.DataLoader(train_set, batch_size=32, shuffle=True, drop_last=True, num_workers=8)
    val_loader = data.DataLoader(val_set, batch_size=32, shuffle=False, drop_last=False, num_workers=4)
    # Create parameter and optimizer; the patch itself is the only trainable tensor.
    if not isinstance(patch_size, tuple):
        patch_size = (patch_size, patch_size)
    patch = nn.Parameter(torch.zeros(3, patch_size[0], patch_size[1]), requires_grad=True)
    optimizer = torch.optim.SGD([patch], lr=1e-1, momentum=0.8)
    loss_module = nn.CrossEntropyLoss()
    # Training loop
    for epoch in range(num_epochs):
        t = tqdm(train_loader, leave=False)
        for img, _ in t:
            img = place_patch(img, patch)
            img = img.to(device)
            pred = model(img)
            # Every image in the batch gets the same (target) label.
            labels = torch.zeros(img.shape[0], device=pred.device, dtype=torch.long).fill_(target_class)
            loss = loss_module(pred, labels)
            optimizer.zero_grad()
            # CrossEntropyLoss already reduces to a scalar (reduction="mean"),
            # so the former `.mean()` call was a no-op.
            loss.backward()
            optimizer.step()
            t.set_description(f"Epoch {epoch}, Loss: {loss.item():4.2f}")
    # Final validation on the held-out split
    acc, top5 = eval_patch(model, patch, val_loader, target_class)
    return patch.data, {"acc": acc.item(), "top5": top5.item()}
# Load evaluation results of the pretrained patches, if a results file exists.
json_results_file = os.path.join(CHECKPOINT_PATH, "patch_results.json")
# Maps class name -> {str(patch_size) -> results}; stays empty on first run.
json_results = {}
if os.path.isfile(json_results_file):
    with open(json_results_file, "r") as f:
        json_results = json.load(f)
# If you train new patches, you can save the results via calling this function
def save_results(patch_dict):
    """Serialize patch evaluation results to CHECKPOINT_PATH/patch_results.json.

    Tensors inside each `results` list are converted to plain Python numbers so
    the structure is JSON-serializable; the patches themselves are not saved.
    """
    result_dict = {}
    for cname, by_size in patch_dict.items():
        result_dict[cname] = {}
        for psize, entry in by_size.items():
            result_dict[cname][psize] = [
                t.item() if isinstance(t, torch.Tensor) else t
                for t in entry["results"]
            ]
    with open(os.path.join(CHECKPOINT_PATH, "patch_results.json"), "w") as f:
        json.dump(result_dict, f, indent=4)
def get_patches(class_names, patch_sizes):
    """Load (or train) an adversarial patch for every (class, size) combination.

    Args:
        class_names: iterable of class-name strings (must exist in `label_names`).
        patch_sizes: iterable of patch sizes (ints).

    Returns:
        {class name: {patch size: {"results": eval results, "patch": tensor}}}.
    """
    result_dict = dict()
    # Loop over all classes and patch sizes
    for name in class_names:
        result_dict[name] = dict()
        for patch_size in patch_sizes:
            c = label_names.index(name)
            file_name = os.path.join(CHECKPOINT_PATH, f"{name}_{patch_size}_patch.pt")
            # Load patch if pretrained file exists, otherwise start training
            if not os.path.isfile(file_name):
                patch, val_results = patch_attack(pretrained_model, target_class=c, patch_size=patch_size, num_epochs=5)
                print(f"Validation results for {name} and {patch_size}:", val_results)
                torch.save(patch, file_name)
            else:
                # BUG FIX: map_location keeps GPU-trained patches loadable on
                # CPU-only hosts; the patch is consumed on the CPU anyway
                # (patch_forward uses CPU mean/std tensors).
                patch = torch.load(file_name, map_location="cpu")
            # Load evaluation results if they exist, otherwise evaluate the patch.
            # BUG FIX: also guard on the patch size — previously a class present
            # in the JSON with only a *different* size raised KeyError.
            if name in json_results and str(patch_size) in json_results[name]:
                results = json_results[name][str(patch_size)]
            else:
                results = eval_patch(pretrained_model, patch, data_loader, target_class=c)
            # Store results and the patches in a dict for better access
            result_dict[name][patch_size] = {
                "results": results,
                "patch": patch
            }
    return result_dict
def perform_patch_attack(patch):
    """Return a copy of the global example batch with `patch` pasted once per image."""
    # Clone first: place_patch mutates its argument in place.
    return place_patch(exmp_batch.clone(), patch)
def perform_multi_patch_attack(patch):
    """Return a copy of the global example batch with `patch` pasted four times per image."""
    patched = exmp_batch.clone()
    # Repeated placement: each pass adds the patch at a new random location.
    for _ in range(4):
        patched = place_patch(patched, patch)
    return patched
def gaussian_blur(img, kernelSize, sig):
    """Gaussian-blur `img` with the given kernel size and sigma.

    BUG FIX: the original computed `blurred_imgs` but returned the *unblurred*
    input, so the Gaussian "defense" was a silent no-op.
    """
    blurrer = v2.GaussianBlur(kernel_size=kernelSize, sigma=sig)
    return blurrer(img)
def median_blur(input_tensor, kernel_size):
    """Per-channel median filter for a (C, H, W) tensor.

    Reflect padding and stride 1 keep the output spatially identical to the
    input. `kernel_size` is assumed odd so the window is centered.
    """
    pad = kernel_size // 2
    # Reflect-pad the two spatial dimensions.
    padded = F.pad(input_tensor, (pad, pad, pad, pad), mode='reflect')
    # Slide a kernel_size x kernel_size window over H then W: (C, H, W, k, k).
    windows = padded.unfold(1, kernel_size, 1).unfold(2, kernel_size, 1)
    flat = windows.contiguous().view(*windows.shape[:3], -1)
    # Median over each flattened window.
    return flat.median(dim=-1)[0]
def monte_denoise(image_tensor, beta):
    """Denoise a single (C, H, W) image with a simple Metropolis-style MCMC sweep.

    Repeatedly picks a random pixel, proposes replacing it by its 3x3
    neighbourhood mean plus Gaussian noise of scale `beta`, and accepts the
    proposal with the Metropolis criterion (temperature `beta`).

    Note: the RNG draw order (row, col, proposal noise, acceptance uniform) is
    deliberately fixed so runs are reproducible under a manual seed.
    """
    num_iterations = 1000
    out = image_tensor.clone()
    channels, height, width = out.shape
    for _ in range(num_iterations):
        # Pick a random pixel.
        row = torch.randint(0, height, (1,)).item()
        col = torch.randint(0, width, (1,)).item()
        # Clamp the 3x3 neighbourhood window to the image borders.
        r0, r1 = max(row - 1, 0), min(row + 2, height)
        c0, c1 = max(col - 1, 0), min(col + 2, width)
        local_mean = out[:, r0:r1, c0:c1].mean(dim=(1, 2))
        # Propose the neighbourhood mean perturbed by Gaussian noise.
        candidate = local_mean + beta * torch.randn(channels)
        current = out[:, row, col]
        # Energy difference: distance-to-mean of current minus candidate.
        delta_e = torch.norm(current - local_mean) - torch.norm(candidate - local_mean)
        # Metropolis acceptance test.
        if torch.exp(-delta_e / beta) > torch.rand(1):
            out[:, row, col] = candidate
    return out
def defend_image_with_gan(img):
    """Purify a single image with the APE-GAN generator.

    Args:
        img: image tensor; a batch dimension is added before the GAN call.

    Returns:
        The generator output for the 1-image batch (shape as produced by
        `Generator` — presumably (1, C, H, W), TODO confirm against models.py).
    """
    # PERF FIX: load the generator once and cache it on the function object.
    # The original re-read the checkpoint from disk and rebuilt the network on
    # every call, which dominated runtime when applied per image in a loop.
    G = getattr(defend_image_with_gan, "_generator", None)
    if G is None:
        gan_path = "./APE-GAN/checkpoint/cifar/10.tar"
        gan_point = torch.load(gan_path)
        G = Generator(3)
        G.load_state_dict(gan_point["generator"])
        G.eval()
        defend_image_with_gan._generator = G
    # No gradients are needed for defense-time inference.
    with torch.no_grad():
        x_ape = G(img.unsqueeze(0))
    return x_ape
#running models
if __name__ == '__main__':
    # Baseline accuracy of the undefended model can be checked with:
    #_ = eval_model(data_loader)
    # NOTE(review): the CSV is opened without a `with` block, so it is not
    # closed if any step below raises — consider a context manager.
    file = open("output_data.csv", "w")
    file.write("type,time,prediction,correct\n")
    # One fixed batch is drawn once and reused for every attack/defense below
    # (data_loader has shuffle=False, so this batch is reproducible).
    exmp_batch, label_batch = next(iter(data_loader))
    print("Model with out attack")
    with torch.no_grad():
        preds = pretrained_model(exmp_batch.to(device))
        print(f"{exmp_batch=} + {label_batch=}")
    # Sample indices 1, 6, 11, 16 from the 32-image batch.
    for i in range(1,17,5):
        output = show_prediction(exmp_batch[i], label_batch[i], preds[i])
        file.write(f"v,0,{output[0]},{output[1]}\n")
    # --- FGSM attack, no defense ---
    print("Model with Traditional adv ttack")
    adv_imgs, noise_grad = fast_gradient_sign_method(pretrained_model, exmp_batch, label_batch, epsilon=0.02)
    with torch.no_grad():
        adv_preds = pretrained_model(adv_imgs.to(device))
    for i in range(1,17,5):
        output = show_prediction(exmp_batch[i], label_batch[i], adv_preds[i], adv_img=adv_imgs[i], noise=noise_grad[i])
        file.write(f"tau,0,{output[0]},{output[1]}\n")
    # --- Single adversarial patch attack, no defense ---
    print("Model with single patch attack")
    class_names = [ 'goldfish']
    patch_sizes = [32]
    # Loads pretrained patches if present, otherwise trains them.
    patch_dict = get_patches(class_names, patch_sizes)
    patch_batch = perform_patch_attack(patch_dict['goldfish'][32]['patch'])
    with torch.no_grad():
        patch_preds = pretrained_model(patch_batch.to(device))
    for i in range(1,17,5):
        output = show_prediction(patch_batch[i], label_batch[i], patch_preds[i])
        file.write(f"spu,0,{output[0]},{output[1]}\n")
    # --- Four-patch attack, no defense ---
    print("Model with multi patch attack")
    multi_patch_batch = perform_multi_patch_attack(patch_dict['goldfish'][32]['patch'])
    with torch.no_grad():
        patch_preds = pretrained_model(multi_patch_batch.to(device))
    for i in range(1,17,5):
        output = show_prediction(multi_patch_batch[i], label_batch[i], patch_preds[i])
        file.write(f"mpu,0,{output[0]},{output[1]}\n")
    # Defense hyper-parameter grids.
    kernel_sizes = [3,5,7,15,31,61]
    sigmas = [1.0,2.0,3.0,5.0]
    betas = [.1,.2,.4,.6,.8]
    # --- Median-filter defense against all three attacks ---
    print("Median defended Model with adv attack")
    for kernel_size in kernel_sizes:
        # Regenerate the attacked batches for each kernel size.
        adv_imgs, noise_grad = fast_gradient_sign_method(pretrained_model, exmp_batch, label_batch, epsilon=0.02)
        patch_batch = perform_patch_attack(patch_dict['goldfish'][32]['patch'])
        multi_patch_batch = perform_multi_patch_attack(patch_dict['goldfish'][32]['patch'])
        # NOTE(review): these are aliases, not copies — blurring med_imgs below
        # also mutates adv_imgs / patch_batch / multi_patch_batch in place.
        med_imgs = adv_imgs
        med_imgs0 = patch_batch
        med_imgs1 = multi_patch_batch
        timeTaken = 0
        start = time.time()
        for i in range(0, len(med_imgs)):
            med_imgs[i] = median_blur(med_imgs[i], kernel_size)
            med_imgs0[i] = median_blur(med_imgs0[i], kernel_size)
            med_imgs1[i] = median_blur(med_imgs1[i], kernel_size)
        # Average wall-clock defense time across the three batches.
        timeTaken = (time.time() - start) / 3
        with torch.no_grad():
            adv_preds = pretrained_model(med_imgs.to(device))
            adv_preds0 = pretrained_model(med_imgs0.to(device))
            adv_preds1 = pretrained_model(med_imgs1.to(device))
        for i in range(1,17,5):
            output = show_prediction(exmp_batch[i], label_batch[i], adv_preds[i], adv_img=med_imgs[i], noise=noise_grad[i])
            file.write(f"tamed{kernel_size},{timeTaken},{output[0]},{output[1]}\n")
            output = show_prediction(exmp_batch[i], label_batch[i], adv_preds0[i], adv_img=med_imgs0[i], noise=noise_grad[i])
            file.write(f"spmed{kernel_size},{timeTaken},{output[0]},{output[1]}\n")
            output = show_prediction(exmp_batch[i], label_batch[i], adv_preds1[i], adv_img=med_imgs1[i], noise=noise_grad[i])
            file.write(f"mpmed{kernel_size},{timeTaken},{output[0]},{output[1]}\n")
    # --- Gaussian-blur defense against all three attacks ---
    print("Gussian defended Model with adv attack")
    for rad in sigmas:
        adv_imgs, noise_grad = fast_gradient_sign_method(pretrained_model, exmp_batch, label_batch, epsilon=0.02)
        patch_batch = perform_patch_attack(patch_dict['goldfish'][32]['patch'])
        multi_patch_batch = perform_multi_patch_attack(patch_dict['goldfish'][32]['patch'])
        # NOTE(review): aliases again — see the median section above.
        gus_imgs= adv_imgs
        gus_imgs0=patch_batch
        gus_imgs1= multi_patch_batch
        timeTaken = 0
        start = time.time()
        for i in range(0, len(gus_imgs)):
            gus_imgs[i] = gaussian_blur(gus_imgs[i],5, rad)
            gus_imgs0[i] = gaussian_blur(gus_imgs0[i],5, rad)
            gus_imgs1[i] = gaussian_blur(gus_imgs1[i],5, rad)
        timeTaken = (time.time() - start) / 3
        with torch.no_grad():
            adv_preds = pretrained_model(gus_imgs.to(device))
            adv_preds0 = pretrained_model(gus_imgs0.to(device))
            adv_preds1 = pretrained_model(gus_imgs1.to(device))
        for i in range(1,17,5):
            output = show_prediction(exmp_batch[i], label_batch[i], adv_preds[i], adv_img=gus_imgs[i], noise=noise_grad[i])
            file.write(f"tagus{rad},{timeTaken},{output[0]},{output[1]}\n")
            output = show_prediction(exmp_batch[i], label_batch[i], adv_preds0[i], adv_img=gus_imgs0[i], noise=noise_grad[i])
            file.write(f"spgus{rad},{timeTaken},{output[0]},{output[1]}\n")
            output = show_prediction(exmp_batch[i], label_batch[i], adv_preds1[i], adv_img=gus_imgs1[i], noise=noise_grad[i])
            file.write(f"mpgus{rad},{timeTaken},{output[0]},{output[1]}\n")
    # --- MCMC-denoising defense against all three attacks ---
    print("Mont defended Model with adv attack")
    for beta in betas:
        adv_imgs, noise_grad = fast_gradient_sign_method(pretrained_model, exmp_batch, label_batch, epsilon=0.02)
        patch_batch = perform_patch_attack(patch_dict['goldfish'][32]['patch'])
        multi_patch_batch = perform_multi_patch_attack(patch_dict['goldfish'][32]['patch'])
        # NOTE(review): aliases again — see the median section above.
        mont_imgs = adv_imgs
        mont_imgs0 = patch_batch
        mont_imgs1 = multi_patch_batch
        timeTaken = 0
        start = time.time()
        for i in range(0, len(mont_imgs)):
            mont_imgs[i] = monte_denoise(mont_imgs[i], beta)
            mont_imgs0[i] = monte_denoise(mont_imgs0[i], beta)
            mont_imgs1[i] = monte_denoise(mont_imgs1[i], beta)
        timeTaken = (time.time() - start) / 3
        with torch.no_grad():
            adv_preds = pretrained_model(mont_imgs.to(device))
            adv_preds0 = pretrained_model(mont_imgs0.to(device))
            adv_preds1 = pretrained_model(mont_imgs1.to(device))
        for i in range(1,17,5):
            output = show_prediction(exmp_batch[i], label_batch[i], adv_preds[i], adv_img=mont_imgs[i], noise=noise_grad[i])
            file.write(f"tamont{beta},{timeTaken},{output[0]},{output[1]}\n")
            output = show_prediction(exmp_batch[i], label_batch[i], adv_preds0[i], adv_img=mont_imgs0[i], noise=noise_grad[i])
            file.write(f"spmont{beta},{timeTaken},{output[0]},{output[1]}\n")
            output = show_prediction(exmp_batch[i], label_batch[i], adv_preds1[i], adv_img=mont_imgs1[i], noise=noise_grad[i])
            file.write(f"mpmont{beta},{timeTaken},{output[0]},{output[1]}\n")
    # --- APE-GAN defense against all three attacks (no hyper-parameter grid) ---
    print("GAN defended Model with adv attack")
    adv_imgs, noise_grad = fast_gradient_sign_method(pretrained_model, exmp_batch, label_batch, epsilon=0.02)
    patch_batch = perform_patch_attack(patch_dict['goldfish'][32]['patch'])
    multi_patch_batch = perform_multi_patch_attack(patch_dict['goldfish'][32]['patch'])
    # NOTE(review): aliases again — see the median section above.
    gan_imgs= adv_imgs
    gan_imgs0=patch_batch
    gan_imgs1= multi_patch_batch
    timeTaken = 0
    start = time.time()
    for i in range(0, len(gan_imgs)):
        # NOTE(review): defend_image_with_gan returns a batched (unsqueezed)
        # tensor assigned into a single-image slot — relies on broadcasting;
        # verify the shapes produced by Generator.
        gan_imgs[i] = defend_image_with_gan(gan_imgs[i])
        gan_imgs0[i] = defend_image_with_gan(gan_imgs0[i])
        gan_imgs1[i] = defend_image_with_gan(gan_imgs1[i])
    timeTaken = (time.time() - start) / 3
    with torch.no_grad():
        adv_preds = pretrained_model(gan_imgs.to(device))
        adv_preds0 = pretrained_model(gan_imgs0.to(device))
        adv_preds1 = pretrained_model(gan_imgs1.to(device))
    for i in range(1,17,5):
        output = show_prediction(exmp_batch[i], label_batch[i], adv_preds[i], adv_img=gan_imgs[i].detach(), noise=noise_grad[i])
        file.write(f"tagan,{timeTaken},{output[0]},{output[1]}\n")
        output = show_prediction(exmp_batch[i], label_batch[i], adv_preds0[i], adv_img=gan_imgs0[i].detach(), noise=noise_grad[i])
        file.write(f"spgan,{timeTaken},{output[0]},{output[1]}\n")
        output = show_prediction(exmp_batch[i], label_batch[i], adv_preds1[i], adv_img=gan_imgs1[i].detach(), noise=noise_grad[i])
        file.write(f"mpgan,{timeTaken},{output[0]},{output[1]}\n")
    file.close()
    print("DONE")
    # Presumably keeps the process (e.g. a remote session) alive after
    # finishing — TODO confirm whether this sleep is still needed.
    time.sleep(600)