"""Load the rinex data to an epoch object."""
import typing as tp
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
import pandas as pd
import tqdm
from ...epoch.epoch import Epoch
from ...parse.iparse.nav.iparse_gps_nav import IParseGPSNav
from ...parse.iparse.obs.iparse_gps_obs import IParseGPSObs
from .fetchers import fetch_nav_data, fetch_sp3
# Public API of this module: helper utilities first, then the epoch loaders.
__all__ = [
    "match_observation_navigation_timestamps",
    "get_sp3_data",
    "get_noon_of_unique_days",
    "from_rinex_dataframes",
    "from_precise_ephemeris",
    "split_dataframe_by_day",
    "from_rinex_files",
    "from_observation_file",
]
## Helper functions
def get_sp3_data(
    timestamps: tp.List[pd.Timestamp],
    max_workers: int = 4,
    logging: bool = False,
) -> pd.DataFrame:
    """Get the SP3 data for the timestamps.

    One fetch is issued per unique calendar day found in ``timestamps`` (each
    day is represented by its noon timestamp). The fetches run in a thread
    pool and the per-day frames are concatenated and sorted by index.

    Args:
        timestamps (List[pd.Timestamp]): A list of timestamps.
        max_workers (int, optional): The maximum number of workers. Defaults to 4.
        logging (bool, optional): If True, the logging will be enabled. Defaults to False.

    Returns:
        pd.DataFrame: The concatenated SP3 data, sorted by index.

    Raises:
        ValueError: If no timestamps are given, so there is no SP3 data to fetch.
    """
    # Materialise once so any iterable can be checked for emptiness, and fail
    # with a clear message instead of pandas' "No objects to concatenate".
    timestamps = list(timestamps)
    if not timestamps:
        raise ValueError("Cannot fetch SP3 data: no timestamps were provided.")

    # Get the noon of the unique days
    unique_days = get_noon_of_unique_days(timestamps)

    # Thread the fetching of the SP3 data; fetch_sp3 is called as
    # fetch_sp3(day, logging) for every unique day.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        sp3_results = list(
            executor.map(fetch_sp3, unique_days, [logging] * len(unique_days))
        )

    # Only element 1 of each fetch result is used here
    # (presumably a (metadata, data) pair - confirm against fetch_sp3).
    return pd.concat([result[1] for result in sp3_results], axis=0).sort_index()
def get_noon_of_unique_days(timestamps: tp.List[pd.Timestamp]) -> tp.List[pd.Timestamp]:
    """Get the noon of the unique days in the timestamps.

    Args:
        timestamps (List[pd.Timestamp]): A list of timestamps.

    Returns:
        List[pd.Timestamp]: One timestamp at 12:00 for every unique day in the
            timestamps, in chronological order.
    """
    # Normalizing drops the time-of-day, so timestamps on the same day collapse
    # to a single set entry.
    unique_days = {timestamp.normalize() for timestamp in timestamps}

    # Sort so the result is chronological rather than in arbitrary set order.
    noon_offset = pd.Timedelta(hours=12)
    return [day + noon_offset for day in sorted(unique_days)]
