Navigator / src / navigator / utils / ublox / controller.py
controller.py
Raw
"""Controller module for the Ublox receiver.

This module provides a Controller class for interfacing with Ublox receivers through either Serial or socket communication streams.

Classes:
    Controller: Main class for controlling Ublox receivers.

Attributes:
    __all__ (List[str]): List of symbols exported by this module.
    DEFAULT_COMMAND_WAIT_TIME (float): Default time to wait for an acknowledgment after sending a command.
    DEFAULT_NAV_WIAT_TIME (float): Default time to wait for navigation data.

Example:
    ```python
    from serial import Serial
    from navigator.utils.ublox import Controller

    # Open a serial port
    serial_port = Serial("/dev/ttyACM0", 9600, timeout=3)

    # Create a controller object
    controller = Controller(serial_port)
    ```
"""

import select
import typing as tp
from logging import Logger
from queue import Empty, Queue
from socket import socket
from threading import Event, Lock, Thread
from time import monotonic, sleep

import pyubx2 as ubx
from serial import Serial

# Names exported by ``from <module> import *``.
__all__ = ["Controller"]

# Polling interval, in seconds, used by Controller while waiting on the receiver.
DEFAULT_COMMAND_WAIT_TIME = 0.001
# Default wait for navigation data, in seconds (per the module docstring); it is
# not referenced anywhere else in the visible code.
# NOTE(review): "WIAT" looks like a typo for "WAIT" -- renaming it would change
# a public module attribute, so the name is left as is.
DEFAULT_NAV_WIAT_TIME = 1.0


