Navigator / src / navigator / utils / scripts / ublox_reciever.py
ublox_reciever.py
Raw
"""Controller for the Ublox receiver."""

import shutil
from pathlib import Path
from time import sleep

import click
import pandas as pd
import pyubx2 as ubx
import tqdm

from ...logger.logger import get_logger
from ..ublox.commands import CFG_RATE, NAV_PVT, RXM_RAWX, RXM_SFRBX
from ..ublox.profile import StreamingProfile

DEFAULT_COMMAND_WAIT_TIME = 0.01
DUMP_MESSAGE_WAIT_TIME = 2.0
WARMUP_TIME = 1


@click.group(invoke_without_command=True, no_args_is_help=True)
@click.pass_context
@click.option(
    "-v",
    "--verbose",
    is_flag=True,
    help="Enable verbose mode.",
    default=False,
)
def main(
    ctx: click.Context,
    verbose: bool = False,
) -> None:
    """Utility CLI interface for the Ublox receiver."""
    # If verbose mode is enabled, set the logger
    logger = get_logger("ublox", dummy=not verbose)

    # Add the logger to the context
    ctx.ensure_object(dict)

    # Add the logger to the context
    ctx.obj["logger"] = logger

    return


def dump_periodic_messages(
    profile: StreamingProfile, rxm_log_file: Path, pvt_log_file: Path
) -> int:
    """Dumps the periodic messages to the log files.

    Args:
        profile (StreamingProfile): The streaming profile object.
        rxm_log_file (Path): The path to the RXM-RAWX log file.
        pvt_log_file (Path): The path to the NAV-PVT log file.

    Returns:
        int: The number of messages collected and dumped.
    """
    # WAIL for the command to be executed
    sleep(DUMP_MESSAGE_WAIT_TIME + DEFAULT_COMMAND_WAIT_TIME)

    # Get the data
    num_msg, data = profile.collect()

    # Write the data to the log files
    for msg in data["RXM-RAWX"]:
        rxm_log_file.write(msg.serialize())

    for msg in data["NAV-PVT"]:
        pvt_log_file.write(msg.serialize())

    for msg in data["RXM-SFRBX"]:
        rxm_log_file.write(msg.serialize())

    sleep(DEFAULT_COMMAND_WAIT_TIME)

    return num_msg


@main.command(name="log")
@click.pass_context
@click.option(
    "-d",
    "--device",
    required=True,
    type=click.Path(
        exists=True, file_okay=True, dir_okay=False, readable=True, path_type=Path
    ),
    help="Path to the serial device file. Example: /dev/ttyACM0",
)
@click.option(
    "-b",
    "--baudrate",
    required=False,
    type=click.IntRange(min=1),
    default=115200,
    help="Baudrate of the serial connection. Default: 115200",
)
@click.option(
    "-rxm",
    "--rxm-log-path",
    required=True,
    type=click.Path(
        exists=False, file_okay=True, dir_okay=False, writable=True, path_type=Path
    ),
    help="Path to the RXM-RAWX log file. Example: /tmp/rxm_rawx.ubx",
)
@click.option(
    "-pvt",
    "--pvt-log-path",
    required=True,
    type=click.Path(
        exists=False, file_okay=True, dir_okay=False, writable=True, path_type=Path
    ),
    help="Path to the NAV-PVT log file. Example: /tmp/nav_pvt.ubx",
)
@click.option(
    "-t",
    "--time",
    required=False,
    type=click.FloatRange(min=1),
    default=60,
    help="Time to collect data. Default: 60 seconds",
)
@click.option(
    "-r",
    "--rate",
    required=False,
    type=click.FloatRange(min=1),
    default=1,
    help="Rate [Hz] to collect data. Default: 1 Hz",
)
def log(
    ctx: click.Context,
    device: Path,
    baudrate: int,
    rxm_log_path: Path,
    pvt_log_path: Path,
    time: float,
    rate: float,
) -> None:
    """Loggs the data from the Ublox receiver into UBX files."""
    # Get the logger from the context
    logger = ctx.obj["logger"]

    # Create a StreamingProfile object
    profile = StreamingProfile(
        commands=[NAV_PVT(), RXM_RAWX(), RXM_SFRBX()],
        device=device,
        baudrate=baudrate,
        logger=logger,
        no_check=False,
    )

    # Sent the rate to the receiver
    in_ms = int(1000 / rate)

    # Set the rate with the logger
    logger.info(f"Setting the rate to {rate} Hz.")

    # Send the rate to the receiver
    rateCmd = CFG_RATE().config_command(measRate=in_ms, navRate=1, timeRef=0)

    # Send the command to the receiver
    out = profile.controller.send_config_command(rateCmd, wait_for_ack=True)
    if out.identity != "ACK-ACK":
        click.echo("Failed to set the rate.")
        # Exit the program
        raise click.Abort()

    # Open the log files
    with (
        rxm_log_path.open("wb") as rxm_log_file,
        pvt_log_path.open("wb") as pvt_log_file,
    ):
        # Start the data collection
        profile.start()

        # Create a time space for the data collection
        TIMESPACE = range(0, int(time), int(DUMP_MESSAGE_WAIT_TIME))

        # Create a progress bar for the data collection
        n = 0
        with tqdm.tqdm(total=len(TIMESPACE), desc=f"Collected {n} messages") as pbar:
            for _ in TIMESPACE:
                msg_collected = dump_periodic_messages(
                    profile, rxm_log_file, pvt_log_file
                )
                n += msg_collected
                # Update the progress bar
                pbar.set_description(f"Collected {n} messages")
                pbar.update(1)

        profile.stop()

    return


