Navigator / src / navigator / epoch / loaders / rinex_loader.py
rinex_loader.py
Raw
"""Load the rinex data to an epoch object."""

import typing as tp
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

import pandas as pd
import tqdm

from ...epoch.epoch import Epoch
from ...parse.iparse.nav.iparse_gps_nav import IParseGPSNav
from ...parse.iparse.obs.iparse_gps_obs import IParseGPSObs
from .fetchers import fetch_nav_data, fetch_sp3

__all__ = [
    "match_observation_navigation_timestamps",
    "get_sp3_data",
    "get_noon_of_unique_days",
    "from_rinex_dataframes",
    "from_precise_ephemeris",
    "split_dataframe_by_day",
    "from_rinex_files",
]


## Helper functions
def get_sp3_data(
    timestamps: tp.List[pd.Timestamp],
    max_workers: int = 4,
    logging: bool = False,
) -> pd.DataFrame:
    """Get the SP3 data for the timestamps.

    Args:
        timestamps (List[pd.Timestamp]): A list of timestamps.
        max_workers (int, optional): The maximum number of workers. Defaults to 4.
        logging (bool, optional): If True, the logging will be enabled. Defaults to False.

    Returns:
        pd.DataFrame: The SP3 data.

    Raises:
        ValueError: If the SP3 data is not available.
    """
    # Get the noon of the unique days
    unique_days = get_noon_of_unique_days(timestamps)

    # Thread the fetching of the SP3 data
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        sp3_data = executor.map(fetch_sp3, unique_days, [logging] * len(unique_days))

    return pd.concat([sp3[1] for sp3 in sp3_data], axis=0).sort_index()


def get_noon_of_unique_days(timestamps: tp.List[pd.Timestamp]) -> tp.List[pd.Timestamp]:
    """Get the noon of the unique days in the timestamps.

    Args:
        timestamps (List[pd.Timestamp]): A list of timestamps.

    Returns:
        List[pd.Timestamp]: A list of timestamps at noon of the unique days in the timestamps.
    """
    # Normalize timestamps to get unique days
    unique_days = set([timestamp.normalize() for timestamp in timestamps])

    # Add the noon timestamp of the unique days
    delta = pd.Timedelta(hours=12)

    return [day + delta for day in unique_days]


# TODO: Add the compatibility for other GNSS systems
def match_observation_navigation_timestamps(
    observation_data: pd.DataFrame,
    navigation_data: pd.DataFrame,
    mode: str = "maxsv",
    matching_threshold: pd.Timedelta = pd.Timedelta(hours=3),
) -> list[tuple[pd.Timestamp, pd.Timestamp]]:
    """Matches the observation timestamps to the navigation timestamps based on the mode.

    The dataframes must have a MultiIndex with the levels 'time' and 'sv'.

    Args:
        observation_data (pd.DataFrame): Observation data.
        navigation_data (pd.DataFrame): Navigation data.
        mode (str, optional): Mode to match the timestamps (maxsv | nearest). Defaults to "maxsv".
        matching_threshold (pd.Timedelta, optional): Matching threshold within which the timestamps are matched. Defaults to pd.Timedelta(hours=3).

    Returns:
        list[tuple[pd.Timestamp, pd.Timestamp]]: List of matched timestamps.
    """
    # Get the timestamps of the observations.
    obsTimestamps = observation_data.index.get_level_values("time").unique()

    # Get the timestamps and crossponding number of satellites in the navigation data.
    timeSVMap = {
        time: len(navigation_data.loc[time])
        for time in navigation_data.index.get_level_values("time").unique()
    }

    # Match the timestamps based on the mode.
    matchesTimestamps = []
    for obsTime in obsTimestamps:
        # Get the timestamps within the threshold.
        navTimes = [
            time
            for time in timeSVMap.keys()
            if abs(time - obsTime) <= matching_threshold
        ]

        # If no timestamps are found, skip.
        if len(navTimes) == 0:
            raise ValueError(
                f"No Navigation Data found for {obsTime} within +- 3 hours."
            )

        if mode.lower() == "maxsv":
            # Return the timestamp with the maximum number of satellites.
            matchesTimestamps.append(
                (obsTime, max(navTimes, key=lambda time: timeSVMap[time]))
            )
        elif mode.lower() == "nearest":
            # Return the timestamp with the nearest timestamp.
            matchesTimestamps.append(
                (obsTime, min(navTimes, key=lambda time: abs(time - obsTime)))
            )
        else:
            raise ValueError(f"Mode must be in ['maxsv', 'nearest']. Got {mode}.")

    return matchesTimestamps


def split_dataframe_by_day(df: pd.DataFrame) -> tp.Dict[pd.Timestamp, pd.DataFrame]:
    """Split a multi-indexed DataFrame by unique days, preserving the initial multi-index structure.

    Args:
        df (pd.DataFrame): The DataFrame to split.

    Returns:
        tp.Dict[pd.Timestamp, pd.DataFrame]: A dictionary of DataFrames, where each DataFrame contains data for a single day.
    """
    # Group by day
    grouped = df.groupby(pd.Grouper(level=0, freq="D"))

    frame_map = {}
    # Iterate over groups
    for date, group in grouped:
        # Append each day's DataFrame to the list
        frame_map[date] = group
    return frame_map


