import numpy as np from flwr.common import ( NDArray, NDArrays, Parameters, FitRes, MetricsAggregationFn, NDArrays, Scalar) from flwr.common.typing import NDArray, NDArrays, Parameters from io import BytesIO from typing import Callable, Dict, List, Optional, Tuple, Union import pickle import pandas as pd from strategy.customfedavg import CustomFedAvg from strategy.fedensemble import FedEnsemble from imblearn.over_sampling import SMOTE def get_model_type(model) -> str: return model.__class__.__name__ def string_to_integers(string): return [ord(char) for char in string] def integers_to_string(integers): return ''.join([chr(int(integer)) for integer in integers]) supported_models = ["lr", "rf", "xgboost", "dt"] setup_params = { "verbose": True, "session_id": 42, "transformation": False, "html": False, "ignore_features": ["date", "Unnamed: 0"], # "fix_imbalance": True, # "fix_imbalance_method": "smote", } create_model_params = { "cross_validation": False, "verbose": False, } def get_model_parameters(model) -> NDArrays: """Returns the parameters of a sklearn model.""" tp = get_model_type(model) attrs = [] blacklist = ["feature_names_in_"] # if tp == "LogisticRegression" or tp == 'RidgeClassifier' or "LinearRegression": # attrs = ["coef_", "intercept_", "classes_"] # if tp == "RandomForestClassifier": # attrs = ["n_estimators", "max_depth", # "min_samples_split", "min_samples_leaf"] # if tp == "XGBClassifier": # attrs = ["n_estimators", "max_depth", # "learning_rate", "subsample", "colsample_bytree", "subsample", "gamma"] # if attrs == []: # raise Exception("Model type not supported") attrs = [v for v in vars(model) if v.endswith("_") or v.startswith("_")] attrs = [i for i in attrs if i not in blacklist] params = [] for attr in attrs: if hasattr(model, attr): k = string_to_integers(attr) v = getattr(model, attr) if v is None: continue params.append(k) params.append(v) # print("Params: ", params) return params def set_model_params(model, params: NDArrays): """Sets the parameters of a sklearn model.""" for i in range(0, len(params), 2): k, v = params[i], params[i+1] k = integers_to_string(k) setattr(model, k, v) return model def set_initial_params(model): """Sets initial parameters as zeros Required since model params are uninitialized until model.fit is called. But server asks for initial parameters from clients at launch. Refer to sklearn.linear_model.LogisticRegression documentation for more information. """ n_classes = 2 # MNIST has 10 classes n_features = 8 # Number of features in dataset model.classes_ = np.array([i for i in range(n_classes)]) model.coef_ = np.zeros((n_classes, n_features)) if model.fit_intercept: model.intercept_ = np.zeros((n_classes,)) return model model.n_estimators = 100 model.max_depth = 40, model.min_samples_split = 2, model.min_samples_leaf = 1, return model def get_metrics(df, problem_type="classification") -> Dict: # take from the first row results = df.iloc[0].to_dict() results.pop('Model', None) return results def ndarrays_to_custom_parameters(ndarrays: NDArrays) -> Parameters: """Convert NumPy ndarrays to parameters object.""" tensors = [ndarray_to_bytes(ndarray) for ndarray in ndarrays] return Parameters(tensors=tensors, tensor_type="numpy.ndarray") def custom_parameters_to_ndarrays(parameters: Parameters) -> NDArrays: """Convert parameters object to NumPy ndarrays.""" return [bytes_to_ndarray(tensor) for tensor in parameters.tensors] def ndarray_to_bytes(ndarray: NDArray) -> bytes: """Serialize NumPy ndarray to bytes.""" # bytes_io = BytesIO() # WARNING: NEVER set allow_pickle to true. # Reason: loading pickled data can execute arbitrary code # Source: https://numpy.org/doc/stable/reference/generated/numpy.save.html # np.save(bytes_io, ndarray, allow_pickle=True) # return bytes_io.getvalue() return pickle.dumps(ndarray) def bytes_to_ndarray(tensor: bytes) -> NDArray: """Deserialize NumPy ndarray from bytes.""" # bytes_io = BytesIO(tensor) # WARNING: NEVER set allow_pickle to true. # Reason: loading pickled data can execute arbitrary code # Source: https://numpy.org/doc/stable/reference/generated/numpy.load.html # ndarray_deserialized = np.load(bytes_io, allow_pickle=True) # return cast(NDArray, ndarray_deserialized) return pickle.loads(tensor) def data_partitioning(df, N_CLIENTS, partition_id): # group the data by the last column and return df split_size = len(df) // N_CLIENTS df = [df.iloc[i:i+split_size].reset_index(drop=True) for i in range(0, len(df), split_size)][partition_id] return df def get_target_column_name(df, target=-1): if target.lstrip('-+').isnumeric(): return df.columns[int(target)] else: return target def get_smote_data(_real_data, _target_column): sm = SMOTE(random_state=42) X = _real_data.drop(_target_column, axis=1) y = _real_data[_target_column] X_res, y_res = sm.fit_resample(X, y) synthetic_data = pd.concat([X_res, y_res], axis=1) return synthetic_data def stratified_partition_with_all_values(df, n_partitions, partition_id, target=-1): # Group the data by the column of interest target = get_target_column_name(df, target) grouped = df.groupby(target) # Initialize empty list to store partitions partitions = [[] for _ in range(n_partitions)] # Iterate over groups for name, group in grouped: # Randomly shuffle the data within the group # group = group.sample(frac=1).reset_index(drop=True) # Calculate the number of samples in each partition for this group samples_per_partition = len(group) // n_partitions # Distribute the data evenly among partitions, ensuring each partition has all values for i in range(n_partitions): start_idx = i * samples_per_partition end_idx = (i + 1) * samples_per_partition if i == n_partitions - 1: end_idx = None # Include remaining samples in the last partition partition_data = group.iloc[start_idx:end_idx] partitions[i].append(partition_data) # Concatenate data frames in each partition partitions = [pd.concat(partition) for partition in partitions] # reset index df = partitions[partition_id].reset_index(drop=True) # get smote smote_data = get_smote_data(df, target) return smote_data def load_data(path: str, encode_target=True, target_column=-1): # Load data # from pycaret.datasets import get_data # return get_data('diabetes') import pandas as pd # df = pd.read_csv('../../data/aw_fb/aw_fb_data.csv') # df = df.drop(['Unnamed: 0', 'X1'], axis=1) # df_aw = df[df['device'] == 'apple watch'] # df_fb = df[df['device'] == 'fitbit'] # df_aw = df_aw.drop('device', axis=1) # df_fb = df_fb.drop('device', axis=1) # df = df_aw df = pd.read_csv(path) if encode_target: from sklearn.preprocessing import LabelEncoder le = LabelEncoder() # encode target column target = get_target_column_name(df, target_column) df[target] = le.fit_transform(df[target]) return df def get_strategy(model_name: str, num_rounds: int, exp_id: str, problem_type: str, fraction_fit: float = 1.0, fraction_evaluate: float = 1.0, min_fit_clients: int = 2, min_evaluate_clients: int = 2, min_available_clients: int = 2, evaluate_fn: Optional[ Callable[ [int, NDArrays, Dict[str, Scalar]], Optional[Tuple[float, Dict[str, Scalar]]], ] ] = None, on_fit_config_fn: Optional[Callable[[int], Dict[str, Scalar]]] = None, on_evaluate_config_fn: Optional[Callable[[ int], Dict[str, Scalar]]] = None, accept_failures: bool = True, initial_parameters: Optional[Parameters] = None, fit_metrics_aggregation_fn: Optional[MetricsAggregationFn] = None, evaluate_metrics_aggregation_fn: Optional[MetricsAggregationFn] = None,): coef_models = ['lr'] if model_name in coef_models: return CustomFedAvg( num_rounds=num_rounds, exp_id=exp_id, fraction_fit=fraction_fit, fraction_evaluate=fraction_evaluate, min_fit_clients=min_fit_clients, min_evaluate_clients=min_evaluate_clients, min_available_clients=min_available_clients, evaluate_fn=evaluate_fn, on_fit_config_fn=on_fit_config_fn, on_evaluate_config_fn=on_evaluate_config_fn, accept_failures=accept_failures, initial_parameters=initial_parameters, fit_metrics_aggregation_fn=fit_metrics_aggregation_fn, evaluate_metrics_aggregation_fn=evaluate_metrics_aggregation_fn, ) else: return FedEnsemble( num_rounds=num_rounds, exp_id=exp_id, fraction_fit=fraction_fit, fraction_evaluate=fraction_evaluate, min_fit_clients=min_fit_clients, min_evaluate_clients=min_evaluate_clients, min_available_clients=min_available_clients, evaluate_fn=evaluate_fn, on_fit_config_fn=on_fit_config_fn, on_evaluate_config_fn=on_evaluate_config_fn, accept_failures=accept_failures, initial_parameters=initial_parameters, fit_metrics_aggregation_fn=fit_metrics_aggregation_fn, evaluate_metrics_aggregation_fn=evaluate_metrics_aggregation_fn, problem_type=problem_type ) def get_loss(problem_type: str, metrics): if (problem_type == 'classification'): loss = float(metrics['Log Loss']) elif (problem_type == 'regression'): loss = float(metrics['MAE']) return loss