"""Ublox Receiver Streaming Profile Module.
This module defines the StreamingProfile class, which provides methods for controlling Ublox receivers
through serial communication streams. The StreamingProfile class is designed to collect and manage data
from the receiver, including navigation solution data and RAWX observation data for RINEX conversion.
It also includes functionality for sending commands to configure the receiver's data collection behavior.
Classes:
StreamingProfile: Manages the data collection and command communication with the Ublox receiver.
Constants:
COLLECTION_TOUT: A timeout value used for data collection operations.
"""
import typing as tp
from logging import Logger
from pathlib import Path
import pyubx2 as ubx
from serial import Serial
from .commands import BaseCommand
from .controller import Controller
__all__ = ["StreamingProfile"]
COLLECTION_TOUT = 1.2
class StreamingProfile:
"""StreamingProfile class for the Ublox receiver.
This class provides methods for controlling Ublox receivers through Serial communication streams.
The streaming profile is used to collect two types of data:
- Navigation solution data
- RAWX observation data for RINEX conversion
Attributes:
PVT_MESSAGE_CMD: Command to set the PVT message rate.
RAWX_MESSAGE_CMD: Command to set the RAWX message rate.
serial_port (Serial): The serial port for communication with the receiver.
controller (Controller): The controller object to manage communication and data collection.
Methods:
__init__(device, baudrate=115200, timeout=3, logger=None, no_check=False): Initializes the StreamingProfile.
send_collection_command(): Sends the collection command to the controller.
collect(n): Collects data from the receiver.
start(): Starts the collection of data.
stop(): Stops the collection of data.
"""
def __init__(
self,
commands: tp.List[BaseCommand],
device: Path,
baudrate: int = 115200,
timeout: int = 3,
logger: tp.Optional[Logger] = None,
no_check: bool = False,
) -> None:
"""Initializes the StreamingProfile object.
Args:
commands (List[BaseCommand]): The list of commands to send to the receiver.
device (Path): The path to the device file.
baudrate (int, optional): The baudrate of the serial connection. Defaults to 115200.
timeout (int, optional): The timeout of the serial connection. Defaults to 3.
logger (Optional[Logger], optional): The logger object to log I/O data. Defaults to None.
no_check (bool, optional): If True, disables all default message rates. Defaults to False.
Raises:
ValueError: If the device file does not exist.
Note:
The device file must exist and should be something like "/dev/ttyACM0".
"""
# Store the commands list
self.commands = commands
# Create a serial port
self.serial_port = Serial(str(device), baudrate, timeout=timeout)
# Create a controller object
self.controller = Controller(self.serial_port, logger=logger, no_check=no_check)
def send_message_recording_command(self) -> None:
"""Sends the collection command to the controller.
Returns:
None
"""
# Send the collection command to activate the data collection
for command in self.commands:
self.controller.send_config_command(command=command.config_command())
def collect(
self,
parse: bool = False,
) -> tuple[
int,
dict[str, list[ubx.UBXMessage]],
]:
"""Collects data from the receiver.
Args:
n (int): The number of messages to collect. Defaults to -1.
parse (bool): If True, the messages are parsed. Defaults to False.
Returns:
tuple[int, dict[str, list[ubx.UBXMessage]]]: The number of messages collected and the messages.
"""
get_io_msg = self.controller.flush_messages()
# Make a dictionary to store the messages
cmd_data_map = {str(cmd): [] for cmd in self.commands}
# Store the messages in the dictionary
for msg in get_io_msg:
# Change "-" to "_" for comaptiblaity NAV-POSLLH" -> "NAV_POSLLH"
for cmd in self.commands:
if cmd == msg.identity:
cmd_data_map[msg.identity].append(
cmd.parse_ubx_message(msg) if parse else msg
)
return (int(len(get_io_msg)), cmd_data_map)
def start(self) -> None:
"""Starts the collection of data.
Returns:
None
"""
# Send the collection command
self.send_message_recording_command()
def stop(self) -> None:
"""Stops the collection of data.
Returns:
None
"""
# Stop the collection of data
self.controller.stop()
# Close the serial port
self.serial_port.close()