# Navigator/src/navigator/download/idownload/rinex/nasa_cddis.py
"""NASA CDDIS RINEX V3 Downloader Module.

This module facilitates the downloading of RINEX Version 3 (V3) files from the NASA CDDIS
(Crustal Dynamics Data Information System) FTP server. It offers a class, NasaCddisV3, that
searches the daily archive for matching observation/navigation file pairs and downloads
them to a local directory.

The module uses FTPFS from the fs.ftpfs package to interact with the FTP server and perform
file downloads. It leverages concurrent threading for efficient file retrieval.

Classes:
    NasaCddisV3: A class to download RINEX V3 files from the NASA CDDIS FTP server.

Attributes:
    None

Functions:
    None

Example:
    An example use case might be:

    ```
    from pathlib import Path

    downloader = NasaCddisV3(email="your_email@example.com", threads=8)
    downloader.download(year=2023, day=300, save_path=Path("/path/to/save"))
    ```

Note:
    Ensure that the provided email address is valid and anonymous access is allowed on
    the NASA CDDIS FTP server.

Author:
    Nischal Bhattarai 
    nbhattarai@crimson.ua.edu

Version:
    0.1.0
"""

import random
from concurrent.futures import ThreadPoolExecutor
from logging import NullHandler
from pathlib import Path

import tqdm

from ....logger.logger import get_logger
from ....utils.matcher.matcher import GpsNav3DailyMatcher, MixedObs3DailyMatcher
from ...ftpserver.ftpfs_server import FTPFSServer

__all__ = ["NasaCddisV3"]


