Navigator / src / navigator / utils / scripts / cddis_download_mounted.py
cddis_download_mounted.py
Raw
"""Download rinex files from CCIDS using curlftpfs."""

import calendar
import random
import re
import shutil
import tempfile
from datetime import datetime
from pathlib import Path

import click
import tqdm

from ...download.ftpserver.mount_server import CDDISMountServer
from ...logger.logger import get_logger

# ------------------------------------------ Set the logging level ------------------------------

logger = get_logger(__name__)
logger.info(f"Staring the logging for {__name__}... ")

# ------------------------------------------ END ------------------------------

# ------------------------------------------ END ------------------------------

# ------------------------------------------ Global Variables ------------------------------
# USER EMAIL
USER_EMAIL = "anonymous@anonymous.com"

# Temporary directory name
TEMP_MOUNT_DIR_NAME = "tmp_ccids_mount"

# Regex to match the rinex files
_obs_regex = r".*_01D_30S_MO.*\.(crx|rnx)\.gz"
_nav_regex = r".*_01D*_GN.*\.(crx|rnx)\.gz"

# ------------------------------------------ END ------------------------------


# ------------------------------------------ Helper Functions ---------------------------------
# Helper function to match files in the given path and subdirectories
def _match_files(path: Path, regex: list[str]) -> list[str]:
    """Match files in the given path and subdirectories recursively."""
    matched_files = []
    for file in path.iterdir():
        if file.is_dir():
            matched_files.extend(_match_files(file, regex))
        elif file.is_file() and any(
            [re.match(pattern, file.name) for pattern in regex]
        ):
            logger.debug(f"Matched file: {file}")
            matched_files.append(file)
        else:
            logger.debug(f"Skipped file: {file}")
    return matched_files


# Format int 1 to 001
def format_int(num: int) -> str:
    """Format int to three digits. i.e 1 -> 001, 10 -> 010, 100 -> 100."""
    return f"{num:03}"


# Get the range of days from 1 to 366 for the given year and month
def get_day_range(year: int, month: int) -> tuple[int, int]:
    """Get the range of days from 1 to 366 for the given year and month.

    Args:
        year (int): The year.
        month (int): The month (1 to 12).

    Returns:
        tuple: A tuple containing the start and end day numbers.
    """
    start_date = datetime(year, month, 1)
    end_date = datetime(year, month, calendar.monthrange(year, month)[1])
    start_day = start_date.timetuple().tm_yday
    end_day = end_date.timetuple().tm_yday
    return start_day, end_day


# Daily Sweep Command
def _daily_sweep(
    mountDir: Path, save_path: Path, year: int, day: int, samples: int
) -> None:
    """Download RINEX files for the given year and day [1, 366]."""
    logger.info(f"Starting the daily sweep process for {year}/{day}...")

    # Make the save path if it does not exist
    logger.info(f"Making the save path if it does not exist: {save_path}")
    save_path.mkdir(parents=True, exist_ok=True)

    # Point to the ftp path of the given year / month
    ftp_path = (
        mountDir / "pub" / "gnss" / "data" / "daily" / f"{year}" / format_int(day)
    )

    # YYD and YYN directories of the given year / month
    yyd = str(year)[-2:] + "d"
    yyn = str(year)[-2:] + "n"

    # Match the files at yyd
    logger.info(f"Matching the files at {ftp_path / yyd}")
    obs_files = _match_files(ftp_path / yyd, [_obs_regex])
    logger.info(f"Mached files at {ftp_path / yyd}: {obs_files.__len__()}")

    # Match the files at yyn
    logger.info(f"Matching the files at {ftp_path / yyn}")
    nav_files = _match_files(ftp_path / yyn, [_nav_regex])
    logger.info(f"Mached files at {ftp_path / yyn}: {nav_files.__len__()}")

    # Intersection of the stations in obs and nav files
    logger.debug("Finding the intersection of the stations in obs and nav files")
    nav_stations = set([file.name.split("_")[0] for file in nav_files])
    obs_stations = set([file.name.split("_")[0] for file in obs_files])

    # Intersection of the stations in obs and nav files
    logger.debug("Finding the intersection of the stations in obs and nav files")
    common_stations = nav_stations.intersection(obs_stations)
    logger.debug(f"Common stations: {common_stations}")

    # Filter paths with common stations
    logger.debug("Filtering paths with common stations")
    obs_files = [
        file for file in obs_files if file.name.split("_")[0] in common_stations
    ]
    nav_files = [
        file for file in nav_files if file.name.split("_")[0] in common_stations
    ]

    # Print the number of files
    logger.info(f"Number of Intersected Obs files: {obs_files.__len__()}")
    logger.info(f"Number of Intersected Nav files: {nav_files.__len__()}")

    # Check if the number of files are equal
    if obs_files.__len__() != nav_files.__len__():
        logger.info("Number of obs and nav files are not equal!")
        # Make one to one mapping between obs and nav files based on the station name
        logger.info("Get one obs, nav file for each station in common stations")
        stationMap = {}

        # Get one obs, nav file for each station in common stations
        for station in common_stations:
            stationMap[station] = [None, None]
            for obs_file in obs_files:
                if station == obs_file.name.split("_")[0]:
                    stationMap[station][0] = obs_file
                    break
            for nav_file in nav_files:
                if station == nav_file.name.split("_")[0]:
                    stationMap[station][1] = nav_file
                    break

        # Filter the files having both obs and nav files
        obs_file = [file[0] for file in stationMap.values() if file[0] is not None]
        nav_file = [file[1] for file in stationMap.values() if file[1] is not None]

        # Check if the number of files are equal
        logger.info(f"Update common number of files: {obs_file.__len__()}")

    # Sort and zip the nav and obs files (obs_path , nav_path)
    logger.debug("Sorting and zipping the nav and obs files")
    obs_files.sort(key=lambda x: x.name.split("_")[0])
    nav_files.sort(key=lambda x: x.name.split("_")[0])
    files = list(zip(obs_files, nav_files))

    # Take the samples if given
    if samples != -1 and samples < len(files):
        logger.debug(f"Taking {samples} random samples")
        files = random.sample(list(files), samples)

    # Copy the files to the destination path
    logger.info(f"Copying the  files to {save_path}")
    for obs_file, nav_file in tqdm.tqdm(
        files,
        desc="Copying files",
        total=len(files),
        bar_format="{l_bar}{bar:20}{r_bar}{bar:-20b}",
    ):
        shutil.copy(obs_file, save_path / obs_file.name)
        shutil.copy(nav_file, save_path / nav_file.name)
        logger.debug(f"COPY: {obs_file} to {save_path / obs_file.name}")
        logger.debug(f"COPY: {nav_file} to {save_path / nav_file.name}")

    # Log the number of files copied
    logger.info(f"Copied {len(files)} files")
    return


