"""NASA CDDIS RINEX V3 Downloader Module.
This module facilitates the downloading of RINEX Version 3 (V3) files from the NASA CDDIS
(Crustal Dynamics Data Information System) FTP server. It offers a class, NasaCddisV3, that
enables establishing connections, checking the connection status, and downloading RINEX V3
files with specific criteria.
The module uses FTPFS from the fs.ftpfs package to interact with the FTP server and perform
file downloads. It leverages concurrent threading for efficient file retrieval.
Classes:
NasaCddisV3: A class to download RINEX V3 files from the NASA CDDIS FTP server.
Attributes:
None
Functions:
None
Example:
An example use case might be:
```
downloader = NasaCddisV3(email="your_email@example.com", threads=8)
downloader.download(year=2023, day=300, save_path=Path("/path/to/save"))
```
Note:
Ensure that the provided email address is valid and anonymous access is allowed on
the NASA CDDIS FTP server.
Author:
Nischal Bhattarai
nbhattarai@crimson.ua.edu
Version:
0.1.0
"""
import random
from concurrent.futures import ThreadPoolExecutor
from logging import NullHandler
from pathlib import Path
from typing import Optional

import tqdm

from ....logger.logger import get_logger
from ....utils.matcher.matcher import GpsNav3DailyMatcher, MixedObs3DailyMatcher
from ...ftpserver.ftpfs_server import FTPFSServer
# Names exported by a star-import of this module: only the downloader class.
__all__ = ["NasaCddisV3"]
class NasaCddisV3:
    """Download paired RINEX V3 observation/navigation files from NASA CDDIS.

    For a given year and day of year the class lists the daily observation and
    navigation directories on the server, keeps the stations that have both a
    matching observation file and a matching GPS navigation file, and downloads
    the selected (observation, navigation) pairs concurrently.

    Attributes:
        server_address (str): Host name of the CDDIS FTP server.
        username (str): User name used for the (anonymous) login.
        usename (str): Historical misspelling of ``username``; kept as an alias
            so existing references keep working.
        tls (bool): Whether the connection is opened with TLS.
        threads (int): Number of worker threads used for downloads.
        logger: Logger obtained from ``get_logger``.
        gps_nav_matcher: Matcher used to recognise GPS navigation file names.
        obs_matcher: Matcher used to recognise mixed observation file names.
        ftpfs: ``FTPFSServer`` instance used for listing and downloading.
    """

    server_address: str = "gdc.cddis.eosdis.nasa.gov"
    username: str = "anonymous"
    # Backward-compatible alias for the original (misspelled) attribute name.
    usename: str = username
    tls: bool = True

    def __init__(
        self,
        email: str = "anonymous@gmail.com",
        threads: int = 5,
        logging: bool = False,
    ) -> None:
        """Create the matchers, the logger and the FTP server wrapper.

        Args:
            email (str): Email address passed to the server as the account
                (``acct``) of the anonymous login.
            threads (int): Number of threads for concurrent downloads
                (default is 5).
            logging (bool): If True, enables logging (default is False).
        """
        self.threads = threads

        self.logger = get_logger(__name__, dummy=not logging)
        if not logging:
            # Make sure nothing is emitted: drop any handler that is already
            # attached and install a no-op handler instead.
            self.logger.handlers.clear()
            self.logger.addHandler(NullHandler())

        self.gps_nav_matcher = GpsNav3DailyMatcher()
        self.obs_matcher = MixedObs3DailyMatcher()

        self.logger.info(f"Instantiating NasaCddisV3 with email: {email}")

        # Use the class-level settings so that a subclass overriding them is
        # honoured instead of silently ignored.
        self.ftpfs = FTPFSServer(
            host=self.server_address,
            user=self.username,
            acct=email,
            tls=self.tls,
        )
        super().__init__()

    def _threaded_fetch_files(
        self,
        files: list[str],
        save_path: Path,
        no_pbar: bool = False,
        *args,  # noqa : ARG006
        **kwargs,  # noqa : ARG006
    ) -> None:
        """Download the given remote files concurrently.

        Every entry of ``files`` is handed to ``self.ftpfs.download`` on a
        thread pool of ``self.threads`` workers. A failed download does not
        abort the others; each failure is logged and the final log line
        reports how many files were actually downloaded.

        Args:
            files (list[str]): Remote paths of the files to download.
            save_path (Path): Local directory the files are saved to.
            no_pbar (bool): If True, disables the progress bar
                (default is False).
            *args: Ignored.
            **kwargs: Ignored.
        """
        self.logger.info(f"Number of Files to Download: {len(files)}")

        # Maps each submitted job to the remote file it downloads.
        jobs = {}
        with tqdm.tqdm(
            total=len(files), desc="Downloading", disable=no_pbar
        ) as pbar:
            # Leaving the executor context waits for every job to finish.
            with ThreadPoolExecutor(max_workers=self.threads) as executor:
                for fname in files:
                    job = executor.submit(self.ftpfs.download, fname, save_path)
                    # Advance the bar as soon as the job ends (success or not).
                    job.add_done_callback(
                        lambda _done: pbar.update(1)  # noqa : ARG005
                    )
                    jobs[job] = fname

        # Exceptions raised in a worker are stored on the future; surface
        # them instead of reporting a fully successful download.
        failed = 0
        for job, fname in jobs.items():
            error = job.exception()
            if error is not None:
                failed += 1
                self.logger.error(f"Failed to download {fname}: {error!r}")

        self.logger.info("Download Complete!")
        self.logger.info(
            f"Downloaded {len(files) - failed} of {len(files)} files "
            f"to {save_path.absolute()}"
        )

    @staticmethod
    def _first_file_per_station(matcher, file_names: list[str]) -> dict:
        """Map each station name to the first file name listed for it."""
        by_station = {}
        for fname in file_names:
            station = matcher.extract_metadata(fname)["station_name"]
            by_station.setdefault(station, fname)
        return by_station

    def _search_available_files(
        self,
        year: int,
        day: int,
        match_string: Optional[str] = None,
        sample: int = -1,
        *args,  # noqa : ARG006
        **kwargs,  # noqa : ARG006
    ) -> list[str]:
        """Find (observation, navigation) file pairs available on the server.

        A station is kept only if it has both a matching observation file and
        a matching navigation file for the requested day. When a station has
        several matching files, the first one listed by the server is used.
        Stations are processed in sorted order so the result is deterministic
        (apart from the optional random sampling).

        Args:
            year (int): The year of the RINEX files.
            day (int): The day of the RINEX files. [1-366]
            match_string (str | None): Only stations whose name contains this
                string (compared in upper case) are kept. None keeps all.
            sample (int): Number of file pairs to pick at random; -1 keeps
                every pair (default is -1).
            *args: Ignored.
            **kwargs: Ignored.

        Returns:
            list[str]: Remote paths, flattened as
            ``[obs_1, nav_1, obs_2, nav_2, ...]``.

        Raises:
            ValueError: If no station is left after filtering, or if
                ``sample`` is negative (other than -1) or larger than the
                number of available pairs.
        """
        # Daily data layout: <year>/<day of year, 3 digits>/<2-digit year><d|n>
        day_dir = f"/pub/gnss/data/daily/{year}/{str(day).zfill(3)}"
        short_year = str(year)[-2:]
        default_obs_path = f"{day_dir}/{short_year}d"
        default_nav_path = f"{day_dir}/{short_year}n"

        self.logger.info(f"Default OBS Path: {default_obs_path}")
        self.logger.info(f"Default NAV Path: {default_nav_path}")

        # Keep only the file names recognised by the respective matcher.
        obs_file_names = [
            fname
            for fname in self.ftpfs.listdir(default_obs_path)
            if self.obs_matcher(fname)
        ]
        nav_file_names = [
            fname
            for fname in self.ftpfs.listdir(default_nav_path)
            if self.gps_nav_matcher(fname)
        ]

        self.logger.info(f"Number of OBS Files: {len(obs_file_names)}")
        self.logger.info(f"Number of NAV Files: {len(nav_file_names)}")

        # Parse every file name once instead of once per station.
        obs_by_station = self._first_file_per_station(
            self.obs_matcher, obs_file_names
        )
        nav_by_station = self._first_file_per_station(
            self.gps_nav_matcher, nav_file_names
        )

        # Stations present in both listings, in a reproducible order.
        stations = sorted(obs_by_station.keys() & nav_by_station.keys())

        if match_string is not None:
            wanted = match_string.upper()
            stations = [station for station in stations if wanted in station]

        self.logger.info(f"Filtered Stations: {stations}")

        if not stations:
            raise ValueError("No stations found with the provided match_string")

        self.logger.info("Getting file pairs!")
        file_pairs = [
            [
                default_obs_path + "/" + obs_by_station[station],
                default_nav_path + "/" + nav_by_station[station],
            ]
            for station in stations
        ]
        self.logger.info(f"Number of File Pairs: {len(file_pairs)}")

        # -1 means "keep everything"; any other value is a sample size.
        if sample != -1:
            if sample < 0:
                raise ValueError(
                    "requested sample must be -1 (all) or a non-negative number"
                )
            if sample > len(file_pairs):
                raise ValueError(
                    f"requested sample must be less than or equal to {len(file_pairs)}"
                )
            file_pairs = random.sample(file_pairs, sample)

        self.logger.info(f"Number of File Pairs after sampling: {len(file_pairs)}")

        # Flatten [[obs, nav], ...] into [obs, nav, ...].
        return [fname for pair in file_pairs for fname in pair]

    def download(
        self,
        year: int,
        day: int,
        save_path: Path,
        num_files: int = 1,
        no_pbar: bool = False,
        match_string: str = "JPL",
        *args,  # noqa : ARG006
        **kwargs,  # noqa : ARG006
    ) -> None:
        """Download RINEX V3 observation/navigation pairs for one day.

        The available pairs are looked up with ``_search_available_files``
        and then downloaded with ``_threaded_fetch_files``. The FTP connection
        is closed when the method ends, whether it succeeded or raised.

        Args:
            year (int): The year of the RINEX files.
            day (int): The day of the RINEX files. [1-366]
            save_path (Path): Existing local directory to save the files to.
            num_files (int): Number of (observation, navigation) pairs to
                download; -1 downloads every available pair (default is 1).
            no_pbar (bool): If True, disables the progress bar
                (default is False).
            match_string (str): String that must be contained in the station
                name (default is JPL).
            *args: Accepted for compatibility and ignored.
            **kwargs: Forwarded to the helper methods, which ignore them.

        Raises:
            ValueError: If ``save_path`` does not exist, if no station
                matches, or if ``num_files`` is not a valid sample size.
        """
        if not save_path.exists():
            raise ValueError("The save path does not exist.")

        try:
            # Extra positional arguments are deliberately not forwarded: the
            # helpers are called by keyword, so forwarding them positionally
            # would collide with the named parameters and raise TypeError.
            available_files = self._search_available_files(
                year=year,
                day=day,
                match_string=match_string,
                sample=num_files,
                **kwargs,
            )
            self._threaded_fetch_files(
                available_files,
                save_path,
                no_pbar=no_pbar,
                **kwargs,
            )
        finally:
            # Release the connection even when searching or fetching failed.
            self.ftpfs.close()