"""Commands for the u-blox module which provides methods for recieving data from the u-blox receiver. Classes: - BaseCommand: Base class for all u-blox commands. - NAV-PVT : Command to get the navigation position velocity time solution. - RXM-RAWX: Command to get the raw Mesurement data range and carrier phase. """ import typing as tp from abc import ABC, abstractmethod import pandas as pd import pyubx2 as ubx __all__ = [ "BaseCommand", "NAV_POSLLH", "NAV_POSECEF", "NAV_PVT", "RXM_RAWX", "RXM_SFRBX", "CFG_RATE", ] class BaseCommand(ABC): """Base class for all u-blox commands.""" def __init__(self, msg_cls: str, msg_id: str) -> None: """Constructor for the BaseCommand class.""" self.msg_cls = msg_cls self.msg_id = msg_id return def __repr__(self) -> str: """Return a string representation of the command.""" return f"{self.msg_cls}-{self.msg_id}" @abstractmethod def config_command(self) -> ubx.UBXMessage: """Return the configuration command message rate.""" pass @abstractmethod def parse_ubx_message(self, ubx_message: ubx.UBXMessage) -> pd.Series: """Return the parsed payload data as a pandas Series.""" pass @abstractmethod def units(self) -> tp.Dict[str, str]: """Return the units for the parsed payload data.""" pass def __eq__(self, value: str) -> bool: """Return True if the string representation of the command is equal to the value.""" return str(self) == value class NAV_POSLLH(BaseCommand): """Class for the NAV-POSLLH command.""" def __init__(self) -> None: """Constructor for the NAV-POSLLH class.""" return super().__init__("NAV", "POSLLH") def config_command(self, on: str = "USB") -> ubx.UBXMessage: """Return the configuration command message rate. Args: on (str): The message rate configuration on channel. Defaults to "USB". Returns: UBXMessage: The configuration command message rate. """ return ubx.UBXMessage.config_set( layers=ubx.SET_LAYER_RAM, transaction=ubx.TXN_NONE, cfgData=[(f"CFG_MSGOUT_UBX_{self.msg_cls}_{self.msg_id}_{on}", 1)], ) def parse_ubx_message(self, ubx_message: ubx.UBXMessage) -> pd.Series: """Return the parsed payload data as a pandas Series. Args: ubx_message (UBXMessage): The UBX message to parse. Returns: Series: The parsed payload data. """ return pd.Series( { "iTOW": ubx_message.iTOW / 1000, "lon": ubx_message.lon, "lat": ubx_message.lat, "height": ubx_message.height / 1000, "hMSL": ubx_message.hMSL / 1000, "hAcc": ubx_message.hAcc / 1000, "vAcc": ubx_message.vAcc / 1000, } ) def units(self) -> tp.Dict[str, str]: """Return the units for the parsed payload data. Returns: Dict[str, str]: The units for the parsed payload data. """ return { "iTOW": "s", "lon": "deg", "lat": "deg", "height": "m", "hMSL": "m", "hAcc": "m", "vAcc": "m", } class NAV_POSECEF(BaseCommand): """Class for the NAV-POSECEF command.""" def __init__(self) -> None: """Constructor for the NAV-POSECEF class.""" return super().__init__("NAV", "POSECEF") def config_command(self, on: str = "USB") -> ubx.UBXMessage: """Return the configuration command message rate. Args: on (str): The message rate configuration on channel. Defaults to "USB". Returns: UBXMessage: The configuration command message rate. """ return ubx.UBXMessage.config_set( layers=ubx.SET_LAYER_RAM, transaction=ubx.TXN_NONE, cfgData=[(f"CFG_MSGOUT_UBX_{self.msg_cls}_{self.msg_id}_{on}", 1)], ) def parse_ubx_message(self, ubx_message: ubx.UBXMessage) -> pd.Series: """Return the parsed payload data as a pandas Series. Args: ubx_message (UBXMessage): The UBX message to parse. Returns: Series: The parsed payload data. """ return pd.Series( { "iTOW": ubx_message.iTOW, "ecefX": ubx_message.ecefX / 100, "ecefY": ubx_message.ecefY / 100, "ecefZ": ubx_message.ecefZ / 100, "pAcc": ubx_message.pAcc / 100, } ) def units(self) -> tp.Dict[str, str]: """Return the units for the parsed payload data. Returns: Dict[str, str]: The units for the parsed payload data. """ return { "iTOW": "ms", "ecefX": "m", "ecefY": "m", "ecefZ": "m", "pAcc": "m", } class NAV_PVT(BaseCommand): """Class for the NAV-PVT command.""" def __init__(self) -> None: """Constructor for the NAV-PVT class.""" return super().__init__("NAV", "PVT") def config_command(self, on: str = "USB") -> ubx.UBXMessage: """Return the configuration command message rate. Args: on (str): The message rate configuration on channel. Defaults to "USB". Returns: UBXMessage: The configuration command message rate. """ return ubx.UBXMessage.config_set( layers=ubx.SET_LAYER_RAM, transaction=ubx.TXN_NONE, cfgData=[(f"CFG_MSGOUT_UBX_{self.msg_cls}_{self.msg_id}_{on}", 1)], ) def parse_ubx_message(self, ubx_message: ubx.UBXMessage) -> pd.Series: """Return the parsed payload data as a pandas Series. Args: ubx_message (UBXMessage): The UBX message to parse. Returns: Series: The parsed payload data. """ return pd.Series( { "iTOW": ubx_message.iTOW, "year": ubx_message.year, "month": ubx_message.month, "day": ubx_message.day, "hour": ubx_message.hour, "min": ubx_message.min, "second": ubx_message.second, "validDate": ubx_message.validDate, "validTime": ubx_message.validTime, "fullyResolved": ubx_message.fullyResolved, "validMag": ubx_message.validMag, "tAcc": ubx_message.tAcc, "nano": ubx_message.nano, "fixType": ubx_message.fixType, "gnssFixOk": ubx_message.gnssFixOk, "psmState": ubx_message.psmState, "headVehValid": ubx_message.headVehValid, "carrSoln": ubx_message.carrSoln, "confirmedAvai": ubx_message.confirmedAvai, "confirmedDate": ubx_message.confirmedDate, "confirmedTime": ubx_message.confirmedTime, "numSV": ubx_message.numSV, "lon": ubx_message.lon, "lat": ubx_message.lat, "height": ubx_message.height / 1000, "hMSL": ubx_message.hMSL / 1000, "hAcc": ubx_message.hAcc / 1000, "vAcc": ubx_message.vAcc / 1000, "velN": ubx_message.velN / 1000, "velE": ubx_message.velE / 1000, "velD": ubx_message.velD / 1000, "gSpeed": ubx_message.gSpeed / 1000, "headMot": ubx_message.headMot, "sAcc": ubx_message.sAcc, "headAcc": ubx_message.headAcc, "pDOP": ubx_message.pDOP, "invalidLlh": ubx_message.invalidLlh, "lastCorrectionAge": ubx_message.lastCorrectionAge, "reserved0": ubx_message.reserved0, "headVeh": ubx_message.headVeh, "magDec": ubx_message.magDec, "magAcc": ubx_message.magAcc, } ) def units(self) -> tp.Dict[str, str]: """Return the units for the parsed payload data. Returns: Dict[str, str]: The units for the parsed payload data. """ return { "iTOW": "ms", "year": "year", "month": "month", "day": "day", "hour": "hour", "min": "min", "second": "s", "validDate": "bool", "validTime": "bool", "fullyResolved": "bool", "validMag": "bool", "tAcc": "ns", "nano": "ns", "fixType": "enum", "gnssFixOk": "bool", "difSoln": "bool", "psmState": "enum", "headVehValid": "bool", "carrSoln": "enum", "confirmedAvai": "bool", "confirmedDate": "year", "confirmedTime": "s", "numSV": "count", "lon": "deg", "lat": "deg", "height": "m", "hMSL": "m", "hAcc": "m", "vAcc": "m", "velN": "m/s", "velE": "m/s", "velD": "m/s", "gSpeed": "m/s", "headMot": "deg", # Check this "sAcc": "m/s", "headAcc": "deg", # Check this "pDOP": "unitless", "invalidLlh": "bool", "lastCorrectionAge": "s", "reserved0": "reserved", "headVeh": "deg", "magDec": "deg", "magAcc": "deg", } class RXM_RAWX(BaseCommand): """Class for the RXM-RAWX command to get the raw Mesurement data range and carrier phase.""" def __init__(self) -> None: """Constructor for the RXM-RAWX class.""" super().__init__("RXM", "RAWX") def config_command(self, on: str = "USB") -> ubx.UBXMessage: """Return the configuration command message rate. Args: on (str): The message rate configuration on channel. Defaults to "USB". Returns: UBXMessage: The configuration command message rate. """ return ubx.UBXMessage.config_set( layers=ubx.SET_LAYER_RAM, transaction=ubx.TXN_NONE, cfgData=[(f"CFG_MSGOUT_UBX_{self.msg_cls}_{self.msg_id}_{on}", 1)], ) def parse_ubx_message(self, ubx_message: ubx.UBXMessage) -> pd.DataFrame: """Return the parsed payload data as a pandas DataFrame. Args: ubx_message (UBXMessage): The UBX message to parse. Returns: DataFrame: The parsed payload data. """ # Create a list to store the data data = [] # Loop through the Mesurements rcvTow = ubx_message.rcvTow rcvWeek = ubx_message.week leapS = ubx_message.leapS # numMeas = ubx_message.numMeas leapSec = ubx_message.leapSec clkReset = ubx_message.clkReset # Loop through the numMes for id in range(1, numMeas + 1): id = str(id).zfill(2) subData = {} subData["prMes"] = getattr(ubx_message, f"prMes_{id}") subData["cpMes"] = getattr(ubx_message, f"cpMes_{id}") subData["doMes"] = getattr(ubx_message, f"doMes_{id}") subData["gnssId"] = getattr(ubx_message, f"gnssId_{id}") subData["svId"] = getattr(ubx_message, f"svId_{id}") subData["sigId"] = getattr(ubx_message, f"sigId_{id}") subData["freqId"] = getattr(ubx_message, f"freqId_{id}") subData["locktime"] = getattr(ubx_message, f"locktime_{id}") subData["cno"] = getattr(ubx_message, f"cno_{id}") subData["prStd"] = getattr(ubx_message, f"prStd_{id}") subData["cpStd"] = getattr(ubx_message, f"cpStd_{id}") subData["doStd"] = getattr(ubx_message, f"doStd_{id}") subData["prValid"] = getattr(ubx_message, f"prValid_{id}") subData["halfCyc"] = getattr(ubx_message, f"halfCyc_{id}") subData["subHalfCyc"] = getattr(ubx_message, f"subHalfCyc_{id}") subData["rcvTow"] = rcvTow subData["rcvWeek"] = rcvWeek subData["clockReset"] = clkReset subData["leapS"] = leapS subData["leapSec"] = leapSec data.append(subData) return pd.DataFrame(data) def units(self) -> tp.Dict[str, str]: """Return the units for the parsed payload data. Returns: Dict[str, str]: The units for the parsed payload data. """ return { "rcvTow": "s", "rcvWeek": "week", "leapS": "s", "numMes": "count", "recStat": "enum", "leapSec": "s", "clkReset": "bool", "version": "version", "reserved0": "reserved", "prMes": "m", "cpMes": "cycles", "doMes": "m", "gnssId": "enum", "svId": "count", "sigId": "enum", "freqId": "enum", "locktime": "ms", "cno": "dBHz", "prStdev": "m", "prStd": "cycles", "cpStdev": "m", "cpStd": "cycles", "doStdev": "m", "doStd": "cycles", "trkStat": "enum", "prValid": "bool", "halfCyc": "cycles", "subHalfCyc": "cycles", "reserved": "reserved", } class RXM_SFRBX(BaseCommand): """Class for the RXM-SFRBX command.""" def __init__(self) -> None: """Constructor for the RXM-SFRBX class.""" super().__init__("RXM", "SFRBX") def config_command(self, on: str = "USB") -> ubx.UBXMessage: """Return the configuration command message rate. Args: on (str): The message rate configuration on channel. Defaults to "USB". Returns: UBXMessage: The configuration command message rate. """ return ubx.UBXMessage.config_set( layers=ubx.SET_LAYER_RAM, transaction=ubx.TXN_NONE, cfgData=[(f"CFG_MSGOUT_UBX_{self.msg_cls}_{self.msg_id}_{on}", 1)], ) def parse_ubx_message(self, ubx_message: ubx.UBXMessage) -> pd.DataFrame: """Return the parsed payload data as a pandas DataFrame.""" raise NotImplementedError( "The parse_ubx_message method is not implemented for RXM-SFRBX." ) def units(self) -> tp.Dict[str, str]: """Return the units for the parsed payload data. Returns: Dict[str, str]: The units for the parsed payload data. """ raise NotImplementedError("The units method is not implemented for RXM-SFRBX.") class CFG_RATE(BaseCommand): """Class for the CFG-RATE command.""" def __init__(self) -> None: """Constructor for the CFG-RATE class.""" super().__init__("CFG", "RATE") def config_command( self, measRate: int = 1000, navRate: int = 1, timeRef: int = 0 ) -> ubx.UBXMessage: """Return the configuration command message rate. Args: measRate (int): Measurement rate in ms. Defaults to 1000. navRate (int): Navigation rate in cycles. Defaults to 1. timeRef (int): Time reference. Defaults to 0. Returns: UBXMessage: The configuration command message rate. """ return ubx.UBXMessage.config_set( layers=ubx.SET_LAYER_RAM, transaction=ubx.TXN_NONE, cfgData=[ ("CFG_RATE_MEAS", measRate), ("CFG_RATE_NAV", navRate), ("CFG_RATE_TIMEREF", timeRef), ], ) def parse_ubx_message(self, ubx_message: ubx.UBXMessage) -> pd.DataFrame: """Return the parsed payload data as a pandas DataFrame.""" raise NotImplementedError( "The parse_ubx_message method is not implemented for CFG-RATE." ) def units(self) -> tp.Dict[str, str]: """Return the units for the parsed payload data. Returns: Dict[str, str]: The units for the parsed payload data. """ raise NotImplementedError("The units method is not implemented for CFG-RATE.")