"""Download rinex files from CCIDS using curlftpfs.""" import calendar import random import re import shutil import tempfile from datetime import datetime from pathlib import Path import click import tqdm from ...download.ftpserver.mount_server import CDDISMountServer from ...logger.logger import get_logger # ------------------------------------------ Set the logging level ------------------------------ logger = get_logger(__name__) logger.info(f"Staring the logging for {__name__}... ") # ------------------------------------------ END ------------------------------ # ------------------------------------------ END ------------------------------ # ------------------------------------------ Global Variables ------------------------------ # USER EMAIL USER_EMAIL = "anonymous@anonymous.com" # Temporary directory name TEMP_MOUNT_DIR_NAME = "tmp_ccids_mount" # Regex to match the rinex files _obs_regex = r".*_01D_30S_MO.*\.(crx|rnx)\.gz" _nav_regex = r".*_01D*_GN.*\.(crx|rnx)\.gz" # ------------------------------------------ END ------------------------------ # ------------------------------------------ Helper Functions --------------------------------- # Helper function to match files in the given path and subdirectories def _match_files(path: Path, regex: list[str]) -> list[str]: """Match files in the given path and subdirectories recursively.""" matched_files = [] for file in path.iterdir(): if file.is_dir(): matched_files.extend(_match_files(file, regex)) elif file.is_file() and any( [re.match(pattern, file.name) for pattern in regex] ): logger.debug(f"Matched file: {file}") matched_files.append(file) else: logger.debug(f"Skipped file: {file}") return matched_files # Format int 1 to 001 def format_int(num: int) -> str: """Format int to three digits. i.e 1 -> 001, 10 -> 010, 100 -> 100.""" return f"{num:03}" # Get the range of days from 1 to 366 for the given year and month def get_day_range(year: int, month: int) -> tuple[int, int]: """Get the range of days from 1 to 366 for the given year and month. Args: year (int): The year. month (int): The month (1 to 12). Returns: tuple: A tuple containing the start and end day numbers. """ start_date = datetime(year, month, 1) end_date = datetime(year, month, calendar.monthrange(year, month)[1]) start_day = start_date.timetuple().tm_yday end_day = end_date.timetuple().tm_yday return start_day, end_day # Daily Sweep Command def _daily_sweep( mountDir: Path, save_path: Path, year: int, day: int, samples: int ) -> None: """Download RINEX files for the given year and day [1, 366].""" logger.info(f"Starting the daily sweep process for {year}/{day}...") # Make the save path if it does not exist logger.info(f"Making the save path if it does not exist: {save_path}") save_path.mkdir(parents=True, exist_ok=True) # Point to the ftp path of the given year / month ftp_path = ( mountDir / "pub" / "gnss" / "data" / "daily" / f"{year}" / format_int(day) ) # YYD and YYN directories of the given year / month yyd = str(year)[-2:] + "d" yyn = str(year)[-2:] + "n" # Match the files at yyd logger.info(f"Matching the files at {ftp_path / yyd}") obs_files = _match_files(ftp_path / yyd, [_obs_regex]) logger.info(f"Mached files at {ftp_path / yyd}: {obs_files.__len__()}") # Match the files at yyn logger.info(f"Matching the files at {ftp_path / yyn}") nav_files = _match_files(ftp_path / yyn, [_nav_regex]) logger.info(f"Mached files at {ftp_path / yyn}: {nav_files.__len__()}") # Intersection of the stations in obs and nav files logger.debug("Finding the intersection of the stations in obs and nav files") nav_stations = set([file.name.split("_")[0] for file in nav_files]) obs_stations = set([file.name.split("_")[0] for file in obs_files]) # Intersection of the stations in obs and nav files logger.debug("Finding the intersection of the stations in obs and nav files") common_stations = nav_stations.intersection(obs_stations) logger.debug(f"Common stations: {common_stations}") # Filter paths with common stations logger.debug("Filtering paths with common stations") obs_files = [ file for file in obs_files if file.name.split("_")[0] in common_stations ] nav_files = [ file for file in nav_files if file.name.split("_")[0] in common_stations ] # Print the number of files logger.info(f"Number of Intersected Obs files: {obs_files.__len__()}") logger.info(f"Number of Intersected Nav files: {nav_files.__len__()}") # Check if the number of files are equal if obs_files.__len__() != nav_files.__len__(): logger.info("Number of obs and nav files are not equal!") # Make one to one mapping between obs and nav files based on the station name logger.info("Get one obs, nav file for each station in common stations") stationMap = {} # Get one obs, nav file for each station in common stations for station in common_stations: stationMap[station] = [None, None] for obs_file in obs_files: if station == obs_file.name.split("_")[0]: stationMap[station][0] = obs_file break for nav_file in nav_files: if station == nav_file.name.split("_")[0]: stationMap[station][1] = nav_file break # Filter the files having both obs and nav files obs_file = [file[0] for file in stationMap.values() if file[0] is not None] nav_file = [file[1] for file in stationMap.values() if file[1] is not None] # Check if the number of files are equal logger.info(f"Update common number of files: {obs_file.__len__()}") # Sort and zip the nav and obs files (obs_path , nav_path) logger.debug("Sorting and zipping the nav and obs files") obs_files.sort(key=lambda x: x.name.split("_")[0]) nav_files.sort(key=lambda x: x.name.split("_")[0]) files = list(zip(obs_files, nav_files)) # Take the samples if given if samples != -1 and samples < len(files): logger.debug(f"Taking {samples} random samples") files = random.sample(list(files), samples) # Copy the files to the destination path logger.info(f"Copying the files to {save_path}") for obs_file, nav_file in tqdm.tqdm( files, desc="Copying files", total=len(files), bar_format="{l_bar}{bar:20}{r_bar}{bar:-20b}", ): shutil.copy(obs_file, save_path / obs_file.name) shutil.copy(nav_file, save_path / nav_file.name) logger.debug(f"COPY: {obs_file} to {save_path / obs_file.name}") logger.debug(f"COPY: {nav_file} to {save_path / nav_file.name}") # Log the number of files copied logger.info(f"Copied {len(files)} files") return # ------------------------------------------ END ------------------------------ # ------------------------------------------ Click Commands ------------------------------------------------ @click.group(invoke_without_command=True, no_args_is_help=True) @click.pass_context @click.option( "-e", "--email", required=False, default=USER_EMAIL, help=f"Email to login to CCIDS. Default: {USER_EMAIL}", ) @click.version_option(version="1.0.0") def main(ctx: click.Context, email: str) -> None: """Download RINEX files from CCIDS using curlftpfs(Required).""" logger.info("Starting the download process...") # Set the context object to a dictionary ctx.ensure_object(dict) # Create a temporary directory to mount the ftp server mount_dir = Path(tempfile.gettempdir()) / TEMP_MOUNT_DIR_NAME logger.info(f"Creating a temporary mount directory: {mount_dir}") # Instantiate the CDDISMountServer logger.info("Instantiating the CDDISMountServer") mount_server = CDDISMountServer(mount_dir, email) # Add mount server to the context logger.info("Adding the mount server to the context") ctx.obj["mount_server"] = mount_server # Add end script callback to the context logger.debug("Adding the end script callback to the context") ctx.call_on_close(mount_server.unmount) return @main.command() @click.pass_context @click.option( "-p", "--path", required=True, type=click.Path(exists=True, writable=True, path_type=Path), help="Path to save the files", ) @click.option( "-y", "--year", required=True, type=click.INT, help="Year to download RINEX files" ) @click.option( "-d", "--day", required=True, type=click.IntRange(1, 366), help="Day of year to download RINEX files", ) @click.option( "-s", "--samples", required=False, type=click.INT, default=-1, help="Number of samples to download", ) def daily(ctx: click.Context, path: Path, year: int, day: int, samples: int) -> None: """Download RINEX files for the given year and day.""" # path / year / day path = path / str(year) / format_int(day) # Mount the server logger.info("Mounting the server") server: CDDISMountServer = ctx.obj["mount_server"] server.mount() # Sweep the given year and day _daily_sweep(server.mountDir, path, year, day, samples) return @main.command() @click.pass_context @click.option( "-p", "--path", required=True, type=click.Path(exists=True, writable=True, path_type=Path), help="Path to save the files", ) @click.option( "-y", "--year", required=True, type=click.INT, help="Year to download RINEX files" ) @click.option( "-s", "--samples", required=False, type=click.INT, default=-1, help="Number of samples to download", ) def yearly(ctx: click.Context, path: Path, year: int, samples: int) -> None: """Download RINEX files for the given year.""" # path / year path = path / str(year) # Get the range of days start_day, end_day = 1, 366 # If current year, get the range of days till yesterday if year == datetime.now().year: start_day = 1 end_day = datetime.now().timetuple().tm_yday - 1 # Mount the server logger.info("Mounting the server") server: CDDISMountServer = ctx.obj["mount_server"] server.mount() logger.info("Starting the yearly sweep process...") for day in range(start_day, end_day + 1): # path / year / day day_path = path / format_int(day) # Sweep the given year and day try: _daily_sweep(server.mountDir, day_path, year, day, samples) logger.info( f"------------------------Finished the daily sweep process for {year}/{day}----------------------------------" ) except Exception as e: logger.error(f"Error in downloading files for {year}/{day}") logger.error(e) continue logger.info("Finished downloading all files") return # ------------------------------------------ END ---------------------------------------------------------------------------------- # ------------------------------------------ Main Function --------------------------------- if __name__ == "__main__": main() # ------------------------------------------ END ---------------------------------