## Loaders Functions
# TODO: Add the compatibility for other GNSS systems
def from_rinex_dataframes(
    observation_data: pd.DataFrame,
    observation_metadata: pd.Series,
    navigation_data: pd.DataFrame,
    navigation_metadata: pd.Series,
    station_name: tp.Optional[str] = None,
    matching_mode: str = "maxsv",
    trim: bool = True,
    drop_na: bool = True,
    column_mapper: tp.Optional[tp.List[str]] = None,
    matching_threshold: pd.Timedelta = pd.Timedelta(hours=3),
    profile: dict[str, str | bool] = Epoch.INITIAL,
) -> tp.Iterator[Epoch]:
    """Loads the RINEX data to an epoch object.

    This methods assumes that the observation file is within one unique day.
    If the observation file contains data from multiple days, the split the data
    first and then load the data.

    Args:
        observation_data (pd.DataFrame): The observation data.
        observation_metadata (pd.Series): The observation metadata.
        navigation_data (pd.DataFrame): The navigation data.
        navigation_metadata (pd.Series): The navigation metadata.
        station_name (str, optional): The station name. Defaults to None.
        matching_mode (str, optional): The mode of matching the navigation data.[maxsv | nearest]. Defaults to 'maxsv'.
        trim (bool, optional): Intersect satellite vehicles in observation and navigation data. Defaults to True.
        drop_na (bool, optional): If True, the NaN values will be dropped from relevant columns. Defaults to True.
        column_mapper (tp.List[str], optional): The column mapper. Defaults to None.
        matching_threshold (pd.Timedelta, optional): The matching threshold to match the observation and navigation data. Defaults to pd.Timedelta(hours=3).
        profile (dict[str, str| bool], optional): The profile of the epoch. Defaults to Epoch.INITIAL.
        max_workers (int, optional): The maximum number of workers. Defaults to 4.

    Returns:
        tp.List[Epoch]: A list of epoch objects.
    """
    # Check if the data is empty
    if observation_data.empty:
        raise ValueError(
            "The observation data is empty. Could not parse the observation file."
        )

    # Get the matches timestamps
    matchesTimestamps = match_observation_navigation_timestamps(
        observation_data=observation_data,
        navigation_data=navigation_data,
        mode=matching_mode,
        matching_threshold=matching_threshold,
    )

    # Check if the matches are empty
    epoches = []

    with tqdm.tqdm(matchesTimestamps) as pbar:
        for obsTime, navTime in pbar:
            epoches.append(
                Epoch(
                    timestamp=obsTime,
                    obs_data=observation_data.loc[obsTime],
                    obs_meta=observation_metadata,
                    nav_data=navigation_data.loc[
                        [navTime]
                    ],  # Do not drop the time index here since it is TOC
                    nav_meta=navigation_metadata,
                    trim=trim,
                    purify=drop_na,
                    station=station_name,
                    columns_mapping=column_mapper,
                    profile=profile,
                )
            )

            pbar.set_description(f"Processing {obsTime}")

    yield from epoches


# # TODO: Add the compatibility for other GNSS systems
def from_precise_ephemeris(
    observation_data: pd.DataFrame,
    observation_metadata: pd.Series,
    sp3_data: pd.DataFrame,
    station_name: tp.Optional[str] = None,
    trim: bool = True,
    drop_na: bool = True,
    column_mapper: tp.Optional[tp.List[str]] = None,
) -> tp.Iterator[Epoch]:
    """Loads the observation data and precise ephemeris data to an epoch object.

    Args:
        observation_data (pd.DataFrame): The observation data.
        observation_metadata (pd.Series): The observation metadata.
        sp3_data (pd.DataFrame): The SP3 data.
        station_name (str, optional): The station name. Defaults to None.
        trim (bool, optional): Intersect satellite vehicles in observation and navigation data. Defaults to True.
        drop_na (bool, optional): If True, the NaN values will be dropped from relevant columns. Defaults to True.
        column_mapper (tp.List[str], optional): The column mapper. Defaults to None.

    Returns:
        tp.Iterator[Epoch]: An iterator of epoch objects.
    """
    # Check if the data is empty
    if observation_data.empty:
        raise ValueError(
            "The observation data is empty. Could not parse the observation file."
        )

    # Fragment the observation data
    obsTimes = observation_data.index.get_level_values("time").unique()

    epoches = []

    with tqdm.tqdm(obsTimes) as pbar:
        for t in pbar:
            epoches.append(
                Epoch(
                    timestamp=t,
                    obs_data=observation_data.loc[t],
                    obs_meta=observation_metadata,
                    nav_data=sp3_data,
                    nav_meta=pd.Series(),
                    station=station_name,
                    trim=trim,
                    purify=drop_na,
                    profile=Epoch.SP3,
                    columns_mapping=column_mapper,
                )
            )

            pbar.set_description(f"Processing {t}")

    yield from epoches