class Controller:
    """Controller class for the Ublox receiver.

    A background thread reads UBX messages from the stream and sorts them into two queues:
    acknowledgments (message identity starting with ``ACK``) and everything else. Commands are
    written to the same stream; reads and writes are serialized through a shared lock.

    Attributes:
        stream (Union[Serial, socket]): The communication stream with the Ublox receiver.
        logger (Optional[Logger]): The logger object to log I/O data.
        has_logger (bool): True when a logger was supplied.
        no_check (bool): If True, the receiver's message output rates are left untouched at
            start-up; if False, every ``CFG_MSGOUT`` rate is set to 0 during construction.
        reader (ubx.UBXReader): UBX parser bound to ``stream``.
        io_queue (Queue): Queue for storing received non-acknowledgment messages.
        ack_queue (Queue): Queue for storing acknowledgment messages.
        stop_event (Event): Event flag for stopping the I/O thread.
        lock (Lock): Lock guarding stream access and queue draining.
        io_thread (Thread): Thread for handling I/O operations.

    Example:
        ```python
        from serial import Serial
        from navigator.utils.ublox import Controller

        # Open a serial port
        serial_port = Serial("/dev/ttyACM0", 9600, timeout=3)

        # Create a controller object
        controller = Controller(serial_port)

        # ... use the controller ...

        # Stop the reader thread, then close the port
        controller.stop()
        serial_port.close()
        ```
    """

    def __init__(
        self,
        stream: tp.Union[Serial, socket],
        logger: tp.Optional[Logger] = None,
        no_check: bool = False,
    ) -> None:
        """Initialize the Controller and start its I/O thread.

        Args:
            stream: The communication stream with the Ublox receiver.
            logger: The logger object to log I/O data. Defaults to None.
            no_check: If True, skip clearing the message output rates at start-up.
                Defaults to False (all ``CFG_MSGOUT`` rates are set to 0).

        Raises:
            TypeError: If the stream is not an instance of Serial or socket.
        """
        if not isinstance(stream, (Serial, socket)):
            raise TypeError("The stream should be an instance of Serial or socket.")

        self.stream: tp.Union[Serial, socket] = stream
        self.has_logger: bool = logger is not None
        self.logger: tp.Optional[Logger] = logger
        self.no_check: bool = no_check
        self.reader = ubx.UBXReader(
            datastream=self.stream, msgmode=ubx.GET, protfilter=ubx.UBX_PROTOCOL
        )

        # Queues for storing messages
        self.io_queue: Queue[ubx.UBXMessage] = Queue()
        self.ack_queue: Queue[ubx.UBXMessage] = Queue()

        # Event flag for stopping the I/O thread
        self.stop_event = Event()
        self.lock: Lock = Lock()

        # I/O thread for handling I/O operations
        self.io_thread: Thread = Thread(target=self.start_io_logic)
        self.io_thread.start()

        if not self.no_check:
            try:
                self.clear_output_rate()
            except BaseException:
                # The reader thread is not a daemon: if construction fails (or is
                # interrupted) it must be stopped here, otherwise it keeps running
                # behind an object the caller never received.
                self.stop()
                raise

    @staticmethod
    def _drain(queue: Queue) -> list:
        """Remove and return every item currently in ``queue`` without blocking."""
        drained = []
        while True:
            try:
                drained.append(queue.get_nowait())
            except Empty:
                return drained

    def _data_pending(self) -> bool:
        """Return True when the stream has bytes ready to be read."""
        if isinstance(self.stream, socket):
            # Sockets have no ``in_waiting``; poll readiness without blocking.
            # NOTE(review): readiness is checked on the raw socket. If the UBX reader
            # buffers more than one message per receive, already-buffered messages are
            # only parsed once new bytes arrive -- confirm against pyubx2's socket handling.
            readable, _, _ = select.select([self.stream], [], [], 0)
            return bool(readable)
        return bool(self.stream.in_waiting)

    def clear_ack_queue(self) -> None:
        """Clear the acknowledgment queue."""
        # Hold the lock so the I/O thread cannot enqueue while the queue is drained
        with self.lock:
            self._drain(self.ack_queue)

    def clear_io_queue(self) -> None:
        """Clear the I/O queue."""
        # Hold the lock so the I/O thread cannot enqueue while the queue is drained
        with self.lock:
            self._drain(self.io_queue)

    def start_io_logic(self) -> None:
        """Run the I/O loop which reads the incoming data from the stream.

        This is the target of the I/O thread. It runs until ``stop_event`` is set. Parsed
        messages whose identity starts with ``ACK`` go to ``ack_queue``; all others go to
        ``io_queue``. Errors raised while reading are logged (if a logger is set) and the
        loop continues.
        """
        while not self.stop_event.is_set():
            if not self._data_pending():
                # Nothing to read: yield the CPU instead of spinning, but wake up
                # immediately if a stop is requested.
                self.stop_event.wait(DEFAULT_COMMAND_WAIT_TIME)
                continue

            parsed = None
            try:
                # Read and enqueue under the lock so a concurrent queue clear cannot
                # interleave between the two steps.
                with self.lock:
                    _, parsed = self.reader.read()
                    if parsed is not None:
                        if parsed.identity.startswith("ACK"):
                            self.ack_queue.put(parsed)
                        else:
                            self.io_queue.put(parsed)
            except Exception as e:
                if self.has_logger:
                    self.logger.error(e)
                continue

            if parsed is not None and self.has_logger:
                self.logger.info("Received: %s", parsed.identity)

    def send_control_command(self, command: ubx.UBXMessage) -> None:
        """Send a command to the Ublox receiver without waiting for any reply.

        Args:
            command: The command to be sent.
        """
        payload = command.serialize()

        # Acquire the lock so the write does not interleave with a read
        with self.lock:
            if isinstance(self.stream, socket):
                # Sockets have no ``write``; ``sendall`` transmits the whole payload.
                self.stream.sendall(payload)
            else:
                self.stream.write(payload)

        # Log the command if a logger is provided
        if self.has_logger:
            self.logger.info("Sent: %s", command)

    def send_config_command(
        self,
        command: ubx.UBXMessage,
        wait_for_ack: bool = True,
        timeout: tp.Optional[float] = None,
    ) -> tp.Optional[ubx.UBXMessage]:
        """Send a configuration command and optionally wait for its acknowledgment.

        Args:
            command: The command to be sent.
            wait_for_ack: If True, wait for an acknowledgment from the receiver. Defaults to True.
            timeout: Maximum number of seconds to wait for the acknowledgment. None (the
                default) waits indefinitely.

        Returns:
            The acknowledgment from the receiver, or None if ``wait_for_ack`` is False or no
            acknowledgment arrived within ``timeout``.
        """
        # Clear the acknowledgment queue first to avoid any stale data
        self.clear_ack_queue()
        # Write the command to the stream
        self.send_control_command(command)

        if not wait_for_ack:
            return None

        deadline = None if timeout is None else monotonic() + timeout
        # Poll in short slices rather than blocking on the queue so the wait stays
        # interruptible and the deadline is honoured.
        while True:
            try:
                return self.ack_queue.get_nowait()
            except Empty:
                pass
            if deadline is not None and monotonic() >= deadline:
                if self.has_logger:
                    self.logger.warning(
                        "No acknowledgment within %s s for: %s", timeout, command
                    )
                return None
            sleep(DEFAULT_COMMAND_WAIT_TIME)

    def send_poll_command(self, command: ubx.UBXMessage) -> None:
        """Send a poll command to the Ublox receiver.

        The reply, if any, is delivered through the I/O queue by the I/O thread.

        Args:
            command: The command to be sent.
        """
        self.send_control_command(command)

    def clear_output_rate(self) -> None:
        """Clear the output rate of all message types for MSGOUT.

        Sends one configuration command per ``CFG_MSGOUT`` key (RAM layer, value 0), waiting
        for the acknowledgment of each, then empties both queues.
        """
        cfg_msg = [msg for msg in ubx.UBX_CONFIG_DATABASE if "CFG_MSGOUT" in msg]

        for msg in cfg_msg:
            self.send_config_command(
                command=ubx.UBXMessage.config_set(
                    layers=ubx.SET_LAYER_RAM,
                    transaction=ubx.TXN_NONE,
                    cfgData=[(msg, 0)],
                ),
                wait_for_ack=True,
            )
        # Clear the queues
        self.clear_ack_queue()
        self.clear_io_queue()

    def stop(self) -> None:
        """Stop the controller.

        Signals the I/O thread to exit and waits for it to finish. The stream is not
        closed; the caller is responsible for closing it.
        """
        self.stop_event.set()
        self.io_thread.join()

    def flush_messages(self) -> list[ubx.UBXMessage]:
        """Flush the available messages from the I/O queue.

        Returns:
            All messages currently queued, oldest first. The list is empty when nothing
            is queued.
        """
        # Acquire the lock so the I/O thread cannot enqueue mid-flush
        with self.lock:
            return self._drain(self.io_queue)

    def __repr__(self) -> str:
        """Return the string representation of the object."""
        return f"Controller(stream={self.stream}, logger={self.logger}, no_check={self.no_check})"