"""Controller module for the Ublox receiver. This module provides a Controller class for interfacing with Ublox receivers through either Serial or socket communication streams. Classes: Controller: Main class for controlling Ublox receivers. Attributes: __all__ (List[str]): List of symbols exported by this module. DEFAULT_COMMAND_WAIT_TIME (float): Default time to wait for an acknowledgment after sending a command. DEFAULT_NAV_WIAT_TIME (float): Default time to wait for navigation data. Example: ```python from serial import Serial from navigator.utility.ublox import Controller # Open a serial port serial_port = Serial("/dev/ttyACM0", 960, timeout=3) # Create a controller object controller = Controller(serial_port) ``` """ import typing as tp from logging import Logger from queue import Queue from socket import socket from threading import Event, Lock, Thread from time import sleep import pyubx2 as ubx from serial import Serial __all__ = ["Controller"] DEFAULT_COMMAND_WAIT_TIME = 0.001 DEFAULT_NAV_WIAT_TIME = 1.0 class Controller: """Controller class for the Ublox receiver. This class provides methods for controlling Ublox receivers through Serial or socket communication streams. Attributes: stream (Union[Serial, socket]): The communication stream with the Ublox receiver. logger (Optional[Logger]): The logger object to log I/O data. no_check (bool): If True, disables all default message rates. io_queue (Queue): Queue for storing received messages. ack_queue (Queue): Queue for storing acknowledgment messages. stop_event (Event): Event flag for stopping the I/O thread. lock (Lock): Thread lock for ensuring thread safety. io_thread (Thread): Thread for handling I/O operations. Example: ```python from serial import Serial from navigator.utility.ublox import Controller # Open a serial port serial_port = Serial("/dev/ttyACM0", 960, timeout=3) # Create a controller object controller = Controller(serial_port) ``` """ def __init__( self, stream: tp.Union[Serial, socket], logger: tp.Optional[Logger] = None, no_check: bool = False, ) -> None: """Initialize the Controller. Args: stream: The communication stream with the Ublox receiver. logger: The logger object to log I/O data. Defaults to None. no_check: If True, disables all default message rates. Defaults to False. Raises: TypeError: If the stream is not an instance of Serial or socket. """ if not isinstance(stream, (Serial, socket)): raise TypeError("The stream should be an instance of Serial or socket.") self.stream: tp.Union[Serial, socket] = stream self.has_logger: bool = logger is not None self.logger: tp.Optional[Logger] = logger self.no_check: bool = no_check self.reader = ubx.UBXReader( datastream=self.stream, msgmode=ubx.GET, protfilter=ubx.UBX_PROTOCOL ) # Queues for storing messages self.io_queue: Queue[ubx.UBXMessage] = Queue() self.ack_queue: Queue[ubx.UBXMessage] = Queue() # Event flag for stopping the I/O thread self.stop_event = Event() self.lock: Lock = Lock() # I/O thread for handling I/O operations self.io_thread: Thread = Thread(target=self.start_io_logic) self.io_thread.start() if not self.no_check: self.clear_output_rate() def clear_ack_queue(self) -> None: """Clear the acknowledgment queue.""" # Acquire the lock to ensure thread safety with self.lock: while not self.ack_queue.empty(): self.ack_queue.get() return def clear_io_queue(self) -> None: """Clear the I/O queue.""" # Acquire the lock to ensure thread safety with self.lock: while not self.io_queue.empty(): self.io_queue.get() return def start_io_logic(self) -> None: """Starts the I/O thread which reads the incoming data from the stream.""" while not self.stop_event.is_set(): if self.stream.in_waiting: try: with self.lock: _, parsed = self.reader.read() if parsed is not None: if parsed.identity.startswith("ACK"): self.ack_queue.put(parsed) else: self.io_queue.put(parsed) if self.has_logger: self.logger.info(f"Received: {parsed.identity}") except Exception as e: if self.has_logger: self.logger.error(e) continue return def send_control_command(self, command: ubx.UBXMessage) -> None: """Send a command to the Ublox receiver. Args: command: The command to be sent. """ # Acquire the lock to ensure thread safety with self.lock: self.stream.write(command.serialize()) # Log the command if a logger is provided if self.has_logger: self.logger.info(f"Sent: {command}") return def send_config_command( self, command: ubx.UBXMessage, wait_for_ack: bool = True ) -> tp.Optional[ubx.UBXMessage]: """Send a command to the Ublox receiver. Args: command: The command to be sent. wait_for_ack: If True, wait for an acknowledgment from the receiver. Defaults to True. Returns: The acknowledgment from the receiver if wait_for_ack is True, else None. """ # Clear the acknowledgment queue first to avoid any stale data self.clear_ack_queue() # Write the command to the stream self.send_control_command(command) if wait_for_ack: # Wait for the acknowledgment while True: sleep(DEFAULT_COMMAND_WAIT_TIME) with self.lock: if not self.ack_queue.empty(): return self.ack_queue.get() return None def send_poll_command(self, command: ubx.UBXMessage) -> None: """Send a command to the Ublox receiver. Args: command: The command to be sent. """ # Send the command self.send_control_command(command) return def clear_output_rate(self) -> None: """Clear the output rate of all message types for MSGOUT.""" cfg_msg = [msg for msg in ubx.UBX_CONFIG_DATABASE if "CFG_MSGOUT" in msg] for msg in cfg_msg: self.send_config_command( command=ubx.UBXMessage.config_set( layers=ubx.SET_LAYER_RAM, transaction=ubx.TXN_NONE, cfgData=[(msg, 0)], ), wait_for_ack=True, ) # Clear the queue self.clear_ack_queue() self.clear_io_queue() return def stop(self) -> None: """Stop the controller.""" # Stop the I/O thread self.stop_event.set() self.io_thread.join() # user is responsible for closing the stream return def flush_messages(self) -> list[ubx.UBXMessage]: """Flush the available messages from the queue. Args: n: The number of messages to flush. Returns: The flushed messages. """ # Acquire the lock to ensure thread safety with self.lock: messages = [] while not self.io_queue.empty(): messages.append(self.io_queue.get()) return messages def __repr__(self) -> str: """Return the string representation of the object.""" return f"Controller(stream={self.stream}, logger={self.logger}, no_check={self.no_check})"