import os
import threading

import pandas as pd

from src.shared.common import cfg, project_root


class ServerReporter:
    """Buffer server log entries in memory and append them to a session CSV.

    Entries are collected through :meth:`record` and written to
    ``server_log_<session_id>.csv`` whenever the buffer reaches
    ``flush_interval`` entries, or when :meth:`flush` is called explicitly.
    """

    def __init__(self, *, session_id: str, session_dir: str | None = None):
        """Prepare the log directory and read the flush interval from ``cfg``.

        Args:
            session_id: Identifier used in the log file name and, when
                ``session_dir`` is not given, in the default directory
                ``<project_root>/results/<session_id>``.
            session_dir: Directory to write the log into. A falsy value
                selects the default directory described above.
        """
        self.server_logs = []
        # Guards ``server_logs`` only; it is never held during file I/O.
        self._lock = threading.Lock()
        # Serialises CSV appends and ``total_records`` updates so concurrent
        # flushes cannot both emit a header or lose a counter increment.
        # Lock order is always ``_write_lock`` -> ``_lock``.
        self._write_lock = threading.Lock()
        self.total_records = 0

        if session_dir:
            self.log_dir = session_dir
        else:
            self.log_dir = os.path.join(project_root, "results", session_id)
        os.makedirs(self.log_dir, exist_ok=True)

        self.log_file = os.path.join(self.log_dir, f"server_log_{session_id}.csv")
        # Clamp to at least 1 so a zero or negative setting still flushes.
        self.flush_interval = max(
            1, int(cfg.get("server", {}).get("log_flush_interval", 100))
        )
        print(f"[SERVER] Server log path: {self.log_file}")
        print(f"[SERVER] Server log flush interval: {self.flush_interval} records")

    def record(self, log_entry: dict) -> None:
        """Buffer one entry and flush once ``flush_interval`` is reached.

        Entries whose ``"is_training"`` value is falsy are ignored; a missing
        key counts as training.

        Raises:
            Exception: Whatever the underlying CSV write raises when this call
                triggers a flush (the batch is re-queued first).
        """
        if not log_entry.get("is_training", 1):
            return

        batch = None
        with self._lock:
            self.server_logs.append(log_entry)
            if len(self.server_logs) >= self.flush_interval:
                # Swap the buffer out so the write happens outside the lock.
                batch = self.server_logs
                self.server_logs = []

        if batch:
            self._flush_batch(batch)

    def flush(self) -> None:
        """Write all currently buffered entries to the CSV, if any.

        Raises:
            Exception: Whatever the underlying CSV write raises (the batch is
                re-queued first).
        """
        batch = None
        with self._lock:
            if self.server_logs:
                batch = self.server_logs
                self.server_logs = []

        if batch:
            self._flush_batch(batch)

    def _needs_header(self) -> bool:
        """Return True when the log file is missing or has zero bytes."""
        try:
            return os.path.getsize(self.log_file) == 0
        except OSError:
            # getsize raises OSError when the file does not exist.
            return True

    def _flush_batch(self, batch: list[dict]) -> None:
        """Append ``batch`` to the CSV log and update the running total.

        If building the frame or writing it fails, the batch is put back at
        the front of the buffer so a later flush retries it, and the exception
        is re-raised.

        Args:
            batch: Entries to write; an empty list is a no-op.

        Raises:
            Exception: Any error raised while building or writing the frame.
        """
        if not batch:
            return

        with self._write_lock:
            try:
                frame = pd.DataFrame(batch)
                frame.to_csv(
                    self.log_file,
                    mode="a",
                    # Header only for a new or empty file, so a zero-byte file
                    # left by an earlier failed append still gets one.
                    header=self._needs_header(),
                    index=False,
                )
            except Exception:
                # Keep records for retry on next flush if a transient write
                # error occurs.
                with self._lock:
                    self.server_logs = batch + self.server_logs
                raise

            # Counted only after a successful write, and outside the ``try``
            # so a later failure cannot re-queue rows already on disk.
            self.total_records += len(batch)
            total = self.total_records

        print(
            f"[SERVER LOG] Appended {len(batch)} records (total={total}) "
            f"to {self.log_file}"
        )