""" data_download_openmeteo.py Downloads hourly weather data for 12 locations around Newcastle upon Tyne from the Open-Meteo Historical Weather API (https://open-meteo.com/). No API key required. Output: one parquet file per location in dataset/processed/, matching the format expected by client_node.py: Columns: Timestamp, Temperature, Humidity, Pressure, Wind Speed, Rain, Sensor_Name """ import time import requests import pandas as pd from pathlib import Path import os import sys # Use shared config from src.shared.common import cfg, project_root from datetime import datetime # --- Configuration --- dd_cfg = cfg.get("data_download", {}) # Read dates from config, default to 2025-01-01 to today if missing try: start_dt = datetime.fromisoformat(dd_cfg.get("start_date", "2025-01-01T00:00:00")) START_DATE = start_dt.strftime("%Y-%m-%d") except (ValueError, TypeError): START_DATE = "2025-01-01" if dd_cfg.get("end_date"): try: end_dt = datetime.fromisoformat(dd_cfg["end_date"]) END_DATE = end_dt.strftime("%Y-%m-%d") except (ValueError, TypeError): END_DATE = datetime.now().strftime("%Y-%m-%d") else: END_DATE = datetime.now().strftime("%Y-%m-%d") OUT_DIR = Path(project_root) / dd_cfg.get("raw_out_dir", "dataset") / "processed" HOLDOUT_DIR = Path(project_root) / dd_cfg.get("raw_out_dir", "dataset") / "holdout" # Open-Meteo API endpoint for historical weather re-analysis (ERA5) API_URL = "https://archive-api.open-meteo.com/v1/archive" # 11 training locations (north bank of the Tyne) — one file per federated client LOCATIONS = [ {"name": "NCL_CITY_CENTRE", "lat": 54.978, "lon": -1.617}, {"name": "NCL_JESMOND", "lat": 54.988, "lon": -1.602}, {"name": "NCL_GOSFORTH", "lat": 55.001, "lon": -1.616}, {"name": "NCL_WALLSEND", "lat": 55.000, "lon": -1.534}, {"name": "NCL_BYKER", "lat": 54.972, "lon": -1.574}, {"name": "NCL_HEATON", "lat": 54.980, "lon": -1.570}, {"name": "NCL_FENHAM", "lat": 54.984, "lon": -1.645}, {"name": "NCL_WALKER", "lat": 54.974, "lon": -1.551}, {"name": "NCL_BLAYDON", "lat": 54.966, "lon": -1.712}, {"name": "NCL_SCOTSWOOD", "lat": 54.970, "lon": -1.654}, {"name": "NCL_BENWELL", "lat": 54.975, "lon": -1.638}, ] # Holdout location — south bank of the Tyne (Gateshead borough), geographically # isolated from all training sites by the River Tyne. Never seen during training; # used only for post-training cross-location generalisation evaluation. HOLDOUT_LOCATIONS = [ {"name": "NCL_GATESHEAD", "lat": 54.962, "lon": -1.601}, ] # Open-Meteo variable mapping → our column names VARIABLE_MAP = { "temperature_2m": "Temperature", "relative_humidity_2m": "Humidity", "surface_pressure": "Pressure", "wind_speed_10m": "Wind Speed", "rain": "Rain", } def fetch_location(loc: dict) -> pd.DataFrame | None: """Fetch hourly weather for one location from Open-Meteo.""" params = { "latitude": loc["lat"], "longitude": loc["lon"], "start_date": START_DATE, "end_date": END_DATE, "hourly": list(VARIABLE_MAP.keys()), "timezone": "Europe/London", } resp = requests.get(API_URL, params=params, timeout=30) resp.raise_for_status() data = resp.json() hourly = data["hourly"] df = pd.DataFrame({ "Timestamp": pd.to_datetime(hourly["time"]), "Temperature": hourly["temperature_2m"], "Humidity": hourly["relative_humidity_2m"], "Pressure": hourly["surface_pressure"], "Wind Speed": hourly["wind_speed_10m"], "Rain": hourly["rain"], "Sensor_Name": loc["name"], }) # Drop rows where any core feature is missing df = df.dropna(subset=["Temperature", "Humidity", "Pressure", "Rain"]) # Rain: fill remaining NaN with 0 (no rain = 0mm) df["Rain"] = df["Rain"].fillna(0.0) df["Wind Speed"] = df["Wind Speed"].fillna(0.0) return df def _download_locations(locations: list[dict], out_dir: Path, delay_sec: float = 3.0) -> int: """Download a list of locations into out_dir. Returns count of successes. Skips locations whose parquet file already exists (resume-safe). Waits delay_sec between requests to avoid 429 rate limiting. """ out_dir.mkdir(parents=True, exist_ok=True) success = 0 for i, loc in enumerate(locations): out_path = out_dir / f"{loc['name']}.parquet" if out_path.exists(): print(f" [{loc['name']}] Skipping (already downloaded)") success += 1 continue if i > 0: time.sleep(delay_sec) print(f" [{loc['name']}] Fetching...", end=" ", flush=True) try: df = fetch_location(loc) df.to_parquet(out_path, index=False) rain_rows = (df["Rain"] > 0).sum() print(f"✓ {len(df):,} rows | Rain: {rain_rows:,} samples ({100*rain_rows/len(df):.1f}%)") success += 1 except Exception as e: print(f"✗ FAILED: {e}") return success def main(): print(f"Downloading Open-Meteo data") print(f"Period: {START_DATE} → {END_DATE}\n") print(f"── Training locations ({len(LOCATIONS)}) → {OUT_DIR}") n_train = _download_locations(LOCATIONS, OUT_DIR) print(f"\n── Holdout locations ({len(HOLDOUT_LOCATIONS)}) → {HOLDOUT_DIR}") n_holdout = _download_locations(HOLDOUT_LOCATIONS, HOLDOUT_DIR) total = len(LOCATIONS) + len(HOLDOUT_LOCATIONS) print(f"\n✅ Done. {n_train + n_holdout}/{total} locations saved.") if __name__ == "__main__": main()