# TODO: Add the compatibility for other GNSS systems
def match_observation_navigation_timestamps(
    observation_data: pd.DataFrame,
    navigation_data: pd.DataFrame,
    mode: str = "maxsv",
    matching_threshold: pd.Timedelta = pd.Timedelta(hours=3),
) -> list[tuple[pd.Timestamp, pd.Timestamp]]:
    """Matches the observation timestamps to the navigation timestamps based on the mode.

    The dataframes must have a MultiIndex with the levels 'time' and 'sv'.

    For every unique observation time, the navigation times that lie within
    ``matching_threshold`` are the candidates. With ``maxsv`` the candidate
    with the most navigation rows wins; with ``nearest`` the candidate closest
    in time wins. Ties go to the candidate that appears first in the
    navigation data.

    Args:
        observation_data (pd.DataFrame): Observation data.
        navigation_data (pd.DataFrame): Navigation data.
        mode (str, optional): Mode to match the timestamps (maxsv | nearest),
            case-insensitive. Defaults to "maxsv".
        matching_threshold (pd.Timedelta, optional): Matching threshold within
            which the timestamps are matched. Defaults to pd.Timedelta(hours=3).

    Returns:
        list[tuple[pd.Timestamp, pd.Timestamp]]: List of
            (observation time, navigation time) pairs.

    Raises:
        ValueError: If the mode is not 'maxsv' or 'nearest', or if an
            observation time has no navigation time within the threshold.
    """
    # Validate the mode once, up front, so an invalid mode is reported even
    # when there are no observation timestamps to iterate over.
    normalized_mode = mode.lower()
    if normalized_mode not in ("maxsv", "nearest"):
        raise ValueError(f"Mode must be in ['maxsv', 'nearest']. Got {mode}.")

    # Get the timestamps of the observations.
    obs_times = observation_data.index.get_level_values("time").unique()

    # Count the navigation rows (one per satellite entry) for each navigation
    # timestamp in a single pass. sort=False keeps first-appearance order so
    # ties between candidates resolve deterministically.
    sv_count_by_time = (
        navigation_data.groupby(level="time", sort=False).size().to_dict()
    )
    nav_times = list(sv_count_by_time)

    matches = []
    for obs_time in obs_times:
        # Navigation timestamps within the threshold of this observation.
        candidates = [
            nav_time
            for nav_time in nav_times
            if abs(nav_time - obs_time) <= matching_threshold
        ]

        # An observation without any usable navigation data is an error.
        if not candidates:
            raise ValueError(
                f"No Navigation Data found for {obs_time} within +- {matching_threshold}."
            )

        if normalized_mode == "maxsv":
            # The candidate with the maximum number of satellites.
            best = max(candidates, key=sv_count_by_time.__getitem__)
        else:
            # The candidate nearest in time to the observation.
            best = min(candidates, key=lambda nav_time: abs(nav_time - obs_time))

        matches.append((obs_time, best))

    return matches
def split_dataframe_by_day(df: pd.DataFrame) -> tp.Dict[pd.Timestamp, pd.DataFrame]:
    """Split a multi-indexed DataFrame by unique days, preserving the initial multi-index structure.

    The first index level is used as the time axis and is binned by calendar
    day.

    Args:
        df (pd.DataFrame): The DataFrame to split.

    Returns:
        tp.Dict[pd.Timestamp, pd.DataFrame]: A dictionary keyed by day, where
            each DataFrame contains the data for that single day. Days without
            any rows are not included.
    """
    # Group by day on the first index level.
    grouped = df.groupby(pd.Grouper(level=0, freq="D"))

    # A daily Grouper also yields bins for days with no rows that lie between
    # the first and last day; drop those so every value holds actual data.
    return {day: frame for day, frame in grouped if not frame.empty}
## Loader functions
# TODO: Add the compatibility for other GNSS systems
def from_rinex_dataframes(
    observation_data: pd.DataFrame,
    observation_metadata: pd.Series,
    navigation_data: pd.DataFrame,
    navigation_metadata: pd.Series,
    station_name: tp.Optional[str] = None,
    matching_mode: str = "maxsv",
    trim: bool = True,
    drop_na: bool = True,
    column_mapper: tp.Optional[tp.List[str]] = None,
    matching_threshold: pd.Timedelta = pd.Timedelta(hours=3),
    profile: dict[str, str | bool] = Epoch.INITIAL,
) -> tp.Iterator[Epoch]:
    """Loads the RINEX data to epoch objects.

    This method assumes that the observation data is within one unique day.
    If the observation data spans multiple days, split the data by day first
    and then load each day separately.

    This is a generator: nothing runs (including the empty-data check) until
    the result is iterated, and each epoch is built only when it is requested.

    Args:
        observation_data (pd.DataFrame): The observation data.
        observation_metadata (pd.Series): The observation metadata.
        navigation_data (pd.DataFrame): The navigation data.
        navigation_metadata (pd.Series): The navigation metadata.
        station_name (str, optional): The station name. Defaults to None.
        matching_mode (str, optional): The mode of matching the navigation data.[maxsv | nearest]. Defaults to 'maxsv'.
        trim (bool, optional): Intersect satellite vehicles in observation and navigation data. Defaults to True.
        drop_na (bool, optional): If True, the NaN values will be dropped from relevant columns. Defaults to True.
        column_mapper (tp.List[str], optional): The column mapper. Defaults to None.
        matching_threshold (pd.Timedelta, optional): The matching threshold to match the observation and navigation data. Defaults to pd.Timedelta(hours=3).
        profile (dict[str, str| bool], optional): The profile of the epoch. Defaults to Epoch.INITIAL.

    Yields:
        Epoch: One epoch per matched observation timestamp.

    Raises:
        ValueError: If the observation data is empty, or if the timestamp
            matching fails.
    """
    # Check if the data is empty
    if observation_data.empty:
        raise ValueError(
            "The observation data is empty. Could not parse the observation file."
        )

    # Pair every observation timestamp with a navigation timestamp.
    matched_timestamps = match_observation_navigation_timestamps(
        observation_data=observation_data,
        navigation_data=navigation_data,
        mode=matching_mode,
        matching_threshold=matching_threshold,
    )

    # Build and hand out the epochs one at a time so the caller can stream
    # them without every epoch being held in memory at once.
    with tqdm.tqdm(matched_timestamps) as pbar:
        for obs_time, nav_time in pbar:
            epoch = Epoch(
                timestamp=obs_time,
                obs_data=observation_data.loc[obs_time],
                obs_meta=observation_metadata,
                # List indexing keeps the time index level; do not drop it
                # here since it is TOC.
                nav_data=navigation_data.loc[[nav_time]],
                nav_meta=navigation_metadata,
                trim=trim,
                purify=drop_na,
                station=station_name,
                columns_mapping=column_mapper,
                profile=profile,
            )
            pbar.set_description(f"Processing {obs_time}")
            yield epoch
