# remote-car / Python / main.py

import os
os.environ['SDL_VIDEO_WINDOW_POS'] = "0,0"
os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = "hide"

import multiprocessing
import pygame
from enum import Enum, IntEnum
import time

from server import METRIC, STREAM_METRICS, TEMP_METRICS, Server
from gps_tracker import GPS_Tracker
from stream_manager import Stream_Manager
from printer import perror
from gamepad_viewer import Gamepad_Viewer
from network_manager import Network_Manager

class DS4_DIGITAL(Enum):
    """Digital (button) controls of the DS4 gamepad.

    Each value is a raw button index: the UI loop builds a member from the
    ``button`` attribute of pygame ``JOYBUTTONDOWN`` events.
    """
    # Face buttons
    X = 0
    CIRCLE = 1
    SQUARE = 2
    TRIANGLE = 3
    # Center buttons
    SHARE = 4
    PS = 5
    OPTIONS = 6
    # Stick clicks and shoulder buttons
    L3 = 7
    R3 = 8
    L1 = 9
    R1 = 10
    # Directional pad
    UP = 11
    DOWN = 12
    LEFT = 13
    RIGHT = 14
    TOUCHPAD = 15

class DS4_ANALOG(Enum):
    """Analog (axis) controls of the DS4 gamepad.

    Each value is a raw axis index: the UI loop builds a member from the
    ``axis`` attribute of pygame ``JOYAXISMOTION`` events. The trailing
    comments give the value range of each axis and what its ends mean.
    """
    L_X = 0     # -1 Left -> Right 1
    L_Y = 1     # -1 Up   -> Down 1
    R_X = 2     # -1 Left -> Right 1
    R_Y = 3     # -1 Up   -> Down 1
    L2 = 4      # -1 Out  -> In 1
    R2 = 5      # -1 Out  -> In 1

class COMMAND(IntEnum):
    """Command codes passed, as a single byte, to ``server.send_command``.

    The four directional members are band bases spaced 50 apart: the UI loop
    sends ``direction + intensity``. The remaining members are sent unchanged.
    """
    # Directional bands (base value; an intensity is added before sending)
    FORWARD = 0
    BACKWARD = 50
    RIGHT = 100
    LEFT = 150
    # One-shot commands (sent as-is)
    SWITCH_CAMERA = 200
    TOGGLE_CLACSON = 201
    TOGGLE_NEON = 202
# Number of intensity steps per directional band; the intensity helpers in
# this file return values in 0 .. COMMAND_INTENSITY_MAX - 1.
COMMAND_INTENSITY_MAX = 50

# Frame-rate cap handed to the pygame clock at the end of every UI iteration.
FPS = 30
# Absolute stick deflection below this value is treated as "no steering".
STICK_DEADZONE = 0.75
# Margin at both ends of the trigger range (-1 .. 1) that is snapped to the
# minimum / maximum intensity.
TRIGGER_DEADZONE = 0.01

# TODO: color the max values too, then maybe add the host CPU temperature.

# Reading at or above this value is drawn in red.
red_limit = {
    METRIC.MODEM_TEMP: 60,
    METRIC.CAMERA_TEMP: 60,
    METRIC.CPU_TEMP: 70,
    METRIC.GPU_TEMP: 70,
    METRIC.BATTERY_TEMP: 45,
}

# Reading at or above this value (but below the red limit) is drawn in orange.
orange_limit = {
    METRIC.MODEM_TEMP: 45,
    METRIC.CAMERA_TEMP: 45,
    METRIC.CPU_TEMP: 55,
    METRIC.GPU_TEMP: 55,
    METRIC.BATTERY_TEMP: 37.5,
}

# Most recent and highest value received so far for each temperature metric.
last_temps = dict.fromkeys(TEMP_METRICS, 0)
max_temps = dict.fromkeys(TEMP_METRICS, 0)

# Width of the longest metric label (name without the "_TEMP" suffix); used
# to align the columns of the temperature table.
max_len = max(map(len, (m.name.removesuffix("_TEMP") for m in TEMP_METRICS)))
def draw_temp(surface, temp_metric, position):
    text = f"{temp_metric.name.removesuffix("_TEMP"):<{max_len}} {" "*4}  {max_temps[temp_metric]:>3} °C"
    text_surface = PYGAME_FONT.render(text, True, (0,0,0))
    surface.blit(text_surface, position)

    text = f"{" " * max_len} {last_temps[temp_metric]:>3}"
    if last_temps[temp_metric] >= red_limit[temp_metric]:
        color = (180, 0, 0)
    elif last_temps[temp_metric] >= orange_limit[temp_metric]:
        color = (255, 140, 0)
    else:
        color = (0, 0, 0)
    text_surface = PYGAME_FONT.render(text, True, color)
    surface.blit(text_surface, position)

def telemetry_callback(metric, value):
    """Dispatch one telemetry sample, handed to ``Server`` as its callback.

    - ``METRIC.POSITION``: the first three items of *value* become a GPS
      waypoint.
    - stream metrics: ``(metric, value)`` is queued on ``metrics_queue``.
    - temperature metrics: the latest and maximum readings are updated.
    - anything else is reported through ``perror``.
    """
    if metric == METRIC.POSITION:
        first, second, third = value[0], value[1], value[2]
        gps_tracker.add_waypoint(first, second, third)
        return

    if metric in STREAM_METRICS:
        metrics_queue.put_nowait((metric, value))
        return

    if metric not in TEMP_METRICS:
        perror(f"Unhandled metric {metric.name}: {value}")
        return

    last_temps[metric] = value
    # Only ever raise the recorded maximum.
    if value > max_temps[metric]:
        max_temps[metric] = value

def calculate_march_intensity(level):
    """Convert a trigger axis value (-1 .. 1) into a march intensity.

    Values within ``TRIGGER_DEADZONE`` of either end snap to 0 or to the top
    step (``COMMAND_INTENSITY_MAX - 1``); everything in between is mapped
    linearly onto that range and rounded.
    """
    top_step = COMMAND_INTENSITY_MAX - 1

    released = level < -1 + TRIGGER_DEADZONE
    if released:
        return 0

    fully_pressed = level > 1 - TRIGGER_DEADZONE
    if fully_pressed:
        return top_step

    # Shift -1 .. 1 onto 0 .. 1, then scale to 0 .. top_step.
    return round((level + 1) / 2 * top_step)

def calculate_steer_intensity(level):
    """Convert a stick axis value (-1 .. 1) into a steer intensity.

    Steering is all-or-nothing: 0 while the deflection is inside
    ``STICK_DEADZONE``, otherwise the top step (``COMMAND_INTENSITY_MAX - 1``).
    """
    inside_deadzone = abs(level) < STICK_DEADZONE
    return 0 if inside_deadzone else COMMAND_INTENSITY_MAX - 1

# Bandwidth optimization: remember the last intensity sent for each command
# so that unchanged values are not transmitted again.
last_sent_states = dict.fromkeys(COMMAND, 0)
def should_send(direction, intensity):
    """Return True when *intensity* differs from the last one recorded for
    *direction*, recording it as the new last value in that case."""
    changed = last_sent_states[direction] != intensity
    if changed:
        last_sent_states[direction] = intensity
    return changed

def ui_loop():
    screen = pygame.display.set_mode((382, 320), pygame.NOFRAME)
    pygame.display.set_caption("RC++")
    clock = pygame.time.Clock()

    has_focus = False
    joystick = None
    while True:

        for event in pygame.event.get():
            
            if event.type == pygame.WINDOWFOCUSGAINED:
                has_focus = True
            elif event.type == pygame.WINDOWFOCUSLOST:
                has_focus = False

            if event.type == pygame.JOYBUTTONDOWN:
                button = DS4_DIGITAL(event.button)
                if button == DS4_DIGITAL.TOUCHPAD:
                    return
                elif button == DS4_DIGITAL.TRIANGLE:
                    server.send_command(COMMAND.SWITCH_CAMERA.to_bytes(1))
                    video_stream.switch()
                elif button == DS4_DIGITAL.SQUARE:
                    server.send_command(COMMAND.TOGGLE_NEON.to_bytes(1))
                elif button == DS4_DIGITAL.X:
                    server.send_command(COMMAND.TOGGLE_CLACSON.to_bytes(1))

            elif event.type == pygame.JOYBUTTONUP:
                if button == DS4_DIGITAL.X:
                    server.send_command(COMMAND.TOGGLE_CLACSON.to_bytes(1))

            elif event.type == pygame.JOYAXISMOTION:
                level = event.value
                analog_control = DS4_ANALOG(event.axis)

                direction = None
                intensity = None
                if analog_control == DS4_ANALOG.R2:
                    direction = COMMAND.FORWARD
                    intensity = calculate_march_intensity(level)
                elif analog_control == DS4_ANALOG.L2:
                    direction = COMMAND.BACKWARD 
                    intensity = calculate_march_intensity(level)
                elif analog_control == DS4_ANALOG.L_X:
                    direction = COMMAND.LEFT if level < 0 else COMMAND.RIGHT
                    intensity = calculate_steer_intensity(level)

                if direction is not None and intensity is not None and should_send(direction, intensity):
                    command = direction + intensity
                    server.send_command(command.to_bytes(1))
                    #print(f"Sent {direction.name} {intensity}/{COMMAND_INTENSITY_MAX - 1} - {command}")

            elif event.type == pygame.JOYDEVICEADDED:
                joystick = pygame.joystick.Joystick(event.device_index)
                print(f"{joystick.get_name()} connencted")

            elif event.type == pygame.JOYDEVICEREMOVED:
                print(f"{joystick.get_name()} disconnected")
                joystick = None
        
        if not has_focus:
            colors = [(255, 255, 255), (0,0,0)]
            index = int(time.time()) % 2
            screen.fill(colors[index])
        else:
            screen.fill((255, 255, 255))

        text_surface = PYGAME_FONT.render(f"{max_len * " "} LAST  MAX", True, (0,0,0))
        screen.blit(text_surface, (50, 30))

        for index, metric in enumerate(TEMP_METRICS):
            draw_temp(screen, metric, (50, index*30 + 60))

        pygame.display.flip() # Update the screen
        clock.tick(FPS)
        
# Script entry point: start every subsystem, run the UI loop until it returns
# (or Ctrl+C), then shut the subsystems down. The names assigned here are
# module globals read by the functions above (PYGAME_FONT, gps_tracker,
# server, metrics_queue, video_stream).
if __name__ == "__main__":
    pygame.init()

    # Candidate system fonts; only `bold` is currently used.
    normal = "mx437ibmdosiso8"
    bold = "mx437ibmvga8x16"
    PYGAME_FONT = pygame.font.SysFont(bold, 28)

    gamepad_viewer = Gamepad_Viewer()
    gamepad_viewer.start_mirroring()

    gps_tracker = GPS_Tracker()
    gps_tracker.open_live_map()

    network_manager = Network_Manager()
    network_manager.start_monitoring()

    # Created before the server is started: telemetry_callback reads
    # metrics_queue, and presumably may be invoked as soon as the server runs.
    metrics_queue = multiprocessing.Queue()

    server = Server(telemetry_callback)
    server.start()

    video_stream = Stream_Manager(metrics_queue)
    video_stream.play()

    try:
        ui_loop() # Blocking
    except KeyboardInterrupt:
        pass
    finally:
        # Shut down even when ui_loop fails with an unexpected exception, so
        # the helper subsystems are not left running.
        gamepad_viewer.stop_mirroring()
        gps_tracker.close_live_map()
        network_manager.stop_monitoring()
        video_stream.close()
        pygame.quit()

    print("Done")