import numpy as np
from flwr.common import (
    NDArray, NDArrays, Parameters, FitRes, MetricsAggregationFn, Scalar)
from io import BytesIO
from typing import Callable, Dict, List, Optional, Tuple, Union
import pickle
import pandas as pd
from strategy.customfedavg import CustomFedAvg
from strategy.fedensemble import FedEnsemble
from imblearn.over_sampling import SMOTE
def get_model_type(model) -> str:
    """Return the class name of a model instance (e.g. "LogisticRegression")."""
    return model.__class__.__name__


def string_to_integers(string):
    """Encode a string as a list of Unicode code points so it fits into an ndarray payload."""
    return [ord(char) for char in string]


def integers_to_string(integers):
    """Decode a list of Unicode code points back into the original string."""
    return ''.join([chr(int(integer)) for integer in integers])
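
# Illustrative round trip for the codec above (a sketch, not executed at import
# time): attribute names are shipped as integer lists alongside their values so
# that everything travels inside Flower's NDArrays payload.
#   string_to_integers("coef_")                   -> [99, 111, 101, 102, 95]
#   integers_to_string([99, 111, 101, 102, 95])   -> "coef_"
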
# Short names of the model types this module supports.
supported_models = ["lr", "rf", "xgboost", "dt"]

# Keyword arguments intended for PyCaret's setup() call on each client.
setup_params = {
    "verbose": True,
    "session_id": 42,
    "transformation": False,
    "html": False,
    "ignore_features": ["date", "Unnamed: 0"],
    # "fix_imbalance": True,
    # "fix_imbalance_method": "smote",
}

# Keyword arguments intended for PyCaret's create_model() call.
create_model_params = {
    "cross_validation": False,
    "verbose": False,
}
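
# A minimal sketch of how the dicts above would typically be consumed on a
# client, assuming PyCaret's classification module (the target column name
# "Outcome" is hypothetical):
#
#   from pycaret.classification import setup, create_model
#   setup(data=df, target="Outcome", **setup_params)
#   model = create_model("lr", **create_model_params)
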
def get_model_parameters(model) -> NDArrays:
    """Return the learned attributes of a fitted model as a flat list of
    alternating (encoded attribute name, attribute value) entries."""
    tp = get_model_type(model)
    attrs = []
    blacklist = ["feature_names_in_"]
# if tp == "LogisticRegression" or tp == 'RidgeClassifier' or "LinearRegression":
# attrs = ["coef_", "intercept_", "classes_"]
# if tp == "RandomForestClassifier":
# attrs = ["n_estimators", "max_depth",
# "min_samples_split", "min_samples_leaf"]
# if tp == "XGBClassifier":
# attrs = ["n_estimators", "max_depth",
# "learning_rate", "subsample", "colsample_bytree", "subsample", "gamma"]
# if attrs == []:
# raise Exception("Model type not supported")
    # Collect sklearn-style fitted attributes (trailing underscore) and private
    # attributes (leading underscore), except those on the blacklist.
    attrs = [v for v in vars(model)
             if v.endswith("_") or v.startswith("_")]
    attrs = [i for i in attrs if i not in blacklist]
    params = []
    for attr in attrs:
        if hasattr(model, attr):
            k = string_to_integers(attr)
            v = getattr(model, attr)
            if v is None:
                continue
            params.append(k)
            params.append(v)
    # print("Params: ", params)
    return params
def set_model_params(model, params: NDArrays):
    """Set model attributes from the flat (encoded name, value) list produced by
    get_model_parameters."""
    for i in range(0, len(params), 2):
        k, v = params[i], params[i + 1]
        k = integers_to_string(k)
        setattr(model, k, v)
    return model
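
# Sketch of the intended round trip between the two helpers above (hypothetical
# usage, not executed here; X and y are placeholders):
#
#   from sklearn.linear_model import LogisticRegression
#   src = LogisticRegression().fit(X, y)
#   dst = set_model_params(LogisticRegression(), get_model_parameters(src))
#   # dst now carries src's coef_, intercept_, classes_, ...
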
def set_initial_params(model):
    """Set placeholder (all-zero) parameters on an unfitted model.

    Required because sklearn parameters are uninitialized until model.fit is
    called, but the Flower server asks clients for initial parameters at launch.
    See the sklearn.linear_model.LogisticRegression documentation for details.
    """
    n_classes = 2   # binary classification target
    n_features = 8  # number of features in the dataset
    model.classes_ = np.array([i for i in range(n_classes)])
    model.coef_ = np.zeros((n_classes, n_features))
    if model.fit_intercept:
        model.intercept_ = np.zeros((n_classes,))
    return model
    # NOTE: the defaults below are unreachable because of the return above; they
    # only apply to tree-based models and are kept here for reference.
    # model.n_estimators = 100
    # model.max_depth = 40
    # model.min_samples_split = 2
    # model.min_samples_leaf = 1
    # return model
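
# Server-side flow this enables (a sketch, assuming a LogisticRegression and the
# 8-feature binary-classification dataset hard-coded above):
#
#   model = LogisticRegression()
#   set_initial_params(model)
#   initial_parameters = ndarrays_to_custom_parameters(get_model_parameters(model))
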
def get_metrics(df, problem_type="classification") -> Dict:
    """Return the metrics from the first row of a results dataframe
    (e.g. PyCaret's scoring grid), without the model name column."""
    results = df.iloc[0].to_dict()
    results.pop('Model', None)
    return results
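
# Hypothetical usage with PyCaret, where pull() returns the scoring grid of the
# last evaluated model as a dataframe:
#
#   from pycaret.classification import predict_model, pull
#   predict_model(model, data=holdout)
#   metrics = get_metrics(pull())   # e.g. {"Accuracy": 0.91, "Log Loss": 0.23, ...}
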
def ndarrays_to_custom_parameters(ndarrays: NDArrays) -> Parameters:
"""Convert NumPy ndarrays to parameters object."""
tensors = [ndarray_to_bytes(ndarray) for ndarray in ndarrays]
return Parameters(tensors=tensors, tensor_type="numpy.ndarray")
def custom_parameters_to_ndarrays(parameters: Parameters) -> NDArrays:
"""Convert parameters object to NumPy ndarrays."""
return [bytes_to_ndarray(tensor) for tensor in parameters.tensors]
def ndarray_to_bytes(ndarray: NDArray) -> bytes:
    """Serialize a NumPy ndarray (or any picklable object) to bytes."""
    # An np.save-based implementation is kept below for reference. Note that
    # pickle.dumps/loads (used here) carries the same arbitrary-code-execution
    # risk as allow_pickle=True, so only deserialize payloads from trusted peers.
    # Source: https://numpy.org/doc/stable/reference/generated/numpy.save.html
    # bytes_io = BytesIO()
    # np.save(bytes_io, ndarray, allow_pickle=True)
    # return bytes_io.getvalue()
    return pickle.dumps(ndarray)


def bytes_to_ndarray(tensor: bytes) -> NDArray:
    """Deserialize a NumPy ndarray (or any pickled object) from bytes."""
    # Source: https://numpy.org/doc/stable/reference/generated/numpy.load.html
    # bytes_io = BytesIO(tensor)
    # ndarray_deserialized = np.load(bytes_io, allow_pickle=True)
    # return cast(NDArray, ndarray_deserialized)
    return pickle.loads(tensor)
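
# Round-trip sketch for the (de)serialization helpers above (illustrative only):
#
#   ndarrays = get_model_parameters(model)
#   parameters = ndarrays_to_custom_parameters(ndarrays)   # Flower Parameters
#   restored = custom_parameters_to_ndarrays(parameters)   # back to NDArrays
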
def data_partitioning(df, N_CLIENTS, partition_id):
    """Split the dataframe into N_CLIENTS contiguous chunks and return the chunk
    at partition_id (any remainder rows fall into an extra trailing chunk)."""
    split_size = len(df) // N_CLIENTS
    df = [df.iloc[i:i + split_size].reset_index(drop=True)
          for i in range(0, len(df), split_size)][partition_id]
    return df
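
# Example (hypothetical): with three clients, client 1 receives the middle third
# of the rows.
#
#   df_client = data_partitioning(df, N_CLIENTS=3, partition_id=1)
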
def get_target_column_name(df, target=-1):
    """Resolve a target given either as a positional index (e.g. -1) or as a
    column name, and return the column name."""
    target = str(target)
    if target.lstrip('-+').isnumeric():
        return df.columns[int(target)]
    return target
def get_smote_data(_real_data, _target_column):
    """Rebalance the data with SMOTE and return the resampled dataframe
    (original rows plus synthetic minority-class samples)."""
    sm = SMOTE(random_state=42)
    X = _real_data.drop(_target_column, axis=1)
    y = _real_data[_target_column]
    X_res, y_res = sm.fit_resample(X, y)
    synthetic_data = pd.concat([X_res, y_res], axis=1)
    return synthetic_data
def stratified_partition_with_all_values(df, n_partitions, partition_id, target=-1):
    """Split df into n_partitions stratified chunks (every chunk sees every target
    value) and return the chunk at partition_id after SMOTE rebalancing."""
    # Group the data by the target column
    target = get_target_column_name(df, target)
    grouped = df.groupby(target)
    # Initialize empty list to store partitions
    partitions = [[] for _ in range(n_partitions)]
# Iterate over groups
for name, group in grouped:
# Randomly shuffle the data within the group
# group = group.sample(frac=1).reset_index(drop=True)
# Calculate the number of samples in each partition for this group
samples_per_partition = len(group) // n_partitions
# Distribute the data evenly among partitions, ensuring each partition has all values
for i in range(n_partitions):
start_idx = i * samples_per_partition
end_idx = (i + 1) * samples_per_partition
if i == n_partitions - 1:
end_idx = None # Include remaining samples in the last partition
partition_data = group.iloc[start_idx:end_idx]
partitions[i].append(partition_data)
# Concatenate data frames in each partition
partitions = [pd.concat(partition) for partition in partitions]
# reset index
df = partitions[partition_id].reset_index(drop=True)
# get smote
smote_data = get_smote_data(df, target)
return smote_data
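
# Hypothetical usage: each Flower client keeps one stratified, SMOTE-balanced
# shard of the shared dataset.
#
#   df_client = stratified_partition_with_all_values(
#       df, n_partitions=N_CLIENTS, partition_id=partition_id, target=-1)
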
def load_data(path: str, encode_target=True, target_column=-1):
    """Load a CSV into a dataframe, optionally label-encoding the target column."""
    # from pycaret.datasets import get_data
    # return get_data('diabetes')
# df = pd.read_csv('../../data/aw_fb/aw_fb_data.csv')
# df = df.drop(['Unnamed: 0', 'X1'], axis=1)
# df_aw = df[df['device'] == 'apple watch']
# df_fb = df[df['device'] == 'fitbit']
# df_aw = df_aw.drop('device', axis=1)
# df_fb = df_fb.drop('device', axis=1)
# df = df_aw
df = pd.read_csv(path)
if encode_target:
from sklearn.preprocessing import LabelEncoder
le = LabelEncoder()
# encode target column
target = get_target_column_name(df, target_column)
df[target] = le.fit_transform(df[target])
return df
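
# Sketch of the expected call on a client (the path is a placeholder):
#
#   df = load_data("data/diabetes.csv", encode_target=True, target_column=-1)
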
def get_strategy(
    model_name: str,
    num_rounds: int,
    exp_id: str,
    problem_type: str,
    fraction_fit: float = 1.0,
    fraction_evaluate: float = 1.0,
    min_fit_clients: int = 2,
    min_evaluate_clients: int = 2,
    min_available_clients: int = 2,
    evaluate_fn: Optional[
        Callable[
            [int, NDArrays, Dict[str, Scalar]],
            Optional[Tuple[float, Dict[str, Scalar]]],
        ]
    ] = None,
    on_fit_config_fn: Optional[Callable[[int], Dict[str, Scalar]]] = None,
    on_evaluate_config_fn: Optional[Callable[[int], Dict[str, Scalar]]] = None,
    accept_failures: bool = True,
    initial_parameters: Optional[Parameters] = None,
    fit_metrics_aggregation_fn: Optional[MetricsAggregationFn] = None,
    evaluate_metrics_aggregation_fn: Optional[MetricsAggregationFn] = None,
):
    """Return CustomFedAvg for coefficient-based models (e.g. "lr") and
    FedEnsemble for every other supported model."""
    coef_models = ['lr']
    if model_name in coef_models:
return CustomFedAvg(
num_rounds=num_rounds,
exp_id=exp_id,
fraction_fit=fraction_fit,
fraction_evaluate=fraction_evaluate,
min_fit_clients=min_fit_clients,
min_evaluate_clients=min_evaluate_clients,
min_available_clients=min_available_clients,
evaluate_fn=evaluate_fn,
on_fit_config_fn=on_fit_config_fn,
on_evaluate_config_fn=on_evaluate_config_fn,
accept_failures=accept_failures,
initial_parameters=initial_parameters,
fit_metrics_aggregation_fn=fit_metrics_aggregation_fn,
evaluate_metrics_aggregation_fn=evaluate_metrics_aggregation_fn,
)
else:
return FedEnsemble(
num_rounds=num_rounds,
exp_id=exp_id,
fraction_fit=fraction_fit,
fraction_evaluate=fraction_evaluate,
min_fit_clients=min_fit_clients,
min_evaluate_clients=min_evaluate_clients,
min_available_clients=min_available_clients,
evaluate_fn=evaluate_fn,
on_fit_config_fn=on_fit_config_fn,
on_evaluate_config_fn=on_evaluate_config_fn,
accept_failures=accept_failures,
initial_parameters=initial_parameters,
fit_metrics_aggregation_fn=fit_metrics_aggregation_fn,
evaluate_metrics_aggregation_fn=evaluate_metrics_aggregation_fn,
problem_type=problem_type
)
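
# Sketch of wiring the strategy into a Flower server (the address and the
# weighted_average callback are placeholders; the exact flwr API may differ
# slightly between versions):
#
#   import flwr as fl
#   strategy = get_strategy("lr", num_rounds=3, exp_id="exp-1",
#                           problem_type="classification",
#                           evaluate_metrics_aggregation_fn=weighted_average)
#   fl.server.start_server(server_address="0.0.0.0:8080",
#                          config=fl.server.ServerConfig(num_rounds=3),
#                          strategy=strategy)
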
def get_loss(problem_type: str, metrics):
    """Pick the loss reported to Flower: Log Loss for classification, MAE for regression."""
    if problem_type == 'classification':
        loss = float(metrics['Log Loss'])
    elif problem_type == 'regression':
        loss = float(metrics['MAE'])
    else:
        raise ValueError(f"Unsupported problem type: {problem_type}")
    return loss
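
# Example (illustrative metric values):
#
#   get_loss("classification", {"Log Loss": 0.42, "Accuracy": 0.88})  # -> 0.42
#   get_loss("regression", {"MAE": 3.1, "R2": 0.76})                  # -> 3.1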