"""This module contains the script to triangulate the data from the RINEX files.""" import secrets from pathlib import Path import click import matplotlib.pyplot as plt import numpy as np import pandas as pd import seaborn as sns from ...core.triangulate import ( IterativeTriangulationInterface, Triangulate, ) from ...epoch import from_rinex_files from ...logger.logger import get_logger from ..igs_network.igs_network import IGSNetwork @click.group(invoke_without_command=True, no_args_is_help=True) @click.pass_context @click.option( "-v", "--verbose", required=False, is_flag=True, default=False, help="Enable verbose logging", ) def main( ctx: click.Context, verbose: bool, ) -> None: """Triangulate the data from the RINEX files.""" logger = get_logger(name=__name__, dummy=not verbose) logger.info("Starting triangulation!") # Ensure that the context object is dictionary-like ctx.ensure_object(dict) # Add the logger to the context object ctx.obj["logger"] = logger @main.command(name="wls") @click.pass_context @click.option( "-nav", "--nav-file", required=True, type=click.Path(exists=True, path_type=Path), help="The path to the RINEX navigation file", ) @click.option( "-obs", "--obs-file", required=True, type=click.Path(exists=True, path_type=Path), help="The path to the RINEX observation file", ) @click.option( "-mode", "--mode", required=False, type=click.Choice(["single", "dual"]), default="single", help="The mode to triangulate the data", ) @click.option( "-apply-tropo", "--apply-tropo", required=False, is_flag=True, default=False, help="Apply tropospheric correction to the data", ) @click.option( "-apply-iono", "--apply-iono", required=False, is_flag=True, default=False, help="Apply ionospheric correction to the data", ) @click.option( "-t", "--output-type", required=False, type=click.Choice(["csv", "html"]), default="html", help="The output file type", ) @click.option( "--match-mode", required=False, type=click.Choice(["maxsv", "nearest"]), default="maxsv", help="The mode to pair the observations and navigation messages", ) @click.option( "-i", "--ignore", required=False, type=click.INT, default=5, help="The epoch to ignore having less than this number of satellites", ) @click.option( "-code-only", "--code-only", required=False, is_flag=True, default=False, help="Use only the code measurements for triangulation without the phase measurements", ) @click.option( "-s", "--save", required=False, type=click.Path(exists=True, dir_okay=True, writable=True, path_type=Path), help="The path to save the data to", default=Path.cwd(), ) @click.option( "-skip", "--skip", required=False, default=0, type=click.IntRange(min=0), help="The number of steps to skip in the data", ) def wls( ctx: click.Context, nav_file: Path, obs_file: Path, mode: str, apply_tropo: bool, apply_iono: bool, output_type: str, match_mode: str, ignore: int, save: str, code_only: bool, skip: int, ) -> None: """Triangulate the data from the RINEX files using Weighted Least Squares (WLS).""" logger = ctx.obj["logger"] logger.info("Triangulating using Weighted Least Squares (WLS)!") logger.info("Epochifying the data!") # Epochify the data epoches = list( from_rinex_files( observation_file=obs_file, navigation_file=nav_file, mode=match_mode, ) ) logger.info(f"Found {len(epoches)} epoches!") N = skip # Filter out epoches with less than 5 satellites epoches = [epoch for epoch in epoches if len(epoch) > ignore] logger.info(f"Found {len(epoches)} epoches with more than {ignore} satellites!") # Triangulate the data triangulator = Triangulate( interface=IterativeTriangulationInterface(code_only=code_only) ) triangulation_epoches = epoches[::N] # Change the profile of the epoches for epoch in triangulation_epoches: epoch.profile["apply_tropo"] = apply_tropo epoch.profile["apply_iono"] = apply_iono epoch.profile["mode"] = mode logger.info( f"""Triangulating {len(triangulation_epoches)} epoches with the following profile: - Apply Tropospheric Correction: {apply_tropo} - Apply Ionospheric Correction: {apply_iono} - Mode: {mode} - Match Mode: {match_mode} - Ignore: {ignore} - Skip: {N} """ ) df = triangulator.triangulate_time_series( epoches=triangulation_epoches, override=True ) logger.info("Traingulation Completed! Saving the data!") # Convert to dataframe df = pd.DataFrame(df) # Save the data to the specified output type if output_type == "csv": df.to_csv( save / f"traingulation-wls-{secrets.token_urlsafe(nbytes=4)}.csv", index=False, ) elif output_type == "html": df.to_html( save / f"traingulation-wls-{secrets.token_urlsafe(nbytes=4)}.html", index=False, ) return @main.command(name="igs-diff-plot") @click.pass_context @click.option( "-out-csv", "--output-csv", required=True, type=click.Path(exists=True, file_okay=True, readable=True, path_type=Path), help="The path to the output csv file from the triangulation", ) @click.option( "-st", "--station-name", required=True, type=click.STRING, help="The name of the station to plot", ) @click.option( "-s", "--save-path", required=False, type=click.Path(exists=True, dir_okay=True, writable=True, path_type=Path), help="The path to save the plot to", default=Path.cwd(), ) def igs_diff_plot( ctx: click.Context, output_csv: Path, station_name: str, save_path: Path, ) -> None: """Plot the IGS differences.""" logger = ctx.obj["logger"] logger.info( "Plotting the differences between the IGS and the calculated positions!" ) # Read the data df = pd.read_csv(output_csv) # Get the station coordinates network = IGSNetwork() if station_name not in network.stations.index: raise ValueError(f"Station {station_name} not found in the IGS network!") true_coords = network.get_xyz( station=station_name, ) true_coords = pd.Series( { "x": true_coords[0], "y": true_coords[1], "z": true_coords[2], } ) enu_error = Triangulate.enu_error( predicted=df, actual=true_coords, ) # Plot the data fig, ax = plt.subplots(1, 3, figsize=(15, 5)) # Set seaborn theme sns.set_theme( style="darkgrid", ) colnames = ["E_error", "N_error", "U_error"] for i, name in enumerate(colnames): sns.lineplot( x=df.index, y=enu_error[name], ax=ax[i], label=f"{name}", ) ax[i].set_title(f"{name} vs Time") plt.tight_layout() # Save the plot plt.savefig(save_path / f"enu-plot_{station_name}.png") # Plot the norm of the error fig, ax = plt.subplots(1, 1, figsize=(10, 5)) sns.lineplot( x=df.index, y=enu_error.apply(np.linalg.norm, axis=1), ax=ax, label="Total Error", ) ax.set_title("Total Error vs Time") # Save the plot plt.savefig(save_path / f"error_plot_{station_name}.png") # Plot the histogram of the error fig, ax = plt.subplots(1, 1, figsize=(10, 5)) sns.histplot( x=enu_error.apply(np.linalg.norm, axis=1), ax=ax, kde=True, ) ax.set_title("Histogram of the Total Error") # Save the plot plt.savefig( save_path / f"hist-error-plot_{station_name}.png", ) return