import json import os import time import pandas as pd from src.shared.common import cfg, project_root from src.shared.config_artifacts import build_config_ref, build_minimal_config_snapshot, resolve_config_snapshot_policy from src.shared.targets import is_rain def summarize_logs(experimental_logs: list[dict]) -> tuple[float, float]: num_logs = len(experimental_logs) total_latency = sum(float(log["LatencyMs"]) for log in experimental_logs) total_bytes = sum(float(log["PayloadBytes"]) for log in experimental_logs) avg_latency = total_latency / num_logs if num_logs > 0 else 0.0 avg_bytes = total_bytes / num_logs if num_logs > 0 else 0.0 return avg_latency, avg_bytes def summarize_phase(logs: list[dict], phase: str) -> dict[str, float]: """Compute compact per-phase metrics for client console summaries.""" phase_logs = [log for log in logs if log.get("Status") == phase] steps = len(phase_logs) if steps == 0: return { "steps": 0, "avg_loss": float("nan"), "rain_acc": float("nan"), "avg_cls_loss": float("nan"), "avg_reg_loss": float("nan"), } return { "steps": steps, "avg_loss": sum(float(log["Loss"]) for log in phase_logs) / steps, "rain_acc": sum( int(is_rain(float(log["Target"])) == is_rain(float(log["Prediction"]))) for log in phase_logs ) / steps, "avg_cls_loss": sum(float(log.get("ClassificationLoss", 0.0)) for log in phase_logs) / steps, "avg_reg_loss": sum(float(log.get("RegressionLoss", 0.0)) for log in phase_logs) / steps, } def save_results( client_id: int, experimental_logs: list, session_id: str, *, best_model_path: str | None = None, best_test_loss: float | None = None, avg_latency: float | None = None, avg_bytes: float | None = None, avg_cpu: float | None = None, avg_mem: float | None = None, total_runtime_s: float | None = None, model_size_bytes: int | None = None, net_sent_mb: float | None = None, net_recv_mb: float | None = None, mem_peak_mb: float | None = None, sync_bytes_sent_mb: float | None = None, sync_bytes_recv_mb: float | None = None, actual_seed: int | None = None, ) -> None: scenario_id = os.environ.get("SCENARIO_ID") if scenario_id: output_dir = os.path.join(project_root, "results", session_id, scenario_id) else: output_dir = os.path.join(project_root, "results", session_id) os.makedirs(output_dir, exist_ok=True) log_df = pd.DataFrame(experimental_logs) timestamp = time.strftime("%Y%m%d_%H%M%S") filename = f"training_log_client{client_id}_{timestamp}.csv" filepath = os.path.join(output_dir, filename) log_df.to_csv(filepath, index=False) print(f"[CLIENT] Saved training log to {filepath}") meta = { "timestamp": timestamp, "csv": filename, "num_records": len(experimental_logs), "best_model_path": best_model_path, "best_test_loss": best_test_loss, "avg_latency_ms": avg_latency, "avg_payload_bytes": avg_bytes, "avg_cpu_percent": avg_cpu, "avg_mem_percent": avg_mem, "total_runtime_s": total_runtime_s, "model_size_bytes": model_size_bytes, "net_sent_mb": net_sent_mb, "net_recv_mb": net_recv_mb, "mem_peak_mb": mem_peak_mb, "sync_bytes_sent_mb": sync_bytes_sent_mb, "sync_bytes_recv_mb": sync_bytes_recv_mb, "actual_seed": actual_seed, "profiler_enabled": cfg.get("profiler", {}).get("enabled", True), "scheduler_enabled": cfg.get("scheduler", {}).get("enabled", True), "config_snapshot_policy": resolve_config_snapshot_policy(), "config_ref": build_config_ref(), "config_minimal": build_minimal_config_snapshot(), } meta_path = filepath.replace(".csv", "_meta.json") try: with open(meta_path, "w") as mf: json.dump(meta, mf, indent=2) print(f"[CLIENT] Saved metadata to {meta_path}") except Exception as e: print(f"[CLIENT WARN] Failed to write metadata: {e}") def save_progress( client_id: int, experimental_logs: list, session_id: str, *, epoch: int, best_model_path: str | None = None, best_test_loss: float | None = None, avg_latency: float | None = None, avg_bytes: float | None = None, model_size_bytes: int | None = None, actual_seed: int | None = None, ) -> None: """ Persist rolling progress to deterministic filenames so partial runs are recoverable. The file is overwritten each call and contains all logs collected so far. """ scenario_id = os.environ.get("SCENARIO_ID") if scenario_id: output_dir = os.path.join(project_root, "results", session_id, scenario_id) else: output_dir = os.path.join(project_root, "results", session_id) os.makedirs(output_dir, exist_ok=True) csv_name = f"training_log_client{client_id}_progress.csv" csv_path = os.path.join(output_dir, csv_name) pd.DataFrame(experimental_logs).to_csv(csv_path, index=False) meta = { "epoch": epoch, "csv": csv_name, "num_records": len(experimental_logs), "best_model_path": best_model_path, "best_test_loss": best_test_loss, "avg_latency_ms": avg_latency, "avg_bytes": avg_bytes, "model_size_bytes": model_size_bytes, "actual_seed": actual_seed, "profiler_enabled": cfg.get("profiler", {}).get("enabled", True), "scheduler_enabled": cfg.get("scheduler", {}).get("enabled", True), "config_snapshot_policy": resolve_config_snapshot_policy(), "config_ref": build_config_ref(), "is_partial": True, } meta_path = os.path.join(output_dir, f"training_log_client{client_id}_progress_meta.json") try: with open(meta_path, "w") as mf: json.dump(meta, mf, indent=2) except Exception as e: print(f"[CLIENT WARN] Failed to write progress metadata: {e}") def print_summary( *, client_id: int, epochs: int, num_logs: int, best_test_loss: float, avg_latency: float, avg_bytes: float, best_model_path: str | None, total_runtime_s: float | None = None, avg_steps_per_s: float | None = None, avg_cpu: float | None = None, avg_mem: float | None = None, actual_seed: int | None = None, ) -> None: print("\n" + "=" * 60) print(f"TRAINING COMPLETE: CLIENT {client_id} SUMMARY") print("=" * 60) print(f"[INFO] Total Epochs Completed : {epochs}") print(f"[INFO] Total Forward Passes : {num_logs}") print( f"[INFO] Best Test Loss (MSE) : {best_test_loss:.4f}" if best_test_loss != float("inf") else "[INFO] Best Test Loss (MSE) : N/A" ) print(f"[INFO] Avg Latency per Pass : {avg_latency:.2f} ms") print(f"[INFO] Avg Payload per Pass : {avg_bytes / 1024:.2f} KB ({avg_bytes:.0f} bytes)") if total_runtime_s is not None: print(f"[INFO] Total Runtime : {total_runtime_s:.2f} s") if avg_steps_per_s is not None: print(f"[INFO] Avg Throughput : {avg_steps_per_s:.2f} steps/s") if avg_cpu is not None: print(f"[INFO] Avg CPU Usage : {avg_cpu:.1f} %") if avg_mem is not None: print(f"[INFO] Avg Memory Usage : {avg_mem:.1f} %") if actual_seed is not None: print(f"[INFO] Actual Seed (client) : {actual_seed}") print(f"[INFO] Best Model Checkpoint : {best_model_path}") print("=" * 60 + "\n")