"""
data_download_openmeteo.py
Downloads hourly weather data for 12 locations around Newcastle upon Tyne
from the Open-Meteo Historical Weather API (https://open-meteo.com/).
No API key required.
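Output: one parquet file per location. Training locations are written to
<raw_out_dir>/processed/ and the holdout location to <raw_out_dir>/holdout/
(raw_out_dir defaults to "dataset"), matching the format expected by client_node.py:
Columns: Timestamp, Temperature, Humidity, Pressure, Wind Speed, Rain, Sensor_Name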
"""
import os
import sys
import time
from datetime import date, datetime
from pathlib import Path

import pandas as pd
import requests

# Use shared config
from src.shared.common import cfg, project_root
# --- Configuration ---
# `or {}` also covers a config whose `data_download` key is present but null.
dd_cfg = cfg.get("data_download", {}) or {}


def _config_date(value, fallback: str) -> str:
    """Normalise a configured date to the ``YYYY-MM-DD`` form sent to the API.

    Args:
        value: An ISO-8601 string, or a ``date``/``datetime`` object (what a
            config loader may hand back for an unquoted date — presumably
            relevant if ``cfg`` is loaded from YAML; verify against the loader).
        fallback: Value returned when *value* is missing, empty, of another
            type, or not parseable as ISO-8601.

    Returns:
        The date formatted as ``YYYY-MM-DD``, or *fallback*.
    """
    if isinstance(value, date):  # datetime is a subclass of date
        return value.strftime("%Y-%m-%d")
    try:
        return datetime.fromisoformat(value).strftime("%Y-%m-%d")
    except (ValueError, TypeError):
        return fallback


# Start of the download window; defaults to 2025-01-01 when absent or invalid.
START_DATE = _config_date(dd_cfg.get("start_date"), "2025-01-01")
# End of the download window; defaults to today's date when absent or invalid.
END_DATE = _config_date(dd_cfg.get("end_date"), datetime.now().strftime("%Y-%m-%d"))

# Both output folders live under the same configurable dataset directory.
_dataset_dir = Path(project_root) / dd_cfg.get("raw_out_dir", "dataset")
OUT_DIR = _dataset_dir / "processed"
HOLDOUT_DIR = _dataset_dir / "holdout"
# Endpoint of the Open-Meteo archive API serving historical re-analysis (ERA5) data.
API_URL = "https://archive-api.open-meteo.com/v1/archive"

# Training sites: 11 records, each with a sensor name and its coordinates.
# Every record produces its own parquet file (one per federated client).
# The list is described as lying on the north bank of the Tyne.
# NOTE(review): NCL_BLAYDON appears to sit on the south bank (Gateshead
# borough) — confirm it belongs here given the holdout rationale below.
LOCATIONS = [
    dict(name="NCL_CITY_CENTRE", lat=54.978, lon=-1.617),
    dict(name="NCL_JESMOND", lat=54.988, lon=-1.602),
    dict(name="NCL_GOSFORTH", lat=55.001, lon=-1.616),
    dict(name="NCL_WALLSEND", lat=55.000, lon=-1.534),
    dict(name="NCL_BYKER", lat=54.972, lon=-1.574),
    dict(name="NCL_HEATON", lat=54.980, lon=-1.570),
    dict(name="NCL_FENHAM", lat=54.984, lon=-1.645),
    dict(name="NCL_WALKER", lat=54.974, lon=-1.551),
    dict(name="NCL_BLAYDON", lat=54.966, lon=-1.712),
    dict(name="NCL_SCOTSWOOD", lat=54.970, lon=-1.654),
    dict(name="NCL_BENWELL", lat=54.975, lon=-1.638),
]

# Holdout site on the south bank of the Tyne (Gateshead borough). It is kept
# out of training and reserved for post-training evaluation of how well the
# model generalises to an unseen location.
HOLDOUT_LOCATIONS = [
    dict(name="NCL_GATESHEAD", lat=54.962, lon=-1.601),
]

# Open-Meteo hourly variable name -> column name used in the output files.
VARIABLE_MAP = dict(
    temperature_2m="Temperature",
    relative_humidity_2m="Humidity",
    surface_pressure="Pressure",
    wind_speed_10m="Wind Speed",
    rain="Rain",
)
def fetch_location(loc: dict) -> pd.DataFrame:
    """Fetch hourly weather for one location from Open-Meteo.

    Requests every variable listed in ``VARIABLE_MAP`` for the window
    ``START_DATE``..``END_DATE``, with timestamps in the Europe/London zone.

    Args:
        loc: Location record with ``name``, ``lat`` and ``lon`` keys.

    Returns:
        A DataFrame with ``Timestamp``, one column per ``VARIABLE_MAP`` entry
        (named by the map's values) and ``Sensor_Name``. Rows missing
        Temperature, Humidity, Pressure or Rain are removed; a missing
        Wind Speed is replaced by 0.0.

    Raises:
        requests.RequestException: On network failure, timeout, or a non-2xx
            response (via ``raise_for_status``).
        ValueError: If the response body is not valid JSON.
        KeyError: If the response lacks ``hourly``, ``time`` or one of the
            requested variables.
    """
    params = {
        "latitude": loc["lat"],
        "longitude": loc["lon"],
        "start_date": START_DATE,
        "end_date": END_DATE,
        "hourly": list(VARIABLE_MAP),
        "timezone": "Europe/London",
    }
    resp = requests.get(API_URL, params=params, timeout=30)
    resp.raise_for_status()
    hourly = resp.json()["hourly"]

    # Derive the column set from VARIABLE_MAP so the request and the frame
    # cannot drift apart when the mapping changes.
    columns = {"Timestamp": pd.to_datetime(hourly["time"])}
    columns.update({column: hourly[variable] for variable, column in VARIABLE_MAP.items()})
    columns["Sensor_Name"] = loc["name"]
    df = pd.DataFrame(columns)

    # Drop rows where any core feature is missing. Rain is part of this
    # subset, so no NaN rain values remain afterwards (the former
    # fillna on Rain was a no-op and has been removed).
    df = df.dropna(subset=["Temperature", "Humidity", "Pressure", "Rain"])
    # Wind Speed is not a drop criterion; gaps are filled with 0.0 instead.
    df["Wind Speed"] = df["Wind Speed"].fillna(0.0)
    return df
def _download_locations(locations: list[dict], out_dir: Path, delay_sec: float = 3.0) -> int:
"""Download a list of locations into out_dir. Returns count of successes.
Skips locations whose parquet file already exists (resume-safe).
Waits delay_sec between requests to avoid 429 rate limiting.
"""
out_dir.mkdir(parents=True, exist_ok=True)
success = 0
for i, loc in enumerate(locations):
out_path = out_dir / f"{loc['name']}.parquet"
if out_path.exists():
print(f" [{loc['name']}] Skipping (already downloaded)")
success += 1
continue
if i > 0:
time.sleep(delay_sec)
print(f" [{loc['name']}] Fetching...", end=" ", flush=True)
try:
df = fetch_location(loc)
df.to_parquet(out_path, index=False)
rain_rows = (df["Rain"] > 0).sum()
print(f"✓ {len(df):,} rows | Rain: {rain_rows:,} samples ({100*rain_rows/len(df):.1f}%)")
success += 1
except Exception as e:
print(f"✗ FAILED: {e}")
return success
def main():
    """Run the full download: training sites first, then the holdout site.

    Prints the configured period, downloads ``LOCATIONS`` into ``OUT_DIR``
    and ``HOLDOUT_LOCATIONS`` into ``HOLDOUT_DIR``, and finishes with a
    summary of how many locations were saved out of the total.
    """
    print("Downloading Open-Meteo data")
    print(f"Period: {START_DATE} → {END_DATE}\n")

    # Training batch.
    print(f"── Training locations ({len(LOCATIONS)}) → {OUT_DIR}")
    saved = _download_locations(LOCATIONS, OUT_DIR)

    # Holdout batch, stored in its own directory.
    print(f"\n── Holdout locations ({len(HOLDOUT_LOCATIONS)}) → {HOLDOUT_DIR}")
    saved += _download_locations(HOLDOUT_LOCATIONS, HOLDOUT_DIR)

    expected = len(LOCATIONS) + len(HOLDOUT_LOCATIONS)
    print(f"\n✅ Done. {saved}/{expected} locations saved.")


if __name__ == "__main__":
    main()