class NasaCddisV3:
    """Download paired RINEX V3 observation/navigation files from NASA CDDIS.

    The class lists the daily observation and navigation directories of the
    NASA CDDIS (Crustal Dynamics Data Information System) FTP server, keeps
    the stations that have both an observation and a GPS navigation file, and
    downloads the selected files concurrently.

    Attributes:
        server_address (str): Host name of the FTP server.
        username (str): User name sent to the FTP server.
        usename (str): Misspelled legacy alias of ``username``; kept only for
            backward compatibility.
        tls (bool): Value passed as ``tls`` when creating the FTP connection.
    """

    server_address: str = "gdc.cddis.eosdis.nasa.gov"
    username: str = "anonymous"
    # Original (misspelled) attribute name, kept so existing callers still work.
    usename: str = username
    tls: bool = True

    def __init__(
        self,
        email: str = "anonymous@gmail.com",
        threads: int = 5,
        logging: bool = False,
    ) -> None:
        """Initialize the downloader and its FTP server wrapper.

        Args:
            email (str): Email address passed to the FTP server wrapper as the
                ``acct`` value. It is not validated here.
            threads (int): Number of worker threads for concurrent downloads
                (default is 5).
            logging (bool): If True, enables logging (default is False).
        """
        # Set the number of threads
        self.threads = threads

        # Get the logger
        self.logger = get_logger(__name__, dummy=not logging)

        # Disable logging if logging is False
        if not logging:
            self.logger.handlers.clear()
            self.logger.addHandler(NullHandler())

        # Matchers used to recognise GPS navigation and mixed observation files
        self.gps_nav_matcher = GpsNav3DailyMatcher()
        self.obs_matcher = MixedObs3DailyMatcher()

        self.logger.info(f"Instantiating NasaCddisV3 with email: {email}")
        # Initialize the FTPFS server from the class-level connection settings
        self.ftpfs = FTPFSServer(
            host=self.server_address,
            user=self.username,
            acct=email,
            tls=self.tls,
        )

        super().__init__()

    def _threaded_fetch_files(
        self,
        files: list[str],
        save_path: Path,
        no_pbar: bool = False,
        *args,  # noqa : ARG006
        **kwargs,  # noqa : ARG006
    ) -> None:
        """Download the given remote files concurrently.

        Every entry of ``files`` is submitted to a thread pool that calls
        ``self.ftpfs.download(fname, save_path)``. A failed download is logged
        and does not abort the remaining downloads.

        Args:
            files (list[str]): Remote paths of the files to download.
            save_path (Path): The path to save the downloaded files.
            no_pbar (bool): If True, disables the progress bar (default is False).
            *args: Ignored.
            **kwargs: Ignored.
        """
        # Log the number of files to download
        self.logger.info(f"Number of Files to Download: {len(files)}")

        futures = []
        with tqdm.tqdm(
            total=len(files), desc="Downloading", disable=no_pbar
        ) as pbar:
            # Leaving the executor context waits for every submitted job.
            with ThreadPoolExecutor(max_workers=self.threads) as executor:
                for fname in files:
                    future = executor.submit(self.ftpfs.download, fname, save_path)
                    # Advance the bar whenever a job ends, successfully or not
                    future.add_done_callback(
                        lambda _: pbar.update(1)  # noqa : ARG005
                    )
                    futures.append(future)

        # Exceptions raised in workers are stored on the futures; surface them
        # in the log instead of silently reporting every file as downloaded.
        failed = 0
        for fname, future in zip(files, futures):
            error = future.exception()
            if error is not None:
                failed += 1
                self.logger.error(f"Failed to download {fname}: {error}")

        # Log the download completion
        self.logger.info("Download Complete!")
        self.logger.info(
            f"Downloaded {len(files) - failed} files to {save_path.absolute()}"
        )
        return

    def _search_available_files(
        self,
        year: int,
        day: int,
        match_string: str = None,
        sample: int = -1,
        *args,  # noqa : ARG006
        **kwargs,  # noqa : ARG006
    ) -> list[str]:
        """Search the FTP server for paired observation/navigation files.

        The daily observation and navigation directories for ``year``/``day``
        are listed, filtered with the matchers, and reduced to the stations
        present in both. For each such station the first matching observation
        file and the first matching navigation file form a pair.

        Args:
            year (int): The year of the RINEX files.
            day (int): The day of the RINEX files. [1-366]
            match_string (str): Upper-cased and used as a substring filter on
                the station names; None disables the filter (default is None).
            sample (int): The number of file pairs to pick at random; -1 keeps
                all pairs (default is -1).
            *args: Ignored.
            **kwargs: Ignored.

        Returns:
            list[str]: Remote paths, flattened as
                ``[obs_1, nav_1, obs_2, nav_2, ...]``.

        Raises:
            ValueError: If no station is left after filtering, or if ``sample``
                is larger than the number of available file pairs.
        """
        # Default paths to daily data (observation: "YYd", navigation: "YYn")
        day_dir = f"/pub/gnss/data/daily/{year}/{str(day).zfill(3)}"
        short_year = str(year)[-2:]
        default_obs_path = f"{day_dir}/{short_year}d"
        default_nav_path = f"{day_dir}/{short_year}n"
        # Log the default paths
        self.logger.info(f"Default OBS Path: {default_obs_path}")
        self.logger.info(f"Default NAV Path: {default_nav_path}")

        # List both directories and keep only the matched file names
        obs_file_names = [
            fname
            for fname in self.ftpfs.listdir(default_obs_path)
            if self.obs_matcher(fname)
        ]
        nav_file_names = [
            fname
            for fname in self.ftpfs.listdir(default_nav_path)
            if self.gps_nav_matcher(fname)
        ]
        self.logger.info(f"Number of OBS Files: {len(obs_file_names)}")
        self.logger.info(f"Number of NAV Files: {len(nav_file_names)}")

        # Map each station to its first file so metadata is parsed once per
        # file instead of once per (station, file) combination.
        obs_by_station = {}
        for fname in obs_file_names:
            station = self.obs_matcher.extract_metadata(fname)["station_name"]
            obs_by_station.setdefault(station, fname)
        nav_by_station = {}
        for fname in nav_file_names:
            station = self.gps_nav_matcher.extract_metadata(fname)["station_name"]
            nav_by_station.setdefault(station, fname)

        # Stations present in both listings, sorted for a reproducible order
        stations = sorted(obs_by_station.keys() & nav_by_station.keys())

        # Filter stations based on match_string
        if match_string is not None:
            wanted = match_string.upper()
            stations = [station for station in stations if wanted in station]
            self.logger.info(f"Filtered Stations: {stations}")

        # Raise error if no stations are found
        if not stations:
            raise ValueError("No stations found with the provided match_string")

        # Build one (OBS, NAV) remote path pair per station
        self.logger.info("Getting file pairs!")
        file_pairs = [
            [
                default_obs_path + "/" + obs_by_station[station],
                default_nav_path + "/" + nav_by_station[station],
            ]
            for station in stations
        ]
        self.logger.info(f"Number of File Pairs: {len(file_pairs)}")

        # Sample the file pairs if sample is not -1
        if sample != -1:
            # Sample must be less than or equal to the number of pairs
            if sample > len(file_pairs):
                raise ValueError(
                    f"requested sample must be less than or equal to {len(file_pairs)}"
                )
            file_pairs = random.sample(file_pairs, sample)
            # Log the sample
            self.logger.info(f"Number of File Pairs after sampling: {len(file_pairs)}")

        # Flatten the file pairs
        return [remote_path for pair in file_pairs for remote_path in pair]

    def download(
        self,
        year: int,
        day: int,
        save_path: Path,
        num_files: int = 1,
        no_pbar: bool = False,
        match_string: str = "JPL",
        *args,  # noqa : ARG006
        **kwargs,  # noqa : ARG006
    ) -> None:
        """Download RINEX V3 files for one day from the FTP server.

        Matching observation/navigation file pairs are searched for the given
        year and day, ``num_files`` pairs are picked, and the files are
        downloaded concurrently into ``save_path``. The FTP connection is
        closed afterwards, including when the search or the download raises.

        Args:
            year (int): The year of the RINEX files.
            day (int): The day of the RINEX files. [1-366]
            save_path (Path): The path to save the downloaded files.
            num_files (int): The number of file pairs to download; -1 selects
                all available pairs (default is 1).
            no_pbar (bool): If True, disables the progress bar (default is False).
            match_string (str): The string to match in the station names
                (default is JPL).
            *args: Accepted for compatibility and ignored.
            **kwargs: Forwarded to the internal helpers, which ignore them.

        Raises:
            ValueError: If ``save_path`` does not exist, no station matches, or
                ``num_files`` exceeds the number of available file pairs.
        """
        # Check if the save path exists
        if not save_path.exists():
            raise ValueError("The save path does not exist.")

        try:
            # Extra positional arguments are deliberately not forwarded: placed
            # after keyword arguments they would collide with the helpers'
            # named parameters and raise a TypeError.
            available_files = self._search_available_files(
                year=year,
                day=day,
                match_string=match_string,
                sample=num_files,
                **kwargs,
            )

            # Fetch the files
            self._threaded_fetch_files(
                available_files,
                save_path,
                no_pbar=no_pbar,
                **kwargs,
            )
        finally:
            # Close the FTPFS connection even if searching or fetching failed
            self.ftpfs.close()

        return