# remote-car / Python / stream_manager.py
# stream_manager.py
# Raw

import shutil
from printer import perror
# Report at import time when no "ffmpeg" executable can be found on PATH.
# NOTE(review): execution continues past this check unless perror() itself
# aborts — confirm against printer.perror.
if shutil.which("ffmpeg") is None:
    perror("ffmpeg is not installed or not found in PATH")

#from vidstab import VidStab
import multiprocessing
import queue
import av
import cv2
import numpy as np
import time
import threading
from datetime import datetime
from enum import Enum
from PyQt5 import QtWidgets, QtGui, QtCore
import sys

from server import METRIC, STREAM_METRICS
from printer import perror

# --- Incoming video stream ---
ORIGINAL_STREAM_WIDTH, ORIGINAL_STREAM_HEIGHT = 320, 240
FRAMERATE = 30
UDP_PORT = 8001  # UDP port the stream reader opens
FRAMETIME_MS = int(1000 * (1 / FRAMERATE))  # period of the display refresh timer

# --- Display device ---
SCREEN_WIDTH, SCREEN_HEIGHT = 1440, 900
# Frames are rotated by 90 degrees before being shown, so the stream's height
# becomes the on-screen width once the picture is scaled to the screen height.
SCALED_WIDTH = int(ORIGINAL_STREAM_HEIGHT * (SCREEN_HEIGHT / ORIGINAL_STREAM_WIDTH))

class LENS_FACING(Enum):
    """Camera direction used by the stream manager.

    The member selects the rotation direction applied to incoming frames and
    the camera label drawn on the overlay. The integer value is what gets
    stored in the shared ``multiprocessing.Value``.
    """
    FRONT = 0
    BACK = 1

class FONT_SIZE(Enum):
    """Text sizes; each value is handed to OpenCV as the font scale."""
    SMALL = 0.7
    NORMAL = 1.2
    BIG = 1.7
    HUGE = 2.2

class ICON(Enum):
    """Icons that can be drawn on the overlay.

    Each member's lowercased name is the stem of the PNG file loaded from the
    ``icons/`` directory (e.g. ``ICON.GAS`` -> ``icons/gas.png``).
    """
    GAS = 0
    BATTERY_EMPTY = 1
    BATTERY_LOW = 2
    BATTERY_HIGH = 3
    BATTERY_FULL = 4
    SIGNAL_EMPTY = 5
    SIGNAL_LOW = 6
    SIGNAL_HIGH = 7
    SIGNAL_FULL = 8
    LIGHTNING = 9