## TODO: Add the compatibility for other GNSS systems
def from_rinex_files(
    observation_file: Path,
    navigation_file: Path,
    station_name: tp.Optional[str] = None,
    mode: str = "maxsv",
    trim: bool = True,
    drop_na: bool = True,
    column_mapper: tp.Optional[tp.List[str]] = None,
    matching_threshold: pd.Timedelta = pd.Timedelta(hours=3),
    profile: dict[str, str | bool] = Epoch.INITIAL,
) -> tp.Iterator[Epoch]:
    """Loads the RINEX files to an epoch object.

    Args:
        observation_file (Path): The observation file.
        navigation_file (Path): The navigation file.
        station_name (str, optional): The station name. Defaults to None.
        mode (str, optional): The mode of matching the navigation data.[maxsv | nearest]. Defaults to 'maxsv'.
        trim (bool, optional): Intersect satellite vehicles in observation and navigation data. Defaults to True.
        drop_na (bool, optional): If True, the NaN values will be dropped from relevant columns. Defaults to True.
        column_mapper (tp.List[str], optional): The column mapper. Defaults to None.
        matching_threshold (pd.Timedelta, optional): The matching threshold to match the observation and navigation data. Defaults to pd.Timedelta(hours=3).
        profile (dict[str, str| bool], optional): The profile of the epoch. Defaults to Epoch.INITIAL.

    Returns:
        tp.List[Epoch]: A list of epoch objects.
    """
    # Load the data
    obsParser = IParseGPSObs()
    navParser = IParseGPSNav()

    # Load the observation data
    obsmeta, obsdata = obsParser.parse(observation_file)

    # Load the navigation data
    navmeta, navdata = navParser.parse(navigation_file)

    return from_rinex_dataframes(
        observation_data=obsdata,
        observation_metadata=obsmeta,
        navigation_data=navdata,
        navigation_metadata=navmeta,
        station_name=station_name,
        matching_mode=mode,
        trim=trim,
        drop_na=drop_na,
        column_mapper=column_mapper,
        matching_threshold=matching_threshold,
        profile=profile,
    )


## TODO: Add the compatibility for other GNSS systems
def from_observation_file(
    observation_file: Path,
    station_name: tp.Optional[str] = None,
    mode: str = "maxsv",
    trim: bool = True,
    drop_na: bool = True,
    column_mapper: tp.Optional[tp.List[str]] = None,
    matching_threshold: pd.Timedelta = pd.Timedelta(hours=3),
    profile: dict[str, str | bool] = Epoch.INITIAL,
    logging: bool = False,
    download_station: str = "JPL",
) -> tp.Iterator[Epoch]:
    """Loads the observation file to an epoch object.

    This method will fetch the navigation data from the NASA CDDIS server automatically.

    Args:
        observation_file (Path): The observation file.
        station_name (str, optional): The receiver station name if from IGS station. Defaults to None.
        mode (str, optional): The mode of matching the navigation data.[maxsv | nearest]. Defaults to 'maxsv'.
        trim (bool, optional): Intersect satellite vehicles in observation and navigation data. Defaults to True.
        drop_na (bool, optional): If True, the NaN values will be dropped from relevant columns. Defaults to True.
        column_mapper (tp.List[str], optional): The column mapper. Defaults to None.
        matching_threshold (pd.Timedelta, optional): The matching threshold to match the observation and navigation data. Defaults to pd.Timedelta(hours=3).
        profile (dict[str, str| bool], optional): The profile of the epoch. Defaults to Epoch.INITIAL.
        logging (bool, optional): If True, the logging will be enabled. Defaults to False.
        download_station (str, optional): The station to download the navigation data. Defaults to "JPL".

    Returns:
        tp.List[Epoch]: A list of epoch objects.
    """
    # Get the noon days of the observation file
    obsParser = IParseGPSObs()

    # Load the observation data
    obsmeta, obsdata = obsParser.parse(observation_file)

    # Seperate the data by day
    obsDataFrames = split_dataframe_by_day(obsdata)

    # Download the navigation data for each day
    navDataFrames = {
        key: fetch_nav_data(date=key, logging=logging, station=download_station)
        for key in obsDataFrames.keys()
    }

    # Load the data
    epoches = []

    for key in obsDataFrames.keys():
        epoches.extend(
            list(
                from_rinex_dataframes(
                    observation_data=obsDataFrames[key],
                    observation_metadata=obsmeta,
                    navigation_data=navDataFrames[key][1],
                    navigation_metadata=navDataFrames[key][0],
                    station_name=station_name,
                    matching_mode=mode,
                    trim=trim,
                    drop_na=drop_na,
                    column_mapper=column_mapper,
                    matching_threshold=matching_threshold,
                    profile=profile,
                )
            )
        )

    return epoches