csc8114 / code / src / server / reporting.py
reporting.py
Raw
import os
import threading

import pandas as pd

from src.shared.common import cfg, project_root


class ServerReporter:
    def __init__(self, *, session_id: str, session_dir: str | None = None):
        self.server_logs = []
        self._lock = threading.Lock()
        self.total_records = 0
        if session_dir:
            self.log_dir = session_dir
        else:
            self.log_dir = os.path.join(project_root, "results", session_id)
            os.makedirs(self.log_dir, exist_ok=True)
            
        self.log_file = os.path.join(self.log_dir, f"server_log_{session_id}.csv")
        self.flush_interval = max(1, int(cfg.get("server", {}).get("log_flush_interval", 100)))
        print(f"[SERVER] Server log path: {self.log_file}")
        print(f"[SERVER] Server log flush interval: {self.flush_interval} records")

    def record(self, log_entry: dict) -> None:
        if not log_entry.get("is_training", 1):
            return
        batch = None
        with self._lock:
            self.server_logs.append(log_entry)
            if len(self.server_logs) >= self.flush_interval:
                batch = self.server_logs
                self.server_logs = []
        if batch:
            self._flush_batch(batch)

    def flush(self) -> None:
        batch = None
        with self._lock:
            if self.server_logs:
                batch = self.server_logs
                self.server_logs = []
        if batch:
            self._flush_batch(batch)

    def _flush_batch(self, batch: list[dict]) -> None:
        if not batch:
            return
        try:
            df = pd.DataFrame(batch)
            file_exists = os.path.exists(self.log_file)
            df.to_csv(
                self.log_file,
                mode="a",
                header=not file_exists,
                index=False,
            )
            self.total_records += len(batch)
            print(
                f"[SERVER LOG] Appended {len(batch)} records (total={self.total_records}) "
                f"to {self.log_file}"
            )
        except Exception:
            # Keep records for retry on next flush if a transient write error occurs.
            with self._lock:
                self.server_logs = batch + self.server_logs
            raise