class Stream_Manager:

    def __init__(self, metrics_queue):
        """Store rendering settings, load the icons and create the shared state.

        Args:
            metrics_queue: Queue yielding ``(metric, value)`` pairs; it is
                drained by the telemetry updater thread.
        """
        # Telemetry: every known stream metric starts at 0
        self.metrics_queue = metrics_queue
        self.metrics = dict.fromkeys(STREAM_METRICS, 0)

        # Stream / process state
        self.no_signal_threshold_s = 1
        self.last_frame_time = 0
        self.process = None

        # Overlay rendering settings
        self.font = cv2.FONT_HERSHEY_SIMPLEX
        self.text_thickness = 2
        self.border_thickness = 7
        self.num_cells_in_series = 2

        # Icons are read with cv2.IMREAD_UNCHANGED so the PNG alpha channel is kept
        self.icons = {}
        for icon in ICON:
            self.icons[icon] = cv2.imread(f"icons/{icon.name.lower()}.png", cv2.IMREAD_UNCHANGED)

        # Lens direction is held in a multiprocessing.Value so it is shared between processes
        self.lens_facing_shared = multiprocessing.Value('i', LENS_FACING.FRONT.value)

        # GUI objects are created later, in _start
        self.app = None
        self.window = None
        self.label = None
        self.frame_queue = None

        #self.stabilizer = VidStab()

    def _get_text_size(self, text, scale=FONT_SIZE.NORMAL):
        """Return the ``(width, height)`` OpenCV reports for ``text`` at ``scale``.

        The measurement uses the manager's font and text thickness.
        """
        size, _baseline = cv2.getTextSize(text, self.font, scale.value, self.text_thickness)
        return size

    def _draw_text(self, frame, text, position, size=FONT_SIZE.NORMAL, color=(255,255,255)):
        """Draw outlined text onto ``frame`` in place.

        Args:
            frame: Image the text is drawn onto.
            text: String to draw.
            position: ``(x, y)`` origin passed to ``cv2.putText``.
            size: ``FONT_SIZE`` member giving the font scale.
            color: Colour of the text itself; the outline is always black.
        """
        outline_color = (0, 0, 0)
        # A thick black pass first, then the text on top of it, gives the outline.
        passes = (
            (outline_color, self.text_thickness + self.border_thickness),
            (color, self.text_thickness),
        )
        for stroke_color, stroke_thickness in passes:
            cv2.putText(frame, text, position, self.font, size.value,
                stroke_color, stroke_thickness, cv2.LINE_AA)
        
        
    def _draw_icon(self, frame, icon, position):
        icon_img = self.icons[icon]
        h, w = icon_img.shape[:2]
        x, y = position
        
        # Center the incon in the y position
        y -= h//2
        
        # Blend the icon with the frame using alpha channel (png's transparency is preserved)
        alpha = icon_img[:, :, 3] / 255.0
        for c in range(3):
            frame[y:y+h, x:x+w, c] = alpha * icon_img[:, :, c] + (1 - alpha) * frame[y:y+h, x:x+w, c]

    def _get_lens_facing(self):
        """Return the shared lens direction as a ``LENS_FACING`` member."""
        raw_value = self.lens_facing_shared.value
        return LENS_FACING(raw_value)

    def _rotate(self, img):
        """Return ``img`` rotated by 90 degrees.

        The rotation is clockwise while the front lens is selected and
        counter-clockwise otherwise.
        """
        facing_front = self._get_lens_facing() == LENS_FACING.FRONT
        rotation = cv2.ROTATE_90_CLOCKWISE if facing_front else cv2.ROTATE_90_COUNTERCLOCKWISE
        return cv2.rotate(img, rotation)

    def _create_no_signal_frame(self):
        """Build a black frame with "- NO SIGNAL -" centred on it.

        Returns:
            A new ``uint8`` array of shape
            ``(SCREEN_HEIGHT*2, SCALED_WIDTH*2, 3)``.
        """
        message = "- NO SIGNAL -"
        canvas_h, canvas_w = SCREEN_HEIGHT*2, SCALED_WIDTH*2
        canvas = np.zeros((canvas_h, canvas_w, 3), dtype=np.uint8)

        msg_w, msg_h = self._get_text_size(message, FONT_SIZE.HUGE)
        origin = ((canvas_w - msg_w) // 2, (canvas_h + msg_h) // 2)
        self._draw_text(canvas, message, origin, FONT_SIZE.HUGE)

        return canvas

    def _voltage_to_percentage(self, voltage):
        single_cell_voltage = voltage / self.num_cells_in_series
        single_cell_full = 4.2
        single_cell_empty = 3.2
        if single_cell_voltage >= single_cell_full:
            return 100
        if single_cell_voltage <= single_cell_empty:
            return 0
        return round((single_cell_voltage - single_cell_empty) * 100)

    def _add_overlays(self, frame):
        """Draw the telemetry overlay onto ``frame`` in place.

        Drawn in this order: camera label (top centre), time (bottom right),
        date (bottom left), phone battery and signal level (top right), car
        and electronics battery rows (top left) and heading (bottom centre).
        All values are read from ``self.metrics``.

        Args:
            frame: BGR image that is modified in place.
        """
        h, w = frame.shape[:2]
        padding = 30
        space = 10
        icon_size = 64

        # Read the clock once so the time and the date can never disagree
        now = datetime.now()

        # Camera source
        lens_facing = self._get_lens_facing()
        text = "FRONT CAMERA" if lens_facing == LENS_FACING.FRONT else "BACK CAMERA"
        (text_w, text_h) = self._get_text_size(text, FONT_SIZE.BIG)
        text_x = (w - text_w) // 2
        text_y = padding + text_h
        self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.BIG)

        # Timestamp
        text = now.strftime("%X")
        (text_w, text_h) = self._get_text_size(text, FONT_SIZE.NORMAL)
        text_x = w - text_w - padding
        text_y = h - padding
        self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.NORMAL)

        # Date. "%-d" is a glibc-only strftime extension (it fails on Windows),
        # so the unpadded day number is formatted directly instead.
        text = f"{now.day} {now.strftime('%b %Y')}"
        text_x = padding
        text_y = h - padding
        self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.NORMAL)

        # Phone battery
        battery_percent = self.metrics[METRIC.PHONE_BATTERY_PERCENT]
        if battery_percent >= 80:
            icon = ICON.BATTERY_FULL
        elif battery_percent >= 50:
            icon = ICON.BATTERY_HIGH
        elif battery_percent >= 25:
            icon = ICON.BATTERY_LOW
        else:
            icon = ICON.BATTERY_EMPTY
        color = self._charge_color(battery_percent)

        text = f"{battery_percent}%"
        (text_w, text_h) = self._get_text_size(text, FONT_SIZE.BIG)
        text_x = w - text_w - padding
        text_y = padding + text_h
        self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.BIG, color)
        self._draw_icon(frame, icon, (text_x - icon_size, text_y - text_h//2))

        # Signal
        signal_level = self.metrics[METRIC.SIGNAL_LEVEL]
        if signal_level >= 4:
            icon = ICON.SIGNAL_FULL
        elif signal_level == 3:
            icon = ICON.SIGNAL_HIGH
        elif signal_level == 2:
            icon = ICON.SIGNAL_LOW
        else:
            icon = ICON.SIGNAL_EMPTY

        text = "LTE"
        (text_w, text_h) = self._get_text_size(text, FONT_SIZE.BIG)
        text_x = w - text_w - padding
        text_y = padding + text_h + 70
        self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.BIG)
        self._draw_icon(frame, icon, (text_x - icon_size - space, text_y - text_h//2))

        # Car voltage (first row, warning just above the frame centre)
        self._draw_voltage_overlay(
            frame, METRIC.CAR_BATTERY_VOLTAGE, ICON.GAS, "- CAR BATTERY LOW -",
            padding=padding, space=space, icon_size=icon_size)

        # Electronics voltage (second row, warning just below the frame centre)
        self._draw_voltage_overlay(
            frame, METRIC.ELECTRONICS_BATTERY_VOLTAGE, ICON.LIGHTNING,
            "- ELECTRONICS BATTERY LOW -",
            padding=padding, space=space, icon_size=icon_size,
            row_y=padding + icon_size*2 - space*2, warning_below=True)

        # Heading
        text = f"- {self.metrics[METRIC.HEADING]}' -"
        (text_w, text_h) = self._get_text_size(text, FONT_SIZE.NORMAL)
        text_x = (w - text_w) // 2
        text_y = h - padding - 55
        self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.NORMAL)

        text = self._heading_to_cardinal(self.metrics[METRIC.HEADING])
        (text_w, text_h) = self._get_text_size(text, FONT_SIZE.BIG)
        text_x = (w - text_w) // 2
        text_y = h - padding
        self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.BIG)

    @staticmethod
    def _charge_color(percentage):
        """Return the BGR colour for a charge level: white >= 50, orange >= 25, else red."""
        if percentage >= 50:
            return (255, 255, 255)
        if percentage >= 25:
            return (0, 140, 255)
        return (0, 0, 180)

    def _draw_voltage_overlay(self, frame, metric, icon, warning, *, padding, space,
                              icon_size, row_y=None, warning_below=False):
        """Draw one "<icon> 7.4V ~50%" battery row plus its blinking low-charge warning.

        Args:
            frame: BGR image that is modified in place.
            metric: Key into ``self.metrics``; the stored value is divided by
                10 to obtain volts.
            icon: ``ICON`` member drawn at the left of the row.
            warning: Text shown in the middle of the frame, on even seconds
                only, while the charge is below 25 %.
            padding: Distance of the row from the frame's left and top edges.
            space: Gap between neighbouring elements.
            icon_size: Horizontal room reserved for the icon.
            row_y: Text baseline of the row; ``None`` places the row directly
                under the top padding.
            warning_below: Draw the warning below the vertical centre of the
                frame instead of above it.
        """
        h, w = frame.shape[:2]
        voltage = self.metrics[metric]/10.0
        percentage = self._voltage_to_percentage(voltage)
        color = self._charge_color(percentage)

        # Blinking battery low text: only visible during even seconds
        if percentage < 25 and int(time.time()) % 2 == 0:
            (text_w, text_h) = self._get_text_size(warning, FONT_SIZE.HUGE)
            text_x = (w - text_w) // 2
            if warning_below:
                text_y = h // 2 + text_h + space
            else:
                text_y = h // 2 - space
            self._draw_text(frame, warning, (text_x, text_y), FONT_SIZE.HUGE, color)

        # Voltage reading, always in the default (white) colour
        text = f"{voltage:.1f}V"
        (text_w, text_h) = self._get_text_size(text, FONT_SIZE.BIG)
        text_x = padding + icon_size + space
        text_y = padding + text_h if row_y is None else row_y
        self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.BIG)
        self._draw_icon(frame, icon, (padding, text_y - text_h//2))

        # Estimated charge, coloured by level, right after the voltage text
        text = f"~{percentage}%"
        text_x = padding + icon_size + space + text_w + space
        self._draw_text(frame, text, (text_x, text_y), FONT_SIZE.BIG, color)

    def _heading_to_cardinal(self, degrees):
        directions = [
            "NORTH", 
            "NORTH EAST", 
            "EAST", 
            "SOUTH EAST", 
            "SOUTH", 
            "SOUTH WEST", 
            "WEST", 
            "NORTH WEST"
        ]        
        # Each sector is 45°, offset by 22.5° for correct rounding
        index = int((degrees + 22.5) // 45) % 8
        return directions[index]

    def _telemetry_updater_thread(self):
        while True:
            (metric, value) = self.metrics_queue.get()
            self.metrics[metric] = value

    def _stream_reader_thread(self):
        """Decode the incoming H.264 UDP stream and queue frames for display.

        Every decoded frame is converted to BGR, rotated according to the
        current lens direction and resized to twice the display size. It is
        then offered to ``self.frame_queue``; when the queue is full the frame
        is dropped. ``self.last_frame_time`` is refreshed for every frame that
        was queued.

        Any error is reported through ``perror`` and ends the thread; the
        stream is closed on the way out if it was opened.
        """
        # Bound before the try so the finally block can tell whether
        # av.open() succeeded (otherwise cleanup raises UnboundLocalError).
        stream = None
        try:
            input_url = f"udp://0.0.0.0:{UDP_PORT}"
            options = {
                "fflags": "nobuffer",
                "flags": "low_delay",
                "framedrop": "1"
            }
            stream = av.open(input_url, format="h264", mode="r", options=options)

            for frame in stream.decode(video=0):
                frame = frame.to_ndarray(format="bgr24")
                frame = self._rotate(frame)
                frame = cv2.resize(frame, (SCALED_WIDTH*2, SCREEN_HEIGHT*2))

                try:
                    self.frame_queue.put_nowait(frame)
                    self.last_frame_time = time.time()
                except queue.Full:
                    pass  # Queue full, skip frame

        except Exception as e:
            perror(f"Stream reader error: {e}")
        finally:
            if stream is not None:
                stream.close()

    def _update_display(self):
        """Timer callback: show the newest frame, or NO SIGNAL after a timeout.

        If a frame is waiting in ``self.frame_queue`` it gets its overlays and
        is displayed. If none is waiting, the NO SIGNAL frame is displayed once
        more than ``self.no_signal_threshold_s`` seconds have passed since
        ``self.last_frame_time``; otherwise the label is left untouched.
        """
        try:
            frame = self.frame_queue.get_nowait()
        except queue.Empty:
            frame = None
        else:
            self._add_overlays(frame)

        if frame is None:
            silence_s = time.time() - self.last_frame_time
            if not silence_s > self.no_signal_threshold_s:
                return
            frame = self._create_no_signal_frame()

        # Frames are BGR while the QImage format used here is RGB
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        rows, cols, channels = rgb.shape
        image = QtGui.QImage(rgb.data, cols, rows, channels * cols, QtGui.QImage.Format_RGB888)
        pixmap = QtGui.QPixmap.fromImage(image)

        # Fit the label while keeping the aspect ratio
        self.label.setPixmap(
            pixmap.scaled(self.label.width(), self.label.height(), QtCore.Qt.KeepAspectRatio))

    def _start(self):
        """Build the viewer window, start the worker threads and run the Qt loop.

        This is the target of the process created by ``play``. It blocks in
        the Qt event loop until the application exits.
        """
        self.app = QtWidgets.QApplication(sys.argv)

        # Borderless window that stays on top of other windows
        window = QtWidgets.QWidget()
        window.setWindowTitle("VIDEO STREAM")
        window.setWindowFlags(
            QtCore.Qt.FramelessWindowHint | QtCore.Qt.WindowStaysOnTopHint)
        self.window = window

        # The label holds the frames and expands to fill the whole window
        label = QtWidgets.QLabel()
        expanding = QtWidgets.QSizePolicy.Expanding
        label.setSizePolicy(expanding, expanding)
        label.setAlignment(QtCore.Qt.AlignCenter)
        self.label = label

        box = QtWidgets.QVBoxLayout()
        box.setContentsMargins(0, 0, 0, 0)
        box.addWidget(label)
        window.setLayout(box)

        # Horizontally centred at the top of the screen: (x, y, width, height)
        window.setGeometry((SCREEN_WIDTH - SCALED_WIDTH) // 2, 0, SCALED_WIDTH, SCREEN_HEIGHT)

        # Show NO SIGNAL until the first frame is displayed
        placeholder = cv2.cvtColor(self._create_no_signal_frame(), cv2.COLOR_BGR2RGB)
        rows, cols, channels = placeholder.shape
        image = QtGui.QImage(placeholder.data, cols, rows, channels * cols, QtGui.QImage.Format_RGB888)
        pixmap = QtGui.QPixmap.fromImage(image)
        pixmap = pixmap.scaled(SCALED_WIDTH, SCREEN_HEIGHT, QtCore.Qt.KeepAspectRatio)
        label.setPixmap(pixmap)

        window.show()

        # Single-slot queue between the stream reader and the display timer
        self.frame_queue = queue.Queue(maxsize=1)

        for worker in (self._stream_reader_thread, self._telemetry_updater_thread):
            threading.Thread(target=worker, daemon=True).start()

        # Refresh the display once per frame time
        refresh_timer = QtCore.QTimer()
        refresh_timer.timeout.connect(self._update_display)
        refresh_timer.start(FRAMETIME_MS)

        # Run the Qt event loop (blocks until it exits)
        try:
            self.app.exec_()
        except Exception as e:
            perror(f"Qt exec error: {e}")
        finally:
            # Best-effort cleanup
            try:
                self.app.quit()
            except Exception:
                pass

    def switch(self):
        """Toggle the shared lens direction between FRONT and BACK.

        The read-modify-write is performed while holding the lock of the
        shared ``multiprocessing.Value``, so concurrent callers cannot
        interleave and lose a toggle. The new direction is printed.
        """
        # TODO: revisit this toggle (flagged for rework by the original author)
        shared = self.lens_facing_shared
        with shared.get_lock():
            if shared.value == LENS_FACING.FRONT.value:
                new_value = LENS_FACING.BACK.value
            else:
                new_value = LENS_FACING.FRONT.value
            shared.value = new_value
        print(f"Switched to {'FRONT' if new_value == LENS_FACING.FRONT.value else 'BACK'} camera")

    def play(self):
        if self.process and self.process.is_alive():
            print("STREAM MANAGER process already launched")
            return

        self.process = multiprocessing.Process(target=self._start)
        self.process.start()
        print("STREAM MANAGER process launched")

    def close(self):
        if self.process:
            self.process.terminate()
            self.process = None
        print("STREAM MANAGER process terminated")