# ------------------------------------------ END ------------------------------


# ------------------------------------------ Click Commands ------------------------------------------------
@click.group(invoke_without_command=True, no_args_is_help=True)
@click.pass_context
@click.option(
    "-e",
    "--email",
    required=False,
    default=USER_EMAIL,
    help=f"Email to login to CCIDS. Default: {USER_EMAIL}",
)
@click.version_option(version="1.0.0")
def main(ctx: click.Context, email: str) -> None:
    """Download RINEX files from CCIDS using curlftpfs(Required)."""
    logger.info("Starting the download process...")

    # Set the context object to a dictionary
    ctx.ensure_object(dict)

    # Create a temporary directory to mount the ftp server
    mount_dir = Path(tempfile.gettempdir()) / TEMP_MOUNT_DIR_NAME
    logger.info(f"Creating a temporary mount directory: {mount_dir}")

    # Instantiate the CDDISMountServer
    logger.info("Instantiating the CDDISMountServer")
    mount_server = CDDISMountServer(mount_dir, email)

    # Add mount server to the context
    logger.info("Adding the mount server to the context")
    ctx.obj["mount_server"] = mount_server

    # Add end script callback to the context
    logger.debug("Adding the end script callback to the context")
    ctx.call_on_close(mount_server.unmount)

    return


@main.command()
@click.pass_context
@click.option(
    "-p",
    "--path",
    required=True,
    type=click.Path(exists=True, writable=True, path_type=Path),
    help="Path to save the files",
)
@click.option(
    "-y", "--year", required=True, type=click.INT, help="Year to download RINEX files"
)
@click.option(
    "-d",
    "--day",
    required=True,
    type=click.IntRange(1, 366),
    help="Day of year to download RINEX files",
)
@click.option(
    "-s",
    "--samples",
    required=False,
    type=click.INT,
    default=-1,
    help="Number of samples to download",
)
def daily(ctx: click.Context, path: Path, year: int, day: int, samples: int) -> None:
    """Download RINEX files for the given year and day."""
    # path / year / day
    path = path / str(year) / format_int(day)

    # Mount the server
    logger.info("Mounting the server")
    server: CDDISMountServer = ctx.obj["mount_server"]
    server.mount()
    # Sweep the given year and day
    _daily_sweep(server.mountDir, path, year, day, samples)
    return


@main.command()
@click.pass_context
@click.option(
    "-p",
    "--path",
    required=True,
    type=click.Path(exists=True, writable=True, path_type=Path),
    help="Path to save the files",
)
@click.option(
    "-y", "--year", required=True, type=click.INT, help="Year to download RINEX files"
)
@click.option(
    "-s",
    "--samples",
    required=False,
    type=click.INT,
    default=-1,
    help="Number of samples to download",
)
def yearly(ctx: click.Context, path: Path, year: int, samples: int) -> None:
    """Download RINEX files for the given year."""
    # path / year
    path = path / str(year)

    # Get the range of days
    start_day, end_day = 1, 366

    # If current year, get the range of days till yesterday
    if year == datetime.now().year:
        start_day = 1
        end_day = datetime.now().timetuple().tm_yday - 1

    # Mount the server
    logger.info("Mounting the server")
    server: CDDISMountServer = ctx.obj["mount_server"]
    server.mount()

    logger.info("Starting the yearly sweep process...")
    for day in range(start_day, end_day + 1):
        # path / year / day
        day_path = path / format_int(day)

        # Sweep the given year and day
        try:
            _daily_sweep(server.mountDir, day_path, year, day, samples)
            logger.info(
                f"------------------------Finished the daily sweep process for {year}/{day}----------------------------------"
            )
        except Exception as e:
            logger.error(f"Error in downloading files for {year}/{day}")
            logger.error(e)
            continue

    logger.info("Finished downloading all files")

    return


# ------------------------------------------ END ----------------------------------------------------------------------------------

# ------------------------------------------ Main Function ---------------------------------
if __name__ == "__main__":
    main()
# ------------------------------------------ END ---------------------------------