# Navigator / src / navigator / utils / ublox / commands.py
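"""Commands for the u-blox module, providing methods for receiving data from the u-blox receiver.

Classes:
    - BaseCommand: Base class for all u-blox commands.
    - NAV_POSLLH: Command to get the geodetic position solution (NAV-POSLLH).
    - NAV_POSECEF: Command to get the ECEF position solution (NAV-POSECEF).
    - NAV_PVT: Command to get the navigation position velocity time solution (NAV-PVT).
    - RXM_RAWX: Command to get the raw measurement data, range and carrier phase (RXM-RAWX).
    - RXM_SFRBX: Command for the RXM-SFRBX message (payload parsing is not implemented).
    - CFG_RATE: Command to set the measurement rate, navigation rate and time reference.

"""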

import typing as tp
from abc import ABC, abstractmethod

import pandas as pd
import pyubx2 as ubx

# Public API of this module: the abstract base command followed by the
# concrete command classes defined below. Only these names are exported by
# a wildcard import of the module.
__all__ = [
    "BaseCommand",
    "NAV_POSLLH",
    "NAV_POSECEF",
    "NAV_PVT",
    "RXM_RAWX",
    "RXM_SFRBX",
    "CFG_RATE",
]


class BaseCommand(ABC):
    """Base class for all u-blox commands.

    A command is identified by its message class and message id. The pair is
    rendered as ``"<msg_cls>-<msg_id>"`` (for example ``"NAV-PVT"``), and that
    string drives ``repr``, equality and hashing, so a command can be compared
    with, or looked up by, its plain string name.

    Attributes:
        msg_cls (str): The message class, e.g. ``"NAV"``.
        msg_id (str): The message id, e.g. ``"PVT"``.
    """

    def __init__(self, msg_cls: str, msg_id: str) -> None:
        """Constructor for the BaseCommand class.

        Args:
            msg_cls (str): The message class, e.g. ``"NAV"``.
            msg_id (str): The message id, e.g. ``"PVT"``.
        """
        self.msg_cls = msg_cls
        self.msg_id = msg_id

    def __repr__(self) -> str:
        """Return a string representation of the command."""
        return f"{self.msg_cls}-{self.msg_id}"

    @abstractmethod
    def config_command(self) -> ubx.UBXMessage:
        """Return the configuration command message rate."""
        pass

    @abstractmethod
    def parse_ubx_message(self, ubx_message: ubx.UBXMessage) -> pd.Series:
        """Return the parsed payload data as a pandas Series."""
        pass

    @abstractmethod
    def units(self) -> tp.Dict[str, str]:
        """Return the units for the parsed payload data."""
        pass

    def __eq__(self, value: object) -> bool:
        """Return True if the string representation of the command is equal to the value.

        Args:
            value (object): A command name such as ``"NAV-PVT"`` or another
                command instance.

        Returns:
            bool: True when both sides render to the same ``"<cls>-<id>"`` name.
        """
        if isinstance(value, BaseCommand):
            return str(self) == str(value)
        return str(self) == value

    def __hash__(self) -> int:
        """Return a hash consistent with ``__eq__``.

        Defining ``__eq__`` alone would make instances unhashable. Hashing the
        string form keeps commands usable in sets and as dict keys, and lets a
        command and its plain string name address the same dict entry. The hash
        follows ``msg_cls``/``msg_id``, so do not mutate them while the command
        is stored in a hashed container.
        """
        return hash(str(self))


class NAV_POSLLH(BaseCommand):
    """Command wrapper for the NAV-POSLLH message."""

    def __init__(self) -> None:
        """Register the command under message class "NAV" and message id "POSLLH"."""
        super().__init__("NAV", "POSLLH")

    def config_command(self, on: str = "USB") -> ubx.UBXMessage:
        """Build the configuration message that sets this message's output key to 1.

        Args:
            on (str): Channel suffix appended to the configuration key. Defaults to "USB".

        Returns:
            UBXMessage: The configuration-set message (RAM layer, no transaction).
        """
        output_key = f"CFG_MSGOUT_UBX_{self.msg_cls}_{self.msg_id}_{on}"
        return ubx.UBXMessage.config_set(
            layers=ubx.SET_LAYER_RAM,
            transaction=ubx.TXN_NONE,
            cfgData=[(output_key, 1)],
        )

    def parse_ubx_message(self, ubx_message: ubx.UBXMessage) -> pd.Series:
        """Extract the NAV-POSLLH payload fields into a pandas Series.

        ``iTOW`` and the height/accuracy fields are divided by 1000; ``lon``
        and ``lat`` are passed through as delivered by the message object.

        Args:
            ubx_message (UBXMessage): The UBX message to parse.

        Returns:
            Series: The parsed payload data, keyed by field name.
        """
        payload = {
            "iTOW": ubx_message.iTOW / 1000,
            "lon": ubx_message.lon,
            "lat": ubx_message.lat,
        }
        # Heights and accuracies share the same /1000 scaling.
        for field in ("height", "hMSL", "hAcc", "vAcc"):
            payload[field] = getattr(ubx_message, field) / 1000
        return pd.Series(payload)

    def units(self) -> tp.Dict[str, str]:
        """Return the unit label of every field produced by ``parse_ubx_message``.

        Returns:
            Dict[str, str]: Mapping of field name to unit label.
        """
        return {
            "iTOW": "s",
            **dict.fromkeys(("lon", "lat"), "deg"),
            **dict.fromkeys(("height", "hMSL", "hAcc", "vAcc"), "m"),
        }


class NAV_POSECEF(BaseCommand):
    """Command wrapper for the NAV-POSECEF message."""

    def __init__(self) -> None:
        """Register the command under message class "NAV" and message id "POSECEF"."""
        super().__init__("NAV", "POSECEF")

    def config_command(self, on: str = "USB") -> ubx.UBXMessage:
        """Build the configuration message that sets this message's output key to 1.

        Args:
            on (str): Channel suffix appended to the configuration key. Defaults to "USB".

        Returns:
            UBXMessage: The configuration-set message (RAM layer, no transaction).
        """
        output_key = f"CFG_MSGOUT_UBX_{self.msg_cls}_{self.msg_id}_{on}"
        return ubx.UBXMessage.config_set(
            layers=ubx.SET_LAYER_RAM,
            transaction=ubx.TXN_NONE,
            cfgData=[(output_key, 1)],
        )

    def parse_ubx_message(self, ubx_message: ubx.UBXMessage) -> pd.Series:
        """Extract the NAV-POSECEF payload fields into a pandas Series.

        ``iTOW`` is passed through unchanged; the ECEF coordinates and the
        position accuracy are divided by 100.

        Args:
            ubx_message (UBXMessage): The UBX message to parse.

        Returns:
            Series: The parsed payload data, keyed by field name.
        """
        record = {"iTOW": ubx_message.iTOW}
        # Coordinates and accuracy share the same /100 scaling.
        for field in ("ecefX", "ecefY", "ecefZ", "pAcc"):
            record[field] = getattr(ubx_message, field) / 100
        return pd.Series(record)

    def units(self) -> tp.Dict[str, str]:
        """Return the unit label of every field produced by ``parse_ubx_message``.

        Returns:
            Dict[str, str]: Mapping of field name to unit label.
        """
        return {
            "iTOW": "ms",
            **dict.fromkeys(("ecefX", "ecefY", "ecefZ", "pAcc"), "m"),
        }


class NAV_PVT(BaseCommand):
    """Class for the NAV-PVT command (navigation position velocity time solution)."""

    def __init__(self) -> None:
        """Constructor for the NAV-PVT class."""
        super().__init__("NAV", "PVT")

    def config_command(self, on: str = "USB") -> ubx.UBXMessage:
        """Return the configuration command message rate.

        Args:
            on (str): The message rate configuration on channel. Defaults to "USB".

        Returns:
            UBXMessage: The configuration command message rate.
        """
        return ubx.UBXMessage.config_set(
            layers=ubx.SET_LAYER_RAM,
            transaction=ubx.TXN_NONE,
            cfgData=[(f"CFG_MSGOUT_UBX_{self.msg_cls}_{self.msg_id}_{on}", 1)],
        )

    def parse_ubx_message(self, ubx_message: ubx.UBXMessage) -> pd.Series:
        """Return the parsed payload data as a pandas Series.

        Heights, position accuracies, velocities and the speed accuracy are
        divided by 1000 so that they match the "m" and "m/s" labels returned by
        ``units``. All other fields are passed through as delivered by the
        message object. The keys are exactly the keys of ``units``.

        Args:
            ubx_message (UBXMessage): The UBX message to parse.

        Returns:
            Series: The parsed payload data.
        """
        return pd.Series(
            {
                "iTOW": ubx_message.iTOW,
                "year": ubx_message.year,
                "month": ubx_message.month,
                "day": ubx_message.day,
                "hour": ubx_message.hour,
                "min": ubx_message.min,
                "second": ubx_message.second,
                "validDate": ubx_message.validDate,
                "validTime": ubx_message.validTime,
                "fullyResolved": ubx_message.fullyResolved,
                "validMag": ubx_message.validMag,
                "tAcc": ubx_message.tAcc,
                "nano": ubx_message.nano,
                "fixType": ubx_message.fixType,
                "gnssFixOk": ubx_message.gnssFixOk,
                # Previously listed in units() but never extracted here.
                "difSoln": ubx_message.difSoln,
                "psmState": ubx_message.psmState,
                "headVehValid": ubx_message.headVehValid,
                "carrSoln": ubx_message.carrSoln,
                "confirmedAvai": ubx_message.confirmedAvai,
                "confirmedDate": ubx_message.confirmedDate,
                "confirmedTime": ubx_message.confirmedTime,
                "numSV": ubx_message.numSV,
                "lon": ubx_message.lon,
                "lat": ubx_message.lat,
                "height": ubx_message.height / 1000,
                "hMSL": ubx_message.hMSL / 1000,
                "hAcc": ubx_message.hAcc / 1000,
                "vAcc": ubx_message.vAcc / 1000,
                "velN": ubx_message.velN / 1000,
                "velE": ubx_message.velE / 1000,
                "velD": ubx_message.velD / 1000,
                "gSpeed": ubx_message.gSpeed / 1000,
                "headMot": ubx_message.headMot,
                # Scaled like the other speed fields so the value agrees with
                # the "m/s" label in units().
                "sAcc": ubx_message.sAcc / 1000,
                "headAcc": ubx_message.headAcc,
                "pDOP": ubx_message.pDOP,
                "invalidLlh": ubx_message.invalidLlh,
                "lastCorrectionAge": ubx_message.lastCorrectionAge,
                "reserved0": ubx_message.reserved0,
                "headVeh": ubx_message.headVeh,
                "magDec": ubx_message.magDec,
                "magAcc": ubx_message.magAcc,
            }
        )

    def units(self) -> tp.Dict[str, str]:
        """Return the units for the parsed payload data.

        Returns:
            Dict[str, str]: The units for the parsed payload data, keyed by the
            same field names that ``parse_ubx_message`` produces.
        """
        return {
            "iTOW": "ms",
            "year": "year",
            "month": "month",
            "day": "day",
            "hour": "hour",
            "min": "min",
            "second": "s",
            "validDate": "bool",
            "validTime": "bool",
            "fullyResolved": "bool",
            "validMag": "bool",
            "tAcc": "ns",
            "nano": "ns",
            "fixType": "enum",
            "gnssFixOk": "bool",
            "difSoln": "bool",
            "psmState": "enum",
            "headVehValid": "bool",
            "carrSoln": "enum",
            "confirmedAvai": "bool",
            # Single-bit "could be confirmed" flags, not a date or a time.
            "confirmedDate": "bool",
            "confirmedTime": "bool",
            "numSV": "count",
            "lon": "deg",
            "lat": "deg",
            "height": "m",
            "hMSL": "m",
            "hAcc": "m",
            "vAcc": "m",
            "velN": "m/s",
            "velE": "m/s",
            "velD": "m/s",
            "gSpeed": "m/s",
            # NOTE(review): "deg" assumes the message object already applies
            # the heading scale factor -- confirm against pyubx2.
            "headMot": "deg",
            "sAcc": "m/s",
            "headAcc": "deg",  # NOTE(review): same assumption as headMot.
            "pDOP": "unitless",
            "invalidLlh": "bool",
            # Coded age bucket of the last differential correction, not seconds.
            "lastCorrectionAge": "enum",
            "reserved0": "reserved",
            "headVeh": "deg",
            "magDec": "deg",
            "magAcc": "deg",
        }


class RXM_RAWX(BaseCommand):
    """Class for the RXM-RAWX command to get the raw measurement data range and carrier phase."""

    # Per-measurement attributes read from the message as "<name>_<NN>", in
    # the column order of the returned DataFrame.
    _MEAS_FIELDS = (
        "prMes",
        "cpMes",
        "doMes",
        "gnssId",
        "svId",
        "sigId",
        "freqId",
        "locktime",
        "cno",
        "prStd",
        "cpStd",
        "doStd",
        "prValid",
        "halfCyc",
        "subHalfCyc",
    )

    # Epoch-level columns appended to every measurement row.
    _EPOCH_COLUMNS = ("rcvTow", "rcvWeek", "clockReset", "leapS", "leapSec")

    def __init__(self) -> None:
        """Constructor for the RXM-RAWX class."""
        super().__init__("RXM", "RAWX")

    def config_command(self, on: str = "USB") -> ubx.UBXMessage:
        """Return the configuration command message rate.

        Args:
            on (str): The message rate configuration on channel. Defaults to "USB".

        Returns:
            UBXMessage: The configuration command message rate.
        """
        return ubx.UBXMessage.config_set(
            layers=ubx.SET_LAYER_RAM,
            transaction=ubx.TXN_NONE,
            cfgData=[(f"CFG_MSGOUT_UBX_{self.msg_cls}_{self.msg_id}_{on}", 1)],
        )

    def parse_ubx_message(self, ubx_message: ubx.UBXMessage) -> pd.DataFrame:
        """Return the parsed payload data as a pandas DataFrame.

        One row is produced per measurement (``numMeas`` rows). Each row holds
        the per-measurement fields followed by the epoch-level fields, which
        are repeated on every row. The columns are always present, so a message
        with zero measurements yields an empty frame with the usual schema.

        Args:
            ubx_message (UBXMessage): The UBX message to parse.

        Returns:
            DataFrame: The parsed payload data.
        """
        # Epoch-level values shared by every measurement of this message.
        epoch = {
            "rcvTow": ubx_message.rcvTow,
            "rcvWeek": ubx_message.week,
            "clockReset": ubx_message.clkReset,
            "leapS": ubx_message.leapS,
            "leapSec": ubx_message.leapSec,
        }

        rows = []
        # Repeated-group attributes are suffixed with a 1-based, zero-padded
        # index ("_01", "_02", ...).
        for index in range(1, ubx_message.numMeas + 1):
            suffix = f"{index:02d}"
            row = {
                name: getattr(ubx_message, f"{name}_{suffix}")
                for name in self._MEAS_FIELDS
            }
            row.update(epoch)
            rows.append(row)

        return pd.DataFrame(
            rows, columns=[*self._MEAS_FIELDS, *self._EPOCH_COLUMNS]
        )

    def units(self) -> tp.Dict[str, str]:
        """Return the units for the parsed payload data.

        Every column produced by ``parse_ubx_message`` has an entry. Legacy
        keys that do not correspond to a column (``clkReset``, ``numMes``) are
        kept so existing lookups continue to work.

        Returns:
            Dict[str, str]: The units for the parsed payload data.
        """
        return {
            "rcvTow": "s",
            "rcvWeek": "week",
            "leapS": "s",
            "numMes": "count",  # legacy spelling, kept for compatibility
            "numMeas": "count",
            "recStat": "enum",
            # Flag bit of recStat, not a number of seconds.
            "leapSec": "bool",
            "clkReset": "bool",
            # Column name actually emitted by parse_ubx_message.
            "clockReset": "bool",
            "version": "version",
            "reserved0": "reserved",
            "prMes": "m",
            "cpMes": "cycles",
            "doMes": "Hz",  # Doppler measurement
            "gnssId": "enum",
            "svId": "count",
            "sigId": "enum",
            "freqId": "enum",
            "locktime": "ms",
            "cno": "dBHz",
            # NOTE(review): the labels below name the physical quantity each
            # standard deviation refers to. The parsed *Std values look like
            # raw codes that still need the scale factor from the interface
            # description -- confirm before using them numerically.
            "prStdev": "m",
            "prStd": "m",
            "cpStdev": "cycles",
            "cpStd": "cycles",
            "doStdev": "Hz",
            "doStd": "Hz",
            "trkStat": "enum",
            "prValid": "bool",
            # Tracking-status flag bits, not cycle counts.
            "halfCyc": "bool",
            "subHalfCyc": "bool",
            "reserved": "reserved",
        }


class RXM_SFRBX(BaseCommand):
    """Command wrapper for the RXM-SFRBX message (configuration only)."""

    def __init__(self) -> None:
        """Register the command under message class "RXM" and message id "SFRBX"."""
        super().__init__("RXM", "SFRBX")

    def config_command(self, on: str = "USB") -> ubx.UBXMessage:
        """Build the configuration message that sets this message's output key to 1.

        Args:
            on (str): Channel suffix appended to the configuration key. Defaults to "USB".

        Returns:
            UBXMessage: The configuration-set message (RAM layer, no transaction).
        """
        output_key = f"CFG_MSGOUT_UBX_{self.msg_cls}_{self.msg_id}_{on}"
        return ubx.UBXMessage.config_set(
            layers=ubx.SET_LAYER_RAM,
            transaction=ubx.TXN_NONE,
            cfgData=[(output_key, 1)],
        )

    def parse_ubx_message(self, ubx_message: ubx.UBXMessage) -> pd.DataFrame:
        """Payload parsing is not available for RXM-SFRBX.

        Raises:
            NotImplementedError: Always.
        """
        reason = "The parse_ubx_message method is not implemented for RXM-SFRBX."
        raise NotImplementedError(reason)

    def units(self) -> tp.Dict[str, str]:
        """Unit labels are not available for RXM-SFRBX.

        Raises:
            NotImplementedError: Always.
        """
        reason = "The units method is not implemented for RXM-SFRBX."
        raise NotImplementedError(reason)


class CFG_RATE(BaseCommand):
    """Command wrapper for the CFG-RATE settings (configuration only)."""

    def __init__(self) -> None:
        """Register the command under message class "CFG" and message id "RATE"."""
        super().__init__("CFG", "RATE")

    def config_command(
        self, measRate: int = 1000, navRate: int = 1, timeRef: int = 0
    ) -> ubx.UBXMessage:
        """Build the configuration message that writes the three rate settings.

        Args:
            measRate (int): Measurement rate in ms. Defaults to 1000.
            navRate (int): Navigation rate in cycles. Defaults to 1.
            timeRef (int): Time reference. Defaults to 0.

        Returns:
            UBXMessage: The configuration-set message (RAM layer, no transaction).
        """
        # Key/value pairs in the order they are written to the receiver.
        rate_settings = [
            ("CFG_RATE_MEAS", measRate),
            ("CFG_RATE_NAV", navRate),
            ("CFG_RATE_TIMEREF", timeRef),
        ]
        return ubx.UBXMessage.config_set(
            layers=ubx.SET_LAYER_RAM,
            transaction=ubx.TXN_NONE,
            cfgData=rate_settings,
        )

    def parse_ubx_message(self, ubx_message: ubx.UBXMessage) -> pd.DataFrame:
        """Payload parsing is not available for CFG-RATE.

        Raises:
            NotImplementedError: Always.
        """
        reason = "The parse_ubx_message method is not implemented for CFG-RATE."
        raise NotImplementedError(reason)

    def units(self) -> tp.Dict[str, str]:
        """Unit labels are not available for CFG-RATE.

        Raises:
            NotImplementedError: Always.
        """
        reason = "The units method is not implemented for CFG-RATE."
        raise NotImplementedError(reason)