import shutil from printer import perror if shutil.which("ffmpeg") is None: perror("ffmpeg is not installed or not found in PATH") #from vidstab import VidStab import multiprocessing import queue import av import cv2 import numpy as np import time import threading from datetime import datetime from enum import Enum from PyQt5 import QtWidgets, QtGui, QtCore import sys from server import METRIC, STREAM_METRICS from printer import perror # Stream parameters ORIGINAL_STREAM_WIDTH = 320 ORIGINAL_STREAM_HEIGHT = 240 FRAMERATE = 30 UDP_PORT = 8001 FRAMETIME_MS = int(1/FRAMERATE * 1000) # Device parameters SCREEN_WIDTH = 1440 SCREEN_HEIGHT = 900 SCALED_WIDTH = int(SCREEN_HEIGHT / ORIGINAL_STREAM_WIDTH * ORIGINAL_STREAM_HEIGHT) class LENS_FACING(Enum): FRONT = 0 BACK = 1 class FONT_SIZE(Enum): SMALL = 0.7 NORMAL = 1.2 BIG = 1.7 HUGE = 2.2 class ICON(Enum): GAS = 0 BATTERY_EMPTY = 1 BATTERY_LOW = 2 BATTERY_HIGH = 3 BATTERY_FULL = 4 SIGNAL_EMPTY = 5 SIGNAL_LOW = 6 SIGNAL_HIGH = 7 SIGNAL_FULL = 8 LIGHTNING = 9 class Stream_Manager: def __init__(self, metrics_queue): self.metrics_queue = metrics_queue self.metrics = { metric: 0 for metric in STREAM_METRICS } self.no_signal_threshold_s = 1 self.last_frame_time = 0 self.process = None self.font = cv2.FONT_HERSHEY_SIMPLEX self.text_thickness = 2 self.border_thickness = 7 self.num_cells_in_series = 2 # cv2.IMREAD_UNCHANGED ensures alpha channel is loaded properly self.icons = { icon: cv2.imread(f"icons/{icon.name.lower()}.png", cv2.IMREAD_UNCHANGED) for icon in ICON} # Shared state between processes self.lens_facing_shared = multiprocessing.Value('i', LENS_FACING.FRONT.value) # GUI placeholders (set in _start) self.app = None self.window = None self.label = None self.frame_queue = None #self.stabilizer = VidStab() def _get_text_size(self, text, scale=FONT_SIZE.NORMAL): return cv2.getTextSize(text, self.font, scale.value, self.text_thickness)[0] def _draw_text(self, frame, text, position, size=FONT_SIZE.NORMAL, color=(255,255,255)): # Draw border (black) border_color = (0, 0, 0) cv2.putText(frame, text, position, self.font, size.value, border_color, self.text_thickness + self.border_thickness, cv2.LINE_AA) # Draw text (white) cv2.putText(frame, text, position, self.font, size.value, color, self.text_thickness, cv2.LINE_AA) def _draw_icon(self, frame, icon, position): icon_img = self.icons[icon] h, w = icon_img.shape[:2] x, y = position # Center the incon in the y position y -= h//2 # Blend the icon with the frame using alpha channel (png's transparency is preserved) alpha = icon_img[:, :, 3] / 255.0 for c in range(3): frame[y:y+h, x:x+w, c] = alpha * icon_img[:, :, c] + (1 - alpha) * frame[y:y+h, x:x+w, c] def _get_lens_facing(self): return LENS_FACING(self.lens_facing_shared.value) def _rotate(self, img): if self._get_lens_facing() == LENS_FACING.FRONT: return cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE) else: return cv2.rotate(img, cv2.ROTATE_90_COUNTERCLOCKWISE) def _create_no_signal_frame(self): frame = np.zeros((SCREEN_HEIGHT*2, SCALED_WIDTH*2, 3), dtype=np.uint8) h, w = frame.shape[:2] text = "- NO SIGNAL -" (text_w, text_h)= self._get_text_size(text, FONT_SIZE.HUGE) text_x = (w - text_w) // 2 text_y = (h + text_h) // 2 self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.HUGE) return frame def _voltage_to_percentage(self, voltage): single_cell_voltage = voltage / self.num_cells_in_series single_cell_full = 4.2 single_cell_empty = 3.2 if single_cell_voltage >= single_cell_full: return 100 if single_cell_voltage <= single_cell_empty: return 0 return round((single_cell_voltage - single_cell_empty) * 100) def _add_overlays(self, frame): h, w = frame.shape[:2] padding = 30 space = 10 icon_size = 64 # Camera source lens_facing = self._get_lens_facing() text = "FRONT CAMERA" if lens_facing == LENS_FACING.FRONT else "BACK CAMERA" (text_w, text_h)= self._get_text_size(text, FONT_SIZE.BIG) text_x = (w - text_w) // 2 text_y = padding + text_h self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.BIG) # Timestamp text = datetime.now().strftime("%X") (text_w, text_h)= self._get_text_size(text, FONT_SIZE.NORMAL) text_x = w - text_w - padding text_y = h - padding self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.NORMAL) # Date text = datetime.now().strftime("%-d %b %Y") (text_w, text_h)= self._get_text_size(text, FONT_SIZE.NORMAL) text_x = padding text_y = h - padding self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.NORMAL) # Battery battey_percent = self.metrics[METRIC.PHONE_BATTERY_PERCENT] if battey_percent >= 80: icon = ICON.BATTERY_FULL color = (255, 255, 255) elif battey_percent >= 50: icon = ICON.BATTERY_HIGH color = (255, 255, 255) elif battey_percent >= 25: icon = ICON.BATTERY_LOW color = (0, 140, 255) else: icon = ICON.BATTERY_EMPTY color = (0, 0, 180) text = f"{battey_percent}%" (text_w, text_h)= self._get_text_size(text, FONT_SIZE.BIG) text_x = w - text_w - padding text_y = padding + text_h self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.BIG, color) self._draw_icon(frame, icon, (text_x - icon_size, text_y - text_h//2)) # Signal signal_level = self.metrics[METRIC.SIGNAL_LEVEL] if signal_level >= 4: icon = ICON.SIGNAL_FULL elif signal_level == 3: icon = ICON.SIGNAL_HIGH elif signal_level == 2: icon = ICON.SIGNAL_LOW else: icon = ICON.SIGNAL_EMPTY text = "LTE" (text_w, text_h)= self._get_text_size(text, FONT_SIZE.BIG) text_x = w - text_w - padding text_y = padding + text_h + 70 self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.BIG) self._draw_icon(frame, icon, (text_x - icon_size - space, text_y - text_h//2)) # Car voltage voltage = self.metrics[METRIC.CAR_BATTERY_VOLTAGE]/10.0 percentage = self._voltage_to_percentage(voltage) if percentage >= 50: color = (255, 255, 255) elif percentage >= 25: color = (0, 140, 255) else: color = (0, 0, 180) # Blinking battery low text if (int(time.time()) % 2 == 0): text = "- CAR BATTERY LOW -" (text_w, text_h)= self._get_text_size(text, FONT_SIZE.HUGE) text_x = (w - text_w) // 2 text_y = h // 2 - space self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.HUGE, color) text = f"{voltage:.1f}V" (text_w, text_h)= self._get_text_size(text, FONT_SIZE.BIG) text_x = padding + icon_size + space text_y = padding + text_h self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.BIG) self._draw_icon(frame, ICON.GAS, (padding, text_y - text_h//2)) text = f"~{percentage}%" text_x = padding + icon_size + space + text_w + space self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.BIG, color) # Electronics voltage voltage = self.metrics[METRIC.ELECTRONICS_BATTERY_VOLTAGE]/10.0 percentage = self._voltage_to_percentage(voltage) if percentage >= 50: color = (255, 255, 255) elif percentage >= 25: color = (0, 140, 255) else: color = (0, 0, 180) # Blinking battery low text if (int(time.time()) % 2 == 0): text = "- ELECTRONICS BATTERY LOW -" (text_w, text_h)= self._get_text_size(text, FONT_SIZE.HUGE) text_x = (w - text_w) // 2 text_y = h // 2 + text_h + space self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.HUGE, color) text = f"{voltage:.1f}V" (text_w, text_h)= self._get_text_size(text, FONT_SIZE.BIG) text_x = padding + icon_size + space text_y = padding + icon_size*2 - space*2 self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.BIG) self._draw_icon(frame, ICON.LIGHTNING, (padding, text_y - text_h//2)) text = f"~{percentage}%" text_x = padding + icon_size + space + text_w + space self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.BIG, color) # Heading text = f"- {self.metrics[METRIC.HEADING]}' -" (text_w, text_h)= self._get_text_size(text, FONT_SIZE.NORMAL) text_x = (w - text_w) // 2 text_y = h - padding - 55 self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.NORMAL) text = self._heading_to_cardinal(self.metrics[METRIC.HEADING]) (text_w, text_h)= self._get_text_size(text, FONT_SIZE.BIG) text_x = (w - text_w) // 2 text_y = h - padding self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.BIG) def _heading_to_cardinal(self, degrees): directions = [ "NORTH", "NORTH EAST", "EAST", "SOUTH EAST", "SOUTH", "SOUTH WEST", "WEST", "NORTH WEST" ] # Each sector is 45°, offset by 22.5° for correct rounding index = int((degrees + 22.5) // 45) % 8 return directions[index] def _telemetry_updater_thread(self): while True: (metric, value) = self.metrics_queue.get() self.metrics[metric] = value def _stream_reader_thread(self): try: input_url = f"udp://0.0.0.0:{UDP_PORT}" options = { "fflags": "nobuffer", "flags": "low_delay", "framedrop": "1" } stream = av.open(input_url, format="h264", mode="r", options=options) for frame in stream.decode(video=0): frame = frame.to_ndarray(format="bgr24") frame = self._rotate(frame) frame = cv2.resize(frame, (SCALED_WIDTH*2, SCREEN_HEIGHT*2)) try: self.frame_queue.put_nowait(frame) self.last_frame_time = time.time() except queue.Full: pass # Queue full, skip frame except Exception as e: perror(f"Stream reader error: {e}") finally: if stream: stream.close() def _update_display(self): frame = None try: frame = self.frame_queue.get_nowait() except queue.Empty: pass else: self._add_overlays(frame) timed_out = time.time() - self.last_frame_time > self.no_signal_threshold_s if frame is None and timed_out: frame = self._create_no_signal_frame() if frame is not None: # convert BGR -> RGB for Qt rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) h, w, ch = rgb.shape bytes_per_line = ch * w qimg = QtGui.QImage(rgb.data, w, h, bytes_per_line, QtGui.QImage.Format_RGB888) pix = QtGui.QPixmap.fromImage(qimg) # scale to label/window if needed preserving aspect pix = pix.scaled(self.label.width(), self.label.height(), QtCore.Qt.KeepAspectRatio) self.label.setPixmap(pix) def _start(self): self.app = QtWidgets.QApplication(sys.argv) self.window = QtWidgets.QWidget() self.window.setWindowTitle("VIDEO STREAM") self.window.setWindowFlags( QtCore.Qt.FramelessWindowHint | QtCore.Qt.WindowStaysOnTopHint) # Create QLabel to hold frames self.label = QtWidgets.QLabel() # make sure label expands to full window self.label.setSizePolicy(QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Expanding) self.label.setAlignment(QtCore.Qt.AlignCenter) layout = QtWidgets.QVBoxLayout() layout.setContentsMargins(0, 0, 0, 0) layout.addWidget(self.label) self.window.setLayout(layout) # Position and size the window (match behavior of your cv2 move/resize) window_w = SCALED_WIDTH window_h = SCREEN_HEIGHT left = (SCREEN_WIDTH - SCALED_WIDTH) // 2 top = 0 # set geometry (x, y, width, height) self.window.setGeometry(left, top, window_w, window_h) # Initially show NO SIGNAL no_signal_frame = self._create_no_signal_frame() rgb = cv2.cvtColor(no_signal_frame, cv2.COLOR_BGR2RGB) h, w, ch = rgb.shape qimg = QtGui.QImage(rgb.data, w, h, ch * w, QtGui.QImage.Format_RGB888) pix = QtGui.QPixmap.fromImage(qimg) pix = pix.scaled(window_w, window_h, QtCore.Qt.KeepAspectRatio) self.label.setPixmap(pix) self.window.show() self.frame_queue = queue.Queue(maxsize=1) threading.Thread(target=self._stream_reader_thread, daemon=True).start() threading.Thread(target=self._telemetry_updater_thread, daemon=True).start() timer = QtCore.QTimer() timer.timeout.connect(self._update_display) timer.start(FRAMETIME_MS) # Run the Qt event loop (blocks until window closed) try: self.app.exec_() except Exception as e: perror(f"Qt exec error: {e}") finally: # cleanup try: self.app.quit() except Exception: pass def switch(self): # TODO: fix this garbage current_value = self.lens_facing_shared.value new_value = LENS_FACING.BACK.value if current_value == LENS_FACING.FRONT.value else LENS_FACING.FRONT.value self.lens_facing_shared.value = new_value print(f"Switched to {'FRONT' if new_value == LENS_FACING.FRONT.value else 'BACK'} camera") def play(self): if self.process and self.process.is_alive(): print("STREAM MANAGER process already launched") return self.process = multiprocessing.Process(target=self._start) self.process.start() print("STREAM MANAGER process launched") def close(self): if self.process: self.process.terminate() self.process = None print("STREAM MANAGER process terminated")