# TODO: Add the compatibility for other GNSS systems
def from_precise_ephemeris(
    observation_data: pd.DataFrame,
    observation_metadata: pd.Series,
    sp3_data: pd.DataFrame,
    station_name: tp.Optional[str] = None,
    trim: bool = True,
    drop_na: bool = True,
    column_mapper: tp.Optional[tp.List[str]] = None,
) -> tp.Iterator[Epoch]:
    """Loads the observation data and precise ephemeris data to epoch objects.

    This is a generator: nothing runs (including the empty-data check) until
    the result is iterated, and each epoch is built only when it is requested.

    Args:
        observation_data (pd.DataFrame): The observation data.
        observation_metadata (pd.Series): The observation metadata.
        sp3_data (pd.DataFrame): The SP3 data.
        station_name (str, optional): The station name. Defaults to None.
        trim (bool, optional): Intersect satellite vehicles in observation and navigation data. Defaults to True.
        drop_na (bool, optional): If True, the NaN values will be dropped from relevant columns. Defaults to True.
        column_mapper (tp.List[str], optional): The column mapper. Defaults to None.

    Yields:
        Epoch: One epoch per unique observation timestamp, created with the
            Epoch.SP3 profile.

    Raises:
        ValueError: If the observation data is empty.
    """
    # Check if the data is empty
    if observation_data.empty:
        raise ValueError(
            "The observation data is empty. Could not parse the observation file."
        )

    # Fragment the observation data by its unique timestamps.
    obs_times = observation_data.index.get_level_values("time").unique()

    # Build and hand out the epochs one at a time so the caller can stream
    # them without every epoch being held in memory at once.
    with tqdm.tqdm(obs_times) as pbar:
        for obs_time in pbar:
            epoch = Epoch(
                timestamp=obs_time,
                obs_data=observation_data.loc[obs_time],
                obs_meta=observation_metadata,
                # The full SP3 frame is passed to every epoch.
                nav_data=sp3_data,
                # An empty Series is passed as the navigation metadata.
                nav_meta=pd.Series(),
                station=station_name,
                trim=trim,
                purify=drop_na,
                profile=Epoch.SP3,
                columns_mapping=column_mapper,
            )
            pbar.set_description(f"Processing {obs_time}")
            yield epoch
