import argparse
from logging import INFO
from typing import Dict

import flwr as fl
from flwr.common.logger import log
from pycaret.classification import ClassificationExperiment
from pycaret.regression import RegressionExperiment
from sklearn.metrics import log_loss

import utils


def fit_config(server_round: int) -> Dict:
    """Send the round number, model name, and total round count to each client."""
    return {
        "server_round": server_round,
        "model_name": MODEL_NAME,
        "num_rounds": N_ROUNDS,
    }


def get_evaluate_fn(model, X_test, y_test):
    """Return an evaluation function for server-side evaluation."""

    # The `evaluate` function is called by the strategy after every round.
    # X_test/y_test are kept for API symmetry; `predict_model` evaluates on
    # the experiment's internal hold-out split.
    def evaluate(server_round: int, parameters: fl.common.NDArrays, config: Dict):
        # Update the model with the latest aggregated parameters
        utils.set_model_params(model, parameters)
        exp.predict_model(model)

        # Save the model at the last round
        if server_round == N_ROUNDS:
            final = exp.finalize_model(model)
            exp.save_model(
                final,
                f"{MODEL_PATH}/fl_{MODEL_NAME}"
                + ("_gen" if "gen_dataset" in data_path else ""),
            )

        metrics = exp.pull()
        results = utils.get_metrics(metrics, problem_type=problem_type)
        loss = utils.get_loss(problem_type=problem_type, metrics=metrics)

        # Save the evaluation results of the last round as CSV
        if server_round == N_ROUNDS:
            import os

            if not os.path.exists("./results"):
                os.makedirs("./results")
            log(INFO, f"Saving round {server_round} evaluation results...")
            # Label the results with the federated model name
            metrics["Model"] = f"Federated {model.__class__.__name__}"
            metrics.to_csv(
                f"./results/{exp_id}-results-{MODEL_NAME}.csv", index=False)

        return loss, results

    return evaluate


def metrics_aggregate(results) -> Dict:
    """Aggregate client metrics into sample-weighted averages."""
    if not results:
        return {}

    total_samples = 0  # Total number of samples across all clients
    aggregated_metrics = {}  # Running weighted sums per metric

    # Accumulate each metric weighted by the reporting client's sample count
    for samples, metrics in results:
        for key, value in metrics.items():
            if key not in aggregated_metrics:
                aggregated_metrics[key] = value * samples
            else:
                aggregated_metrics[key] += value * samples
        total_samples += samples

    # Compute the weighted average for each metric
    for key in aggregated_metrics:
        aggregated_metrics[key] = round(
            aggregated_metrics[key] / total_samples, 6)

    return aggregated_metrics
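# NOTE: `utils` is a project-local module whose source is not shown in this
# file. For orientation, Flower's scikit-learn examples implement the two
# parameter helpers used above roughly as in the sketch below. This is a
# hypothetical illustration for a linear model, not this project's actual code:
#
#     def get_model_parameters(model):
#         """Pack a fitted linear model's weights as a list of NDArrays."""
#         if model.fit_intercept:
#             return [model.coef_, model.intercept_]
#         return [model.coef_]
#
#     def set_model_params(model, params):
#         """Unpack aggregated NDArrays back into the estimator, same order."""
#         model.coef_ = params[0]
#         if model.fit_intercept:
#             model.intercept_ = params[1]
#         return model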
# Start the Flower server for the requested number of federated learning rounds
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Flower")
    parser.add_argument(
        "--num-clients",
        type=int,
        required=True,
        help="Specifies the number of clients",
    )
    parser.add_argument(
        "--num-rounds",
        type=int,
        required=True,
        help="Specifies the number of rounds",
    )
    parser.add_argument(
        "--model-name",
        type=str,
        required=True,
        # choices=utils.supported_models,
        help="Specifies the model name",
    )
    parser.add_argument(
        "--data-path",
        type=str,
        required=True,
        help="Specifies the path to the data",
    )
    parser.add_argument(
        "--problem-type",
        type=str,
        required=True,
        choices=["classification", "regression"],
        help="Specifies the problem type",
    )
    parser.add_argument(
        "--target",
        required=False,
        help="Specifies the target column",
        default=-1,
    )
    parser.add_argument(
        "--exp-id",
        required=False,
        help="Specifies the experiment id",
        default="FL_Test",
    )
    parser.add_argument(
        "--model-path",
        required=True,
        help="Specifies the path to the model",
    )
    args = parser.parse_args()

    N_CLIENTS = args.num_clients
    N_ROUNDS = args.num_rounds
    MODEL_NAME = args.model_name
    problem_type = args.problem_type
    data_path = args.data_path
    target = args.target
    exp_id = args.exp_id
    MODEL_PATH = args.model_path

    df = utils.load_data(data_path, target_column=target)
    target = utils.get_target_column_name(df, target)
    # train_df = df.groupby(df.columns[-1]).head(2)
    # df = utils.stratified_partition_with_all_values(
    #     df=df, n_partitions=N_CLIENTS, partition_id=N_CLIENTS - 1, target=target)

    if problem_type == "classification":
        exp = ClassificationExperiment()
    elif problem_type == "regression":
        exp = RegressionExperiment()
    exp.setup(data=df, target=target, **utils.setup_params)
    if problem_type == "classification":
        exp.add_metric("logloss", "Log Loss", log_loss,
                       greater_is_better=False, target="pred_proba")

    # Train once here only to obtain initial parameter values for the model;
    # the training data is a small portion that contains all possible target values.
    model = exp.create_model(
        MODEL_NAME, train_model=True, cross_validation=False)
    # model = exp.compare_models(
    #     include=utils.supported_models, cross_validation=False)
    # utils.set_initial_params(model)

    X = exp.get_config("X_test")
    y = exp.get_config("y_test")
    params = utils.get_model_parameters(model)

    strategy = utils.get_strategy(
        model_name=MODEL_NAME,
        fraction_fit=1.0,  # Sample 100% of available clients for training
        fraction_evaluate=1.0,  # Sample 100% of available clients for evaluation
        min_available_clients=2,  # Wait until at least two clients are available
        # min_evaluate_clients=N_CLIENTS,  # Never evaluate with fewer than all clients
        evaluate_fn=get_evaluate_fn(model, X, y),
        on_fit_config_fn=fit_config,
        initial_parameters=utils.ndarrays_to_custom_parameters(params),
        evaluate_metrics_aggregation_fn=metrics_aggregate,
        fit_metrics_aggregation_fn=metrics_aggregate,
        num_rounds=N_ROUNDS,
        exp_id=exp_id,
        problem_type=problem_type,
    )

    # Write the server log to a text file
    fl.common.logger.configure(identifier=exp_id, filename="log.txt")

    fl.server.start_server(
        server_address="0.0.0.0:8091",
        strategy=strategy,
        config=fl.server.ServerConfig(num_rounds=N_ROUNDS),
    )
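# Example invocation (assuming this file is saved as server.py and a matching
# Flower client script connects to port 8091; the file name and data path are
# illustrative, and "lr" is PyCaret's id for logistic regression):
#
#     python server.py --num-clients 2 --num-rounds 5 --model-name lr \
#         --data-path ./data/train.csv --problem-type classification \
#         --exp-id FL_Test --model-path ./models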