@main.command(name="pvt-to-csv")
@click.pass_context
@click.option(
    "-p",
    "--pvt-log-path",
    required=True,
    type=click.Path(
        exists=True, file_okay=True, dir_okay=False, readable=True, path_type=Path
    ),
    help="Path to the NAV-PVT log file. Example: /tmp/nav_pvt.ubx",
)
@click.option(
    "-c",
    "--csv-path",
    required=True,
    type=click.Path(
        exists=False, file_okay=True, dir_okay=False, writable=True, path_type=Path
    ),
    help="Path to the CSV file. Example: /tmp/nav_pvt.csv",
)
def pvt_to_csv(ctx: click.Context, pvt_log_path: Path, csv_path: Path) -> None:
    """Converts the NAV-PVT log file to a CSV file."""
    # Get the logger from the context
    logger = ctx.obj["logger"]

    # Open the log file
    logger.info(f"Opening the log file: {pvt_log_path}")

    # Create a NAV-PVT object
    nav_pvt_cmd = NAV_PVT()

    with pvt_log_path.open("rb") as pvt_log_file:
        # Create a UBX reader
        reader = ubx.UBXReader(
            datastream=pvt_log_file,
            protfilter=ubx.UBX_PROTOCOL,
        )

        # Loop through the messages
        data = []

        for raw, parsed in reader:
            # If message is NAV-PVT
            if parsed.identity == nav_pvt_cmd:
                data.append(nav_pvt_cmd.parse_ubx_message(parsed))

    # Create a DataFrame
    df = pd.DataFrame(data)

    # Convert the time column to timestamp
    timecols = ["year", "month", "day", "hour", "min", "second"]

    df["timestamp"] = pd.to_datetime(
        df[timecols].rename(
            columns={
                "year": "year",
                "month": "month",
                "day": "day",
                "hour": "hour",
                "min": "minute",
                "seconds": "second",
            }
        )
    )

    # Drop the time columns
    df.drop(columns=timecols, inplace=True)

    # Save the DataFrame to a CSV file
    df.to_csv(csv_path, index=False)

    return


@main.command(name="rxm-to-rinex")
@click.pass_context
@click.option(
    "-r",
    "--rxm-log-path",
    required=True,
    type=click.Path(
        exists=True, file_okay=True, dir_okay=False, readable=True, path_type=Path
    ),
    help="Path to the RXM-RAWX log file. Example: /tmp/rxm_rawx.ubx",
)
def rxm_to_rinex(ctx: click.Context, rxm_log_path: Path) -> None:
    """Converts the RXM-RAWX log file to a RINEX file."""
    logger = ctx.obj["logger"]

    logger.info(f"Opening the log file: {rxm_log_path}")

    command = "convbin"

    if shutil.which(command) is None:
        click.echo(
            f"Conversion command {command} not found on PATH. Please install RTKLIB and add it to PATH."
        )
        click.Abort()
        return

    try:
        logger.info(f"Converting {rxm_log_path} to RINEX format using {command}.")
        shutil.run([command, str(rxm_log_path)])
        logger.info("Conversion completed successfully.")
    except Exception as e:
        logger.error(f"An error occurred during conversion: {e}")


if __name__ == "__main__":
    main()