## TODO: Add the compatibility for other GNSS systems
def from_rinex_files(
    observation_file: Path,
    navigation_file: Path,
    station_name: tp.Optional[str] = None,
    mode: str = "maxsv",
    trim: bool = True,
    drop_na: bool = True,
    column_mapper: tp.Optional[tp.List[str]] = None,
    matching_threshold: pd.Timedelta = pd.Timedelta(hours=3),
    profile: dict[str, str | bool] = Epoch.INITIAL,
) -> tp.Iterator[Epoch]:
    """Parse a RINEX observation/navigation file pair and load them as epochs.

    Both files are parsed immediately; the parsed frames are then handed to
    ``from_rinex_dataframes``, whose iterator is returned.

    Args:
        observation_file (Path): The observation file.
        navigation_file (Path): The navigation file.
        station_name (str, optional): The station name. Defaults to None.
        mode (str, optional): The mode of matching the navigation data.[maxsv | nearest]. Defaults to 'maxsv'.
        trim (bool, optional): Intersect satellite vehicles in observation and navigation data. Defaults to True.
        drop_na (bool, optional): If True, the NaN values will be dropped from relevant columns. Defaults to True.
        column_mapper (tp.List[str], optional): The column mapper. Defaults to None.
        matching_threshold (pd.Timedelta, optional): The matching threshold to match the observation and navigation data. Defaults to pd.Timedelta(hours=3).
        profile (dict[str, str| bool], optional): The profile of the epoch. Defaults to Epoch.INITIAL.

    Returns:
        tp.Iterator[Epoch]: An iterator of epoch objects.
    """
    # Create both GPS parsers up front.
    observation_parser = IParseGPSObs()
    navigation_parser = IParseGPSNav()

    # Each parser returns a (metadata, data) pair; observation is parsed first.
    observation_metadata, observation_frame = observation_parser.parse(
        observation_file
    )
    navigation_metadata, navigation_frame = navigation_parser.parse(navigation_file)

    # Delegate the epoch construction to the dataframe-based loader.
    loader_arguments = dict(
        observation_data=observation_frame,
        observation_metadata=observation_metadata,
        navigation_data=navigation_frame,
        navigation_metadata=navigation_metadata,
        station_name=station_name,
        matching_mode=mode,
        trim=trim,
        drop_na=drop_na,
        column_mapper=column_mapper,
        matching_threshold=matching_threshold,
        profile=profile,
    )
    return from_rinex_dataframes(**loader_arguments)
## TODO: Add the compatibility for other GNSS systems
def from_observation_file(
    observation_file: Path,
    station_name: tp.Optional[str] = None,
    mode: str = "maxsv",
    trim: bool = True,
    drop_na: bool = True,
    column_mapper: tp.Optional[tp.List[str]] = None,
    matching_threshold: pd.Timedelta = pd.Timedelta(hours=3),
    profile: dict[str, str | bool] = Epoch.INITIAL,
    logging: bool = False,
    download_station: str = "JPL",
) -> tp.List[Epoch]:
    """Loads the observation file to epoch objects.

    This method will fetch the navigation data from the NASA CDDIS server automatically.
    The observation data is split by day, navigation data is fetched for each
    day that has observations, and the epochs of all days are collected into
    one list.

    Args:
        observation_file (Path): The observation file.
        station_name (str, optional): The receiver station name if from IGS station. Defaults to None.
        mode (str, optional): The mode of matching the navigation data.[maxsv | nearest]. Defaults to 'maxsv'.
        trim (bool, optional): Intersect satellite vehicles in observation and navigation data. Defaults to True.
        drop_na (bool, optional): If True, the NaN values will be dropped from relevant columns. Defaults to True.
        column_mapper (tp.List[str], optional): The column mapper. Defaults to None.
        matching_threshold (pd.Timedelta, optional): The matching threshold to match the observation and navigation data. Defaults to pd.Timedelta(hours=3).
        profile (dict[str, str| bool], optional): The profile of the epoch. Defaults to Epoch.INITIAL.
        logging (bool, optional): If True, the logging will be enabled. Defaults to False.
        download_station (str, optional): The station to download the navigation data. Defaults to "JPL".

    Returns:
        tp.List[Epoch]: A list of epoch objects.
    """
    # Load the observation data
    obs_parser = IParseGPSObs()
    obs_meta, obs_data = obs_parser.parse(observation_file)

    # Separate the data by day. Days without any observation rows are skipped:
    # there is nothing to load for them, and passing an empty frame on would
    # make from_rinex_dataframes raise.
    obs_by_day = {
        day: frame
        for day, frame in split_dataframe_by_day(obs_data).items()
        if not frame.empty
    }

    # Download the navigation data for each day before building any epoch.
    nav_by_day = {
        day: fetch_nav_data(date=day, logging=logging, station=download_station)
        for day in obs_by_day
    }

    # Load the data day by day.
    epoches = []
    for day, day_observations in obs_by_day.items():
        # Element 0 of the fetch result is used as metadata, element 1 as data.
        day_navigation = nav_by_day[day]
        epoches.extend(
            from_rinex_dataframes(
                observation_data=day_observations,
                observation_metadata=obs_meta,
                navigation_data=day_navigation[1],
                navigation_metadata=day_navigation[0],
                station_name=station_name,
                matching_mode=mode,
                trim=trim,
                drop_na=drop_na,
                column_mapper=column_mapper,
                matching_threshold=matching_threshold,
                profile=profile,
            )
        )
    return epoches