# csc8114/code/src/client/reporting.py
# Client-side reporting: log summaries, CSV/JSON result persistence, console summary.
import json
import os
import time

import pandas as pd

from src.shared.common import cfg, project_root
from src.shared.config_artifacts import build_config_ref, build_minimal_config_snapshot, resolve_config_snapshot_policy
from src.shared.targets import is_rain


def summarize_logs(experimental_logs: list[dict]) -> tuple[float, float]:
    """Return ``(mean LatencyMs, mean PayloadBytes)`` over the given log records.

    Each record's ``LatencyMs`` and ``PayloadBytes`` values are coerced with
    ``float()`` before averaging. An empty list yields ``(0.0, 0.0)``.
    """
    count = len(experimental_logs)
    if count <= 0:
        return 0.0, 0.0

    latency_sum = sum(float(entry["LatencyMs"]) for entry in experimental_logs)
    payload_sum = sum(float(entry["PayloadBytes"]) for entry in experimental_logs)
    return latency_sum / count, payload_sum / count


def summarize_phase(logs: list[dict], phase: str) -> dict[str, float]:
    """Aggregate the log records whose ``Status`` equals *phase* for console summaries.

    Returns a dict with, in order:

    * ``steps`` -- number of matching records;
    * ``avg_loss`` -- mean of ``float(record["Loss"])``;
    * ``rain_acc`` -- fraction of records where ``is_rain(Target)`` equals
      ``is_rain(Prediction)`` (both coerced with ``float()`` first);
    * ``avg_cls_loss`` / ``avg_reg_loss`` -- means of ``ClassificationLoss`` /
      ``RegressionLoss``; a record missing either key contributes ``0.0``.

    When no record matches, ``steps`` is 0 and every other metric is NaN.
    """
    matching = [record for record in logs if record.get("Status") == phase]
    n_steps = len(matching)

    if not n_steps:
        metrics = {"steps": 0}
        for key in ("avg_loss", "rain_acc", "avg_cls_loss", "avg_reg_loss"):
            metrics[key] = float("nan")
        return metrics

    def mean(values):
        # sum() is kept on purpose: it is the same summation the metrics
        # have always used, so results stay bit-for-bit stable.
        return sum(values) / n_steps

    def rain_hit(record):
        predicted_rain = is_rain(float(record["Prediction"]))
        return int(is_rain(float(record["Target"])) == predicted_rain)

    return {
        "steps": n_steps,
        "avg_loss": mean(float(record["Loss"]) for record in matching),
        "rain_acc": mean(rain_hit(record) for record in matching),
        "avg_cls_loss": mean(float(record.get("ClassificationLoss", 0.0)) for record in matching),
        "avg_reg_loss": mean(float(record.get("RegressionLoss", 0.0)) for record in matching),
    }


def save_results(
    client_id: int,
    experimental_logs: list,
    session_id: str,
    *,
    best_model_path: str | None = None,
    best_test_loss: float | None = None,
    avg_latency: float | None = None,
    avg_bytes: float | None = None,
    avg_cpu: float | None = None,
    avg_mem: float | None = None,
    total_runtime_s: float | None = None,
    model_size_bytes: int | None = None,
    net_sent_mb: float | None = None,
    net_recv_mb: float | None = None,
    mem_peak_mb: float | None = None,
    sync_bytes_sent_mb: float | None = None,
    sync_bytes_recv_mb: float | None = None,
    actual_seed: int | None = None,
) -> None:
    """Write the final training log as CSV plus a JSON metadata sidecar.

    Output directory is ``<project_root>/results/<session_id>`` with an extra
    ``<SCENARIO_ID>`` level when that environment variable is set. Files are
    named ``training_log_client<client_id>_<YYYYmmdd_HHMMSS>.csv`` (local time,
    from ``time.strftime``) and the same stem with ``_meta.json``.

    The keyword-only metrics are copied verbatim into the metadata, together
    with the ``profiler``/``scheduler`` ``enabled`` flags from ``cfg`` and the
    config-snapshot helpers' output.

    A CSV write failure propagates to the caller. Writing the metadata file is
    best-effort: any failure there is printed as a warning instead of raised.
    """
    scenario_id = os.environ.get("SCENARIO_ID")
    if scenario_id:
        output_dir = os.path.join(project_root, "results", session_id, scenario_id)
    else:
        output_dir = os.path.join(project_root, "results", session_id)

    os.makedirs(output_dir, exist_ok=True)

    log_df = pd.DataFrame(experimental_logs)
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    filename = f"training_log_client{client_id}_{timestamp}.csv"
    filepath = os.path.join(output_dir, filename)
    log_df.to_csv(filepath, index=False)
    print(f"[CLIENT] Saved training log to {filepath}")

    meta = {
        "timestamp": timestamp,
        "csv": filename,
        "num_records": len(experimental_logs),
        "best_model_path": best_model_path,
        "best_test_loss": best_test_loss,
        "avg_latency_ms": avg_latency,
        "avg_payload_bytes": avg_bytes,
        "avg_cpu_percent": avg_cpu,
        "avg_mem_percent": avg_mem,
        "total_runtime_s": total_runtime_s,
        "model_size_bytes": model_size_bytes,
        "net_sent_mb": net_sent_mb,
        "net_recv_mb": net_recv_mb,
        "mem_peak_mb": mem_peak_mb,
        "sync_bytes_sent_mb": sync_bytes_sent_mb,
        "sync_bytes_recv_mb": sync_bytes_recv_mb,
        "actual_seed": actual_seed,
        "profiler_enabled": cfg.get("profiler", {}).get("enabled", True),
        "scheduler_enabled": cfg.get("scheduler", {}).get("enabled", True),
        "config_snapshot_policy": resolve_config_snapshot_policy(),
        "config_ref": build_config_ref(),
        "config_minimal": build_minimal_config_snapshot(),
    }
    # Swap only the file extension: str.replace on the full path would also
    # rewrite any ".csv" that happens to appear in a directory component.
    meta_path = os.path.splitext(filepath)[0] + "_meta.json"
    try:
        # Serialize first so a non-JSON-serializable value fails before the
        # file is opened, instead of leaving a truncated _meta.json behind.
        payload = json.dumps(meta, indent=2)
        with open(meta_path, "w") as mf:
            mf.write(payload)
        print(f"[CLIENT] Saved metadata to {meta_path}")
    except Exception as e:
        print(f"[CLIENT WARN] Failed to write metadata: {e}")


def save_progress(
    client_id: int,
    experimental_logs: list,
    session_id: str,
    *,
    epoch: int,
    best_model_path: str | None = None,
    best_test_loss: float | None = None,
    avg_latency: float | None = None,
    avg_bytes: float | None = None,
    model_size_bytes: int | None = None,
    actual_seed: int | None = None,
) -> None:
    """
    Persist rolling progress to deterministic filenames so partial runs are recoverable.

    Writes ``training_log_client<client_id>_progress.csv`` (all logs collected
    so far) and ``training_log_client<client_id>_progress_meta.json`` into
    ``<project_root>/results/<session_id>``, with an extra ``<SCENARIO_ID>``
    level when that environment variable is set. Both files are overwritten on
    every call.

    Each file is first written to a ``.tmp`` sibling and then moved over the
    target with ``os.replace``. An interruption mid-write therefore leaves the
    previous snapshot intact rather than a truncated file. The rename is atomic
    on POSIX.

    A CSV write failure propagates to the caller. Writing the metadata file is
    best-effort: any failure there is printed as a warning instead of raised.
    """
    scenario_id = os.environ.get("SCENARIO_ID")
    if scenario_id:
        output_dir = os.path.join(project_root, "results", session_id, scenario_id)
    else:
        output_dir = os.path.join(project_root, "results", session_id)

    os.makedirs(output_dir, exist_ok=True)

    csv_name = f"training_log_client{client_id}_progress.csv"
    csv_path = os.path.join(output_dir, csv_name)
    csv_tmp_path = csv_path + ".tmp"
    pd.DataFrame(experimental_logs).to_csv(csv_tmp_path, index=False)
    os.replace(csv_tmp_path, csv_path)

    meta = {
        "epoch": epoch,
        "csv": csv_name,
        "num_records": len(experimental_logs),
        "best_model_path": best_model_path,
        "best_test_loss": best_test_loss,
        "avg_latency_ms": avg_latency,
        "avg_bytes": avg_bytes,
        "model_size_bytes": model_size_bytes,
        "actual_seed": actual_seed,
        "profiler_enabled": cfg.get("profiler", {}).get("enabled", True),
        "scheduler_enabled": cfg.get("scheduler", {}).get("enabled", True),
        "config_snapshot_policy": resolve_config_snapshot_policy(),
        "config_ref": build_config_ref(),
        "is_partial": True,
    }
    meta_path = os.path.join(output_dir, f"training_log_client{client_id}_progress_meta.json")
    try:
        # Serialize before touching the filesystem so a non-serializable value
        # cannot clobber the last good metadata snapshot.
        payload = json.dumps(meta, indent=2)
        meta_tmp_path = meta_path + ".tmp"
        with open(meta_tmp_path, "w") as mf:
            mf.write(payload)
        os.replace(meta_tmp_path, meta_path)
    except Exception as e:
        print(f"[CLIENT WARN] Failed to write progress metadata: {e}")


def print_summary(
    *,
    client_id: int,
    epochs: int,
    num_logs: int,
    best_test_loss: float,
    avg_latency: float,
    avg_bytes: float,
    best_model_path: str | None,
    total_runtime_s: float | None = None,
    avg_steps_per_s: float | None = None,
    avg_cpu: float | None = None,
    avg_mem: float | None = None,
    actual_seed: int | None = None,
) -> None:
    print("\n" + "=" * 60)
    print(f"TRAINING COMPLETE: CLIENT {client_id} SUMMARY")
    print("=" * 60)
    print(f"[INFO]  Total Epochs Completed : {epochs}")
    print(f"[INFO]  Total Forward Passes   : {num_logs}")
    print(
        f"[INFO]  Best Test Loss (MSE)   : {best_test_loss:.4f}"
        if best_test_loss != float("inf")
        else "[INFO]  Best Test Loss (MSE)   : N/A"
    )
    print(f"[INFO]  Avg Latency per Pass   : {avg_latency:.2f} ms")
    print(f"[INFO]  Avg Payload per Pass   : {avg_bytes / 1024:.2f} KB ({avg_bytes:.0f} bytes)")
    if total_runtime_s is not None:
        print(f"[INFO]  Total Runtime         : {total_runtime_s:.2f} s")
    if avg_steps_per_s is not None:
        print(f"[INFO]  Avg Throughput        : {avg_steps_per_s:.2f} steps/s")
    if avg_cpu is not None:
        print(f"[INFO]  Avg CPU Usage         : {avg_cpu:.1f} %")
    if avg_mem is not None:
        print(f"[INFO]  Avg Memory Usage      : {avg_mem:.1f} %")
    if actual_seed is not None:
        print(f"[INFO]  Actual Seed (client)  : {actual_seed}")
    print(f"[INFO]  Best Model Checkpoint  : {best_model_path}")
    print("=" * 60 + "\n")