import os
os.environ['SDL_VIDEO_WINDOW_POS'] = "0,0"
os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = "hide"
import multiprocessing
import pygame
from enum import Enum, IntEnum
import time
from server import METRIC, STREAM_METRICS, TEMP_METRICS, Server
from gps_tracker import GPS_Tracker
from stream_manager import Stream_Manager
from printer import perror
from gamepad_viewer import Gamepad_Viewer
from network_manager import Network_Manager
class DS4_DIGITAL(Enum):
    """Digital (pressed/released) DualShock 4 controls.

    Each value is the joystick button index; ``ui_loop`` builds a member
    from ``event.button`` of pygame joystick button events.
    """

    X = 0
    CIRCLE = 1
    SQUARE = 2
    TRIANGLE = 3

    SHARE = 4
    PS = 5
    OPTIONS = 6

    L3 = 7
    R3 = 8
    L1 = 9
    R1 = 10

    UP = 11
    DOWN = 12
    LEFT = 13
    RIGHT = 14

    TOUCHPAD = 15
class DS4_ANALOG(Enum):
    """Analog DualShock 4 axes.

    Each value is the joystick axis index; ``ui_loop`` builds a member from
    ``event.axis`` of pygame axis-motion events. Readings span -1 .. 1.
    """

    # Sticks
    L_X = 0  # -1 = left,  1 = right
    L_Y = 1  # -1 = up,    1 = down
    R_X = 2  # -1 = left,  1 = right
    R_Y = 3  # -1 = up,    1 = down

    # Triggers
    L2 = 4   # -1 = released (out), 1 = fully pressed (in)
    R2 = 5   # -1 = released (out), 1 = fully pressed (in)
class COMMAND(IntEnum):
    """Command codes sent to the vehicle as a single byte.

    The four movement commands are base values: ``ui_loop`` adds an
    intensity to the base before sending, so each base is spaced 50 apart.
    The remaining commands are sent as-is.
    """

    # Movement bases (base + intensity is what goes on the wire)
    FORWARD = 0
    BACKWARD = 50
    RIGHT = 100
    LEFT = 150

    # One-shot commands
    SWITCH_CAMERA = 200
    TOGGLE_CLACSON = 201
    TOGGLE_NEON = 202
# Frame rate cap passed to the pygame clock in ui_loop.
FPS = 30

# Number of intensity steps per movement command; valid intensities are
# 0 .. COMMAND_INTENSITY_MAX - 1.
COMMAND_INTENSITY_MAX = 50

# Axis readings whose magnitude is below this produce no steering.
STICK_DEADZONE = 0.75
# Margin at both ends of the trigger range that snaps to the min/max intensity.
TRIGGER_DEADZONE = 0.01
# TODO: also color the max values; then maybe add the host CPU temperature.

# Per-metric display thresholds as (orange, red). draw_temp colors a reading
# red when it is >= the red value, otherwise orange when >= the orange value.
_TEMP_LIMITS = {
    METRIC.MODEM_TEMP: (45, 60),
    METRIC.CAMERA_TEMP: (45, 60),
    METRIC.CPU_TEMP: (55, 70),
    METRIC.GPU_TEMP: (55, 70),
    METRIC.BATTERY_TEMP: (37.5, 45),
}

red_limit = {metric: red for metric, (_, red) in _TEMP_LIMITS.items()}
orange_limit = {metric: orange for metric, (orange, _) in _TEMP_LIMITS.items()}
# Most recent and highest reading seen for every temperature metric; both are
# updated by telemetry_callback and read by draw_temp.
last_temps = dict.fromkeys(TEMP_METRICS, 0)
max_temps = dict.fromkeys(TEMP_METRICS, 0)

# Width of the longest metric label (name without the "_TEMP" suffix), used
# to align the columns of the temperature table.
max_len = max(map(len, (metric.name.removesuffix("_TEMP") for metric in TEMP_METRICS)))
def draw_temp(surface, temp_metric, position):
    """Draw one row of the temperature table onto ``surface``.

    The row is rendered in two passes at the same ``position``: first the
    label and the maximum reading in black, then the latest reading, padded
    so it lands in the gap left by the first pass and colored by threshold.

    Args:
        surface: Surface to blit the rendered text onto.
        temp_metric: Temperature metric to display; used as key into
            ``last_temps``, ``max_temps``, ``red_limit`` and ``orange_limit``.
        position: Destination passed to ``surface.blit`` for both passes.
    """
    black = (0, 0, 0)

    # Pass 1: label, a blank gap for the latest value, then the maximum.
    label = temp_metric.name.removesuffix("_TEMP")
    gap = " " * 4
    static_row = f"{label:<{max_len}} {gap} {max_temps[temp_metric]:>3} °C"
    surface.blit(PYGAME_FONT.render(static_row, True, black), position)

    # Pass 2: latest value, preceded by blanks as wide as the label column.
    current = last_temps[temp_metric]
    label_padding = " " * max_len
    value_row = f"{label_padding} {current:>3}"

    if current >= red_limit[temp_metric]:
        value_color = (180, 0, 0)
    elif current >= orange_limit[temp_metric]:
        value_color = (255, 140, 0)
    else:
        value_color = black

    surface.blit(PYGAME_FONT.render(value_row, True, value_color), position)
def telemetry_callback(metric, value):
    """Dispatch one telemetry sample received from the server.

    - ``METRIC.POSITION``: the first three items of ``value`` are added as a
      waypoint on the GPS tracker.
    - Stream metrics: ``(metric, value)`` is queued on ``metrics_queue``.
    - Temperature metrics: the latest reading is stored and the running
      maximum is raised if exceeded.
    - Anything else is reported through ``perror``.

    Args:
        metric: The metric identifier of the sample.
        value: The sample payload; its shape depends on ``metric``.
    """
    if metric == METRIC.POSITION:
        gps_tracker.add_waypoint(value[0], value[1], value[2])
        return

    if metric in STREAM_METRICS:
        metrics_queue.put_nowait((metric, value))
        return

    if metric not in TEMP_METRICS:
        perror(f"Unhandled metric {metric.name}: {value}")
        return

    last_temps[metric] = value
    if max_temps[metric] < value:
        max_temps[metric] = value
def calculate_march_intensity(level):
    """Convert a trigger axis reading into a drive intensity.

    Args:
        level: Trigger reading in the range -1 (released) .. 1 (fully pressed).

    Returns:
        An int in 0 .. COMMAND_INTENSITY_MAX - 1. Readings within
        ``TRIGGER_DEADZONE`` of either end snap to the minimum / maximum;
        everything in between is scaled linearly and rounded.
    """
    top_intensity = COMMAND_INTENSITY_MAX - 1
    released_below = -1 + TRIGGER_DEADZONE
    floored_above = 1 - TRIGGER_DEADZONE

    if level < released_below:
        return 0
    if level > floored_above:
        return top_intensity

    # Shift -1 .. 1 onto 0 .. 1, then stretch onto 0 .. top_intensity.
    fraction = (level + 1) / 2
    return round(fraction * top_intensity)
def calculate_steer_intensity(level):
    """Convert a stick axis reading into a steering intensity.

    Steering is all-or-nothing: a reading inside the dead zone gives no
    steering, anything at or beyond it gives the maximum intensity. The sign
    of ``level`` is ignored here; the caller picks the direction.

    Args:
        level: Stick reading in the range -1 .. 1.

    Returns:
        0 when ``abs(level) < STICK_DEADZONE``, otherwise
        ``COMMAND_INTENSITY_MAX - 1``.
    """
    if abs(level) < STICK_DEADZONE:
        return 0
    # The former extra `abs(level) > 1 - STICK_DEADZONE` check returned the
    # same value as the fall-through, so a single return covers both paths.
    return COMMAND_INTENSITY_MAX - 1
# Bandwidth optimization: remember the last intensity accepted for each
# command so an unchanged value is not transmitted again.
last_sent_states = dict.fromkeys(COMMAND, 0)


def should_send(direction, intensity):
    """Tell whether ``intensity`` is new for ``direction`` and record it if so.

    Args:
        direction: Key into ``last_sent_states`` (a ``COMMAND`` member).
        intensity: Intensity about to be sent for that command.

    Returns:
        False when ``intensity`` equals the value last recorded for
        ``direction`` (nothing is changed); True otherwise, after storing
        ``intensity`` as the new last value.
    """
    # NOTE(review): LEFT and RIGHT are tracked as independent entries although
    # ui_loop derives both from the same stick axis. A LEFT n -> RIGHT n ->
    # LEFT n sequence therefore suppresses the last command. Confirm against
    # the receiver whether that can leave the steering in the wrong state.
    changed = last_sent_states[direction] != intensity
    if changed:
        last_sent_states[direction] = intensity
    return changed
def ui_loop():
screen = pygame.display.set_mode((382, 320), pygame.NOFRAME)
pygame.display.set_caption("RC++")
clock = pygame.time.Clock()
has_focus = False
joystick = None
while True:
for event in pygame.event.get():
if event.type == pygame.WINDOWFOCUSGAINED:
has_focus = True
elif event.type == pygame.WINDOWFOCUSLOST:
has_focus = False
if event.type == pygame.JOYBUTTONDOWN:
button = DS4_DIGITAL(event.button)
if button == DS4_DIGITAL.TOUCHPAD:
return
elif button == DS4_DIGITAL.TRIANGLE:
server.send_command(COMMAND.SWITCH_CAMERA.to_bytes(1))
video_stream.switch()
elif button == DS4_DIGITAL.SQUARE:
server.send_command(COMMAND.TOGGLE_NEON.to_bytes(1))
elif button == DS4_DIGITAL.X:
server.send_command(COMMAND.TOGGLE_CLACSON.to_bytes(1))
elif event.type == pygame.JOYBUTTONUP:
if button == DS4_DIGITAL.X:
server.send_command(COMMAND.TOGGLE_CLACSON.to_bytes(1))
elif event.type == pygame.JOYAXISMOTION:
level = event.value
analog_control = DS4_ANALOG(event.axis)
direction = None
intensity = None
if analog_control == DS4_ANALOG.R2:
direction = COMMAND.FORWARD
intensity = calculate_march_intensity(level)
elif analog_control == DS4_ANALOG.L2:
direction = COMMAND.BACKWARD
intensity = calculate_march_intensity(level)
elif analog_control == DS4_ANALOG.L_X:
direction = COMMAND.LEFT if level < 0 else COMMAND.RIGHT
intensity = calculate_steer_intensity(level)
if direction is not None and intensity is not None and should_send(direction, intensity):
command = direction + intensity
server.send_command(command.to_bytes(1))
#print(f"Sent {direction.name} {intensity}/{COMMAND_INTENSITY_MAX - 1} - {command}")
elif event.type == pygame.JOYDEVICEADDED:
joystick = pygame.joystick.Joystick(event.device_index)
print(f"{joystick.get_name()} connencted")
elif event.type == pygame.JOYDEVICEREMOVED:
print(f"{joystick.get_name()} disconnected")
joystick = None
if not has_focus:
colors = [(255, 255, 255), (0,0,0)]
index = int(time.time()) % 2
screen.fill(colors[index])
else:
screen.fill((255, 255, 255))
text_surface = PYGAME_FONT.render(f"{max_len * " "} LAST MAX", True, (0,0,0))
screen.blit(text_surface, (50, 30))
for index, metric in enumerate(TEMP_METRICS):
draw_temp(screen, metric, (50, index*30 + 60))
pygame.display.flip() # Update the screen
clock.tick(FPS)
if __name__ == "__main__":
    # Script entry point: start every subsystem, run the blocking UI loop,
    # then shut the subsystems down again.
    pygame.init()

    # System font names; only `bold` is used at the moment.
    normal = "mx437ibmdosiso8"
    bold = "mx437ibmvga8x16"
    PYGAME_FONT = pygame.font.SysFont(bold, 28)

    gamepad_viewer = Gamepad_Viewer()
    gamepad_viewer.start_mirroring()

    gps_tracker = GPS_Tracker()
    gps_tracker.open_live_map()

    network_manager = Network_Manager()
    network_manager.start_monitoring()

    # Created before the server starts: telemetry_callback puts stream
    # metrics on this queue, so it must exist before any telemetry can arrive.
    metrics_queue = multiprocessing.Queue()

    server = Server(telemetry_callback)
    server.start()

    video_stream = Stream_Manager(metrics_queue)
    video_stream.play()

    try:
        ui_loop()  # Blocking
    except KeyboardInterrupt:
        pass
    finally:
        # Run the shutdown sequence even when ui_loop fails unexpectedly.
        # NOTE(review): the server is never stopped explicitly; confirm
        # whether Server needs a shutdown call here.
        gamepad_viewer.stop_mirroring()
        gps_tracker.close_live_map()
        network_manager.stop_monitoring()
        video_stream.close()
        pygame.quit()

    print("Done")