# auto-fl-fit / pycaret-fl / utils.py
import numpy as np

from flwr.common import (
    NDArray, NDArrays, Parameters, FitRes, MetricsAggregationFn, Scalar)

from io import BytesIO
from typing import Callable, Dict, List, Optional, Tuple, Union

import pickle
import pandas as pd

from strategy.customfedavg import CustomFedAvg
from strategy.fedensemble import FedEnsemble

from imblearn.over_sampling import SMOTE

def get_model_type(model) -> str:
    """Return the class name of a model, e.g. "LogisticRegression"."""
    return model.__class__.__name__


def string_to_integers(string):
    """Encode a string as a list of Unicode code points."""
    return [ord(char) for char in string]


def integers_to_string(integers):
    """Decode a list of Unicode code points back into a string."""
    return ''.join([chr(int(integer)) for integer in integers])
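
# Illustrative round trip (values computed from the helpers above): attribute
# names travel as integer code-point arrays so they fit inside Flower's
# NDArrays payload.
#   string_to_integers("coef_")                 -> [99, 111, 101, 102, 95]
#   integers_to_string([99, 111, 101, 102, 95]) -> "coef_"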


supported_models = ["lr", "rf", "xgboost", "dt"]
setup_params = {
    "verbose": True,
    "session_id": 42,
    "transformation": False,
    "html": False,
    "ignore_features": ["date", "Unnamed: 0"],
    # "fix_imbalance": True,
    # "fix_imbalance_method": "smote",
}
create_model_params = {
    "cross_validation": False,
    "verbose": False,
}
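
# Usage sketch (assumes PyCaret's classification module; `df` and "target"
# are placeholders for the caller's data): the two dicts above are meant to
# be splatted into setup() and create_model(), e.g.
#   from pycaret.classification import setup, create_model
#   setup(data=df, target="target", **setup_params)
#   model = create_model("lr", **create_model_params)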

def get_model_parameters(model) -> NDArrays:
    """Return a fitted sklearn model's attributes as a flat list of
    [encoded-attribute-name, value] pairs for Flower transport."""
    tp = get_model_type(model)
    attrs = []
    blacklist = ["feature_names_in_"]
    # if tp == "LogisticRegression" or tp == 'RidgeClassifier' or "LinearRegression":
    #     attrs = ["coef_", "intercept_", "classes_"]
    # if tp == "RandomForestClassifier":
    #     attrs = ["n_estimators", "max_depth",
    #              "min_samples_split", "min_samples_leaf"]
    # if tp == "XGBClassifier":
    #     attrs = ["n_estimators", "max_depth",
    #              "learning_rate", "subsample", "colsample_bytree", "subsample", "gamma"]
    # if attrs == []:
    #     raise Exception("Model type not supported")

    attrs = [v for v in vars(model)
             if v.endswith("_") or v.startswith("_")]
    attrs = [i for i in attrs if i not in blacklist]

    params = []
    for attr in attrs:
        if hasattr(model, attr):
            k = string_to_integers(attr)
            v = getattr(model, attr)
            if v is None:
                continue
            params.append(k)
            params.append(v)

    # print("Params: ", params)
    return params


def set_model_params(model, params: NDArrays):
    """Set model attributes from the flat [encoded-name, value, ...] list
    produced by get_model_parameters."""
    for i in range(0, len(params), 2):
        k, v = params[i], params[i + 1]
        k = integers_to_string(k)
        setattr(model, k, v)
    return model
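
# Round-trip sketch (illustrative; X and y stand for a caller's training
# data): get_model_parameters and set_model_params are inverses over the
# flat [encoded_name, value, ...] layout.
#   from sklearn.linear_model import LogisticRegression
#   src = LogisticRegression().fit(X, y)
#   dst = set_model_params(LogisticRegression(), get_model_parameters(src))
#   # dst now carries src's fitted attributes such as coef_ and intercept_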


def set_initial_params(model):
    """Set initial parameters as zeros.

    Required because sklearn model params are uninitialized until model.fit
    is called, but the server asks clients for initial parameters at launch.
    Refer to the sklearn.linear_model.LogisticRegression documentation for
    more information.
    """
    n_classes = 2   # binary classification target
    n_features = 8  # number of features in the dataset
    model.classes_ = np.array([i for i in range(n_classes)])
    model.coef_ = np.zeros((n_classes, n_features))
    if model.fit_intercept:
        model.intercept_ = np.zeros((n_classes,))
    return model
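
# Usage sketch (assumes scikit-learn): gives an unfitted LogisticRegression
# zero-valued coef_/intercept_ so that get_model_parameters has something to
# send before the first local fit.
#   from sklearn.linear_model import LogisticRegression
#   model = set_initial_params(LogisticRegression())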


def get_metrics(df, problem_type="classification") -> Dict:
    """Return the first row of a PyCaret results dataframe as a metrics dict."""
    # take metrics from the first row and drop the model name
    results = df.iloc[0].to_dict()
    results.pop('Model', None)
    return results


def ndarrays_to_custom_parameters(ndarrays: NDArrays) -> Parameters:
    """Convert NumPy ndarrays to parameters object."""
    tensors = [ndarray_to_bytes(ndarray) for ndarray in ndarrays]
    return Parameters(tensors=tensors, tensor_type="numpy.ndarray")


def custom_parameters_to_ndarrays(parameters: Parameters) -> NDArrays:
    """Convert parameters object to NumPy ndarrays."""
    return [bytes_to_ndarray(tensor) for tensor in parameters.tensors]


def ndarray_to_bytes(ndarray: NDArray) -> bytes:
    """Serialize a NumPy ndarray (or other model attribute value) to bytes."""
    # bytes_io = BytesIO()
    # WARNING: NEVER set allow_pickle to true.
    # Reason: loading pickled data can execute arbitrary code
    # Source: https://numpy.org/doc/stable/reference/generated/numpy.save.html
    # np.save(bytes_io, ndarray, allow_pickle=False)
    # return bytes_io.getvalue()
    # NOTE: pickle is used instead so that non-array attribute values survive,
    # but it carries the very risk warned about above; only use it between
    # trusted clients and server.
    return pickle.dumps(ndarray)


def bytes_to_ndarray(tensor: bytes) -> NDArray:
    """Deserialize a value previously serialized by ndarray_to_bytes."""
    # bytes_io = BytesIO(tensor)
    # WARNING: NEVER set allow_pickle to true.
    # Reason: loading pickled data can execute arbitrary code
    # Source: https://numpy.org/doc/stable/reference/generated/numpy.load.html
    # ndarray_deserialized = np.load(bytes_io, allow_pickle=False)
    # return ndarray_deserialized
    # NOTE: same caveat as above; unpickle only payloads from trusted peers.
    return pickle.loads(tensor)
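
# Serialization round trip (illustrative; relies only on NumPy and the
# helpers above):
#   params = ndarrays_to_custom_parameters([np.arange(3)])
#   arrays = custom_parameters_to_ndarrays(params)
#   assert np.array_equal(arrays[0], np.arange(3))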


def data_partitioning(df, N_CLIENTS, partition_id):
    """Split the dataframe into N_CLIENTS contiguous chunks and return the
    chunk for this partition_id."""
    split_size = len(df) // N_CLIENTS
    df = [df.iloc[i:i+split_size].reset_index(drop=True)
          for i in range(0, len(df), split_size)][partition_id]
    return df
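
# Example (illustrative numbers): with 9 rows and N_CLIENTS=3, split_size is 3
# and partition_id 0/1/2 returns rows 0-2, 3-5, and 6-8 respectively; any
# remainder rows end up in an extra trailing chunk that no client selects.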


def get_target_column_name(df, target=-1):
    """Resolve the target column name from either a column name or a
    (possibly negative) column index."""
    if isinstance(target, int) or target.lstrip('-+').isnumeric():
        return df.columns[int(target)]
    else:
        return target


def get_smote_data(_real_data, _target_column):
    """Balance the target classes with SMOTE and return the resampled
    dataframe (features plus target)."""
    sm = SMOTE(random_state=42)

    X = _real_data.drop(_target_column, axis=1)
    y = _real_data[_target_column]

    X_res, y_res = sm.fit_resample(X, y)

    synthetic_data = pd.concat([X_res, y_res], axis=1)

    return synthetic_data
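
# Usage sketch (`df` and "target" are placeholders for a client's local
# dataframe and its label column): balance the classes before local training.
#   balanced_df = get_smote_data(df, "target")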

def stratified_partition_with_all_values(df, n_partitions, partition_id, target=-1):
    """Partition df so that every partition contains samples of every target
    value, then SMOTE-balance the selected partition."""
    # Group the data by the target column
    target = get_target_column_name(df, target)
    grouped = df.groupby(target)

    # Initialize empty list to store partitions
    partitions = [[] for _ in range(n_partitions)]

    # Iterate over groups
    for name, group in grouped:
        # Randomly shuffle the data within the group
        # group = group.sample(frac=1).reset_index(drop=True)

        # Calculate the number of samples in each partition for this group
        samples_per_partition = len(group) // n_partitions

        # Distribute the data evenly among partitions, ensuring each partition has all values
        for i in range(n_partitions):
            start_idx = i * samples_per_partition
            end_idx = (i + 1) * samples_per_partition
            if i == n_partitions - 1:
                end_idx = None  # Include remaining samples in the last partition
            partition_data = group.iloc[start_idx:end_idx]
            partitions[i].append(partition_data)

    # Concatenate data frames in each partition
    partitions = [pd.concat(partition) for partition in partitions]

    # reset index
    df = partitions[partition_id].reset_index(drop=True)

    # get smote
    smote_data = get_smote_data(df, target)

    return smote_data
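
# Usage sketch (illustrative arguments): give client 0 of 3 a stratified,
# SMOTE-balanced slice of the full dataset.
#   client_df = stratified_partition_with_all_values(df, n_partitions=3,
#                                                    partition_id=0)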


def load_data(path: str, encode_target=True, target_column=-1):
    """Load a CSV dataset and optionally label-encode the target column."""
    # from pycaret.datasets import get_data
    # return get_data('diabetes')
    # df = pd.read_csv('../../data/aw_fb/aw_fb_data.csv')
    # df = df.drop(['Unnamed: 0', 'X1'], axis=1)
    # df_aw = df[df['device'] == 'apple watch']
    # df_fb = df[df['device'] == 'fitbit']
    # df_aw = df_aw.drop('device', axis=1)
    # df_fb = df_fb.drop('device', axis=1)
    # df = df_aw
    df = pd.read_csv(path)
    if encode_target:
        from sklearn.preprocessing import LabelEncoder
        le = LabelEncoder()
        # encode target column
        target = get_target_column_name(df, target_column)
        df[target] = le.fit_transform(df[target])
    return df
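
# Usage sketch ("data.csv" is a placeholder path): load a CSV and
# label-encode its last column as the classification target.
#   df = load_data("data.csv", encode_target=True, target_column=-1)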


def get_strategy(model_name: str,
                 num_rounds: int,
                 exp_id: str,
                 problem_type: str,
                 fraction_fit: float = 1.0,
                 fraction_evaluate: float = 1.0,
                 min_fit_clients: int = 2,
                 min_evaluate_clients: int = 2,
                 min_available_clients: int = 2,
                 evaluate_fn: Optional[
                     Callable[
                         [int, NDArrays, Dict[str, Scalar]],
                         Optional[Tuple[float, Dict[str, Scalar]]],
                     ]
                 ] = None,
                 on_fit_config_fn: Optional[Callable[[int], Dict[str, Scalar]]] = None,
                 on_evaluate_config_fn: Optional[Callable[[int], Dict[str, Scalar]]] = None,
                 accept_failures: bool = True,
                 initial_parameters: Optional[Parameters] = None,
                 fit_metrics_aggregation_fn: Optional[MetricsAggregationFn] = None,
                 evaluate_metrics_aggregation_fn: Optional[MetricsAggregationFn] = None):
    """Select a Flower strategy: CustomFedAvg for coefficient-based models,
    FedEnsemble for everything else."""

    coef_models = ['lr']  # models handled by coefficient averaging (CustomFedAvg)

    if model_name in coef_models:
        return CustomFedAvg(
            num_rounds=num_rounds,
            exp_id=exp_id,
            fraction_fit=fraction_fit,
            fraction_evaluate=fraction_evaluate,
            min_fit_clients=min_fit_clients,
            min_evaluate_clients=min_evaluate_clients,
            min_available_clients=min_available_clients,
            evaluate_fn=evaluate_fn,
            on_fit_config_fn=on_fit_config_fn,
            on_evaluate_config_fn=on_evaluate_config_fn,
            accept_failures=accept_failures,
            initial_parameters=initial_parameters,
            fit_metrics_aggregation_fn=fit_metrics_aggregation_fn,
            evaluate_metrics_aggregation_fn=evaluate_metrics_aggregation_fn,
        )
    else:
        return FedEnsemble(
            num_rounds=num_rounds,
            exp_id=exp_id,
            fraction_fit=fraction_fit,
            fraction_evaluate=fraction_evaluate,
            min_fit_clients=min_fit_clients,
            min_evaluate_clients=min_evaluate_clients,
            min_available_clients=min_available_clients,
            evaluate_fn=evaluate_fn,
            on_fit_config_fn=on_fit_config_fn,
            on_evaluate_config_fn=on_evaluate_config_fn,
            accept_failures=accept_failures,
            initial_parameters=initial_parameters,
            fit_metrics_aggregation_fn=fit_metrics_aggregation_fn,
            evaluate_metrics_aggregation_fn=evaluate_metrics_aggregation_fn,
            problem_type=problem_type
        )
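
# Usage sketch (argument values are illustrative): "lr" selects CustomFedAvg;
# any other supported model name selects FedEnsemble.
#   strategy = get_strategy("rf", num_rounds=5, exp_id="exp-1",
#                           problem_type="classification")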


def get_loss(problem_type: str, metrics):
    """Extract a loss value from a metrics dict."""
    if problem_type == 'classification':
        loss = float(metrics['Log Loss'])
    elif problem_type == 'regression':
        loss = float(metrics['MAE'])
    else:
        raise ValueError(f"Unsupported problem_type: {problem_type}")
    return loss