"""Controller for the Ublox receiver.""" import shutil from pathlib import Path from time import sleep import click import pandas as pd import pyubx2 as ubx import tqdm from ...logger.logger import get_logger from ..ublox.commands import CFG_RATE, NAV_PVT, RXM_RAWX, RXM_SFRBX from ..ublox.profile import StreamingProfile DEFAULT_COMMAND_WAIT_TIME = 0.01 DUMP_MESSAGE_WAIT_TIME = 2.0 WARMUP_TIME = 1 @click.group(invoke_without_command=True, no_args_is_help=True) @click.pass_context @click.option( "-v", "--verbose", is_flag=True, help="Enable verbose mode.", default=False, ) def main( ctx: click.Context, verbose: bool = False, ) -> None: """Utility CLI interface for the Ublox receiver.""" # If verbose mode is enabled, set the logger logger = get_logger("ublox", dummy=not verbose) # Add the logger to the context ctx.ensure_object(dict) # Add the logger to the context ctx.obj["logger"] = logger return def dump_periodic_messages( profile: StreamingProfile, rxm_log_file: Path, pvt_log_file: Path ) -> int: """Dumps the periodic messages to the log files. Args: profile (StreamingProfile): The streaming profile object. rxm_log_file (Path): The path to the RXM-RAWX log file. pvt_log_file (Path): The path to the NAV-PVT log file. Returns: int: The number of messages collected and dumped. """ # WAIL for the command to be executed sleep(DUMP_MESSAGE_WAIT_TIME + DEFAULT_COMMAND_WAIT_TIME) # Get the data num_msg, data = profile.collect() # Write the data to the log files for msg in data["RXM-RAWX"]: rxm_log_file.write(msg.serialize()) for msg in data["NAV-PVT"]: pvt_log_file.write(msg.serialize()) for msg in data["RXM-SFRBX"]: rxm_log_file.write(msg.serialize()) sleep(DEFAULT_COMMAND_WAIT_TIME) return num_msg @main.command(name="log") @click.pass_context @click.option( "-d", "--device", required=True, type=click.Path( exists=True, file_okay=True, dir_okay=False, readable=True, path_type=Path ), help="Path to the serial device file. Example: /dev/ttyACM0", ) @click.option( "-b", "--baudrate", required=False, type=click.IntRange(min=1), default=115200, help="Baudrate of the serial connection. Default: 115200", ) @click.option( "-rxm", "--rxm-log-path", required=True, type=click.Path( exists=False, file_okay=True, dir_okay=False, writable=True, path_type=Path ), help="Path to the RXM-RAWX log file. Example: /tmp/rxm_rawx.ubx", ) @click.option( "-pvt", "--pvt-log-path", required=True, type=click.Path( exists=False, file_okay=True, dir_okay=False, writable=True, path_type=Path ), help="Path to the NAV-PVT log file. Example: /tmp/nav_pvt.ubx", ) @click.option( "-t", "--time", required=False, type=click.FloatRange(min=1), default=60, help="Time to collect data. Default: 60 seconds", ) @click.option( "-r", "--rate", required=False, type=click.FloatRange(min=1), default=1, help="Rate [Hz] to collect data. Default: 1 Hz", ) def log( ctx: click.Context, device: Path, baudrate: int, rxm_log_path: Path, pvt_log_path: Path, time: float, rate: float, ) -> None: """Loggs the data from the Ublox receiver into UBX files.""" # Get the logger from the context logger = ctx.obj["logger"] # Create a StreamingProfile object profile = StreamingProfile( commands=[NAV_PVT(), RXM_RAWX(), RXM_SFRBX()], device=device, baudrate=baudrate, logger=logger, no_check=False, ) # Sent the rate to the receiver in_ms = int(1000 / rate) # Set the rate with the logger logger.info(f"Setting the rate to {rate} Hz.") # Send the rate to the receiver rateCmd = CFG_RATE().config_command(measRate=in_ms, navRate=1, timeRef=0) # Send the command to the receiver out = profile.controller.send_config_command(rateCmd, wait_for_ack=True) if out.identity != "ACK-ACK": click.echo("Failed to set the rate.") # Exit the program raise click.Abort() # Open the log files with ( rxm_log_path.open("wb") as rxm_log_file, pvt_log_path.open("wb") as pvt_log_file, ): # Start the data collection profile.start() # Create a time space for the data collection TIMESPACE = range(0, int(time), int(DUMP_MESSAGE_WAIT_TIME)) # Create a progress bar for the data collection n = 0 with tqdm.tqdm(total=len(TIMESPACE), desc=f"Collected {n} messages") as pbar: for _ in TIMESPACE: msg_collected = dump_periodic_messages( profile, rxm_log_file, pvt_log_file ) n += msg_collected # Update the progress bar pbar.set_description(f"Collected {n} messages") pbar.update(1) profile.stop() return @main.command(name="pvt-to-csv") @click.pass_context @click.option( "-p", "--pvt-log-path", required=True, type=click.Path( exists=True, file_okay=True, dir_okay=False, readable=True, path_type=Path ), help="Path to the NAV-PVT log file. Example: /tmp/nav_pvt.ubx", ) @click.option( "-c", "--csv-path", required=True, type=click.Path( exists=False, file_okay=True, dir_okay=False, writable=True, path_type=Path ), help="Path to the CSV file. Example: /tmp/nav_pvt.csv", ) def pvt_to_csv(ctx: click.Context, pvt_log_path: Path, csv_path: Path) -> None: """Converts the NAV-PVT log file to a CSV file.""" # Get the logger from the context logger = ctx.obj["logger"] # Open the log file logger.info(f"Opening the log file: {pvt_log_path}") # Create a NAV-PVT object nav_pvt_cmd = NAV_PVT() with pvt_log_path.open("rb") as pvt_log_file: # Create a UBX reader reader = ubx.UBXReader( datastream=pvt_log_file, protfilter=ubx.UBX_PROTOCOL, ) # Loop through the messages data = [] for raw, parsed in reader: # If message is NAV-PVT if parsed.identity == nav_pvt_cmd: data.append(nav_pvt_cmd.parse_ubx_message(parsed)) # Create a DataFrame df = pd.DataFrame(data) # Convert the time column to timestamp timecols = ["year", "month", "day", "hour", "min", "second"] df["timestamp"] = pd.to_datetime( df[timecols].rename( columns={ "year": "year", "month": "month", "day": "day", "hour": "hour", "min": "minute", "seconds": "second", } ) ) # Drop the time columns df.drop(columns=timecols, inplace=True) # Save the DataFrame to a CSV file df.to_csv(csv_path, index=False) return @main.command(name="rxm-to-rinex") @click.pass_context @click.option( "-r", "--rxm-log-path", required=True, type=click.Path( exists=True, file_okay=True, dir_okay=False, readable=True, path_type=Path ), help="Path to the RXM-RAWX log file. Example: /tmp/rxm_rawx.ubx", ) def rxm_to_rinex(ctx: click.Context, rxm_log_path: Path) -> None: """Converts the RXM-RAWX log file to a RINEX file.""" logger = ctx.obj["logger"] logger.info(f"Opening the log file: {rxm_log_path}") command = "convbin" if shutil.which(command) is None: click.echo( f"Conversion command {command} not found on PATH. Please install RTKLIB and add it to PATH." ) click.Abort() return try: logger.info(f"Converting {rxm_log_path} to RINEX format using {command}.") shutil.run([command, str(rxm_log_path)]) logger.info("Conversion completed successfully.") except Exception as e: logger.error(f"An error occurred during conversion: {e}") if __name__ == "__main__": main()