import os
import torch
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
from pathlib import Path
from sklearn.preprocessing import StandardScaler
import glob
class RainfallDataset(Dataset):
    """
    A PyTorch Dataset for loading Newcastle Urban Observatory rainfall data.

    It takes a list of preprocessed parquet files, applies a sliding window
    to each file independently (windows never span two files), and returns
    (X, y) pairs for training an LSTM:

    * X is a *scaled* window of shape (seq_length, num_features).
    * y is the *unscaled* 'Rain' value located `pred_horizon` rows after the
      last row of the window, returned as a tensor of shape (1,).
    """

    def __init__(self, file_paths, seq_length=24, pred_horizon=1, scaler=None, is_train=True):
        """
        :param file_paths: List of paths to the `.parquet` files for this client
        :param seq_length: How many past time steps (rows) to look at (X)
        :param pred_horizon: How many time steps ahead to predict (y)
        :param scaler: A fitted scaler exposing `transform` (e.g. an sklearn
            StandardScaler fitted on train data only). If None and `is_train`
            is True, a new StandardScaler is fitted on all rows of `file_paths`.
        :param is_train: Whether this dataset may fit its own scaler when
            none is supplied.
        :raises ValueError: If no scaler is supplied and one cannot be fitted
            (empty `file_paths` in train mode, or `is_train` is False while
            there is data to transform).
        """
        self.seq_length = seq_length
        self.pred_horizon = pred_horizon
        self.is_train = is_train
        # Features to use as X (Temperature, Humidity, Pressure, Wind Speed, Rain)
        self.feature_cols = ['Temperature', 'Humidity', 'Pressure', 'Wind Speed', 'Rain']
        self.target_col = 'Rain'
        self.X_data = []
        self.y_data = []
        self.scaler = scaler
        self._load_and_process_files(file_paths)

    def _load_and_process_files(self, file_paths):
        """Read every file, fit the scaler if required, then build all windows."""
        # 1. Load all assigned sensor files into memory
        per_file_features = []
        for path in file_paths:
            df = pd.read_parquet(path)
            # Sort by timestamp just in case
            df = df.sort_values('Timestamp').reset_index(drop=True)
            # Extract raw numpy arrays
            per_file_features.append(df[self.feature_cols].values)

        # 2. Fit the scaler when none was supplied
        if self.scaler is None:
            if self.is_train:
                if not per_file_features:
                    raise ValueError(
                        "Cannot fit a scaler: `file_paths` is empty."
                    )
                # Stack all sensors vertically to learn one global mean/std
                self.scaler = StandardScaler()
                self.scaler.fit(np.vstack(per_file_features))
            elif per_file_features:
                # Fail with a clear message instead of an AttributeError on
                # `None.transform`: evaluation data must reuse the scaler
                # that was fitted on the training data.
                raise ValueError(
                    "A fitted `scaler` must be provided when `is_train` is False."
                )

        # 3. Create sliding windows, one file at a time
        for features in per_file_features:
            self._append_windows(features)

    def _append_windows(self, features):
        """Scale one file's feature matrix and append its (X, y) pairs."""
        scaled_features = self.scaler.transform(features)
        # The target is taken from the *unscaled* features so that the loss
        # keeps its real physical meaning.
        rain_idx = self.feature_cols.index(self.target_col)
        unscaled_rain = features[:, rain_idx]

        # Distance from the start of a window to the row holding its target.
        target_offset = self.seq_length + self.pred_horizon - 1
        # A non-positive count yields an empty range, i.e. no samples.
        num_windows = len(scaled_features) - target_offset
        for start in range(num_windows):
            # X: The historical chunk (seq_length, num_features)
            self.X_data.append(scaled_features[start:start + self.seq_length])
            # Y: The future target value
            self.y_data.append(unscaled_rain[start + target_offset])

    def __len__(self):
        """Return the number of (X, y) samples."""
        return len(self.X_data)

    def __getitem__(self, idx):
        """Return sample `idx` as float32 tensors: X (seq_length, num_features), y (1,)."""
        x = torch.tensor(self.X_data[idx], dtype=torch.float32)
        y = torch.tensor([self.y_data[idx]], dtype=torch.float32)
        return x, y
def create_federated_dataloaders(data_dir="dataset/processed",
                                 num_clients=3,
                                 batch_size=64,
                                 seq_length=24,
                                 test_split=0.2):
    """
    Distributes available sensor data files among `num_clients`.

    NOTE: despite its name, this function does not build DataLoaders yet.
    It only partitions the `*.parquet` files found in `data_dir` and returns
    that partition. `batch_size`, `seq_length` and `test_split` are accepted
    for forward compatibility but are currently unused.

    :param data_dir: Directory searched (non-recursively) for `*.parquet` files
    :param num_clients: Number of clients to split the sorted file list among
    :param batch_size: Currently unused
    :param seq_length: Currently unused
    :param test_split: Currently unused
    :return: A list of `num_clients` numpy arrays of file paths, as produced
        by `np.array_split` on the sorted file list
    :raises ValueError: If fewer files than `num_clients` are found
    """
    all_files = sorted(glob.glob(os.path.join(data_dir, "*.parquet")))
    if len(all_files) < num_clients:
        raise ValueError(f"Not enough sensors ({len(all_files)}) to split among {num_clients} clients.")

    # Non-IID Splitting (Simple deterministic split by array slicing)
    # E.g., 12 files, 3 clients -> 4 files per client
    client_files = np.array_split(all_files, num_clients)

    # TODO: build per-client train/test DataLoaders. To avoid leaking future
    # data into training, the split should be temporal within each file
    # (e.g. the first 80% of rows as train), which `RainfallDataset` does not
    # support yet because it loads every file in full.
    return client_files
# Smoke test: when run as a script, load the first processed sensor file,
# then print the dataset size plus the shapes of one sample and one batch.
if __name__ == "__main__":
    parquet_paths = sorted(glob.glob("dataset/processed/*.parquet"))
    print(f"Found {len(parquet_paths)} processed parquet files.")

    if parquet_paths:
        # Only the first sensor is exercised here
        first_sensor = parquet_paths[0]
        sensor_dataset = RainfallDataset([first_sensor], seq_length=24, is_train=True)
        print(f"Dataset length for 1 sensor: {len(sensor_dataset)} samples.")

        # Shapes of a single (X, y) sample
        first_sample = sensor_dataset[0]
        sample_x, sample_y = first_sample
        print(f"X shape: {sample_x.shape}")
        print(f"y shape: {sample_y.shape}")

        # Shapes of one shuffled mini-batch
        sensor_loader = DataLoader(sensor_dataset, batch_size=32, shuffle=True)
        batch_iterator = iter(sensor_loader)
        first_batch_x, first_batch_y = next(batch_iterator)
        print(f"Batch X shape: {first_batch_x.shape}")
        print(f"Batch y shape: {first_batch_y.shape}")