# remote-car / Python / stream_manager_ffplay.py

import shutil
from printer import perror
# Import-time sanity check: report the problem as soon as the module is loaded
# when no "ffplay" executable can be resolved through PATH.
# NOTE(review): whether perror() aborts the program or only logs is not visible
# here — confirm in printer.py.
if not shutil.which("ffplay"):
    perror("ffplay is not installed or not found in PATH")

import subprocess
import psutil
from enum import Enum
from pathlib import Path
from printer import monitor_stderr

# Stream parameters
ORIGINAL_STREAM_WIDTH = 320
ORIGINAL_STREAM_HEIGHT = 240
FRAMERATE = 30
UDP_PORT = 8001

# Device parameters
# NOTE(review): screen size and font paths are specific to one machine —
# adjust them when running elsewhere.
SCREEN_WIDTH = 1440
SCREEN_HEIGHT = 900
FONT_BOLD = "/Users/fra/Library/Fonts/Mx437_IBM_VGA_8x16.ttf"
FONT_NORMAL = "/Users/fra/Library/Fonts/Mx437_IBM_DOS_ISO8.ttf"

# Width (in pixels) of the displayed stream. The video filters rotate the
# stream by 90° and then scale it to the full screen height, so the original
# width maps onto SCREEN_HEIGHT and the original height becomes the width.
# Kept as an integer: it is interpolated verbatim into ffplay arguments
# (window offset and scale filter), where "675" is cleaner than "675.0".
SCALED_WIDTH = round(SCREEN_HEIGHT * ORIGINAL_STREAM_HEIGHT / ORIGINAL_STREAM_WIDTH)

class LENS_FACING(Enum):
    """Camera lens the stream is labelled with.

    The member name is what Stream_Manager writes into the overlay text
    file (e.g. "FRONT CAMERA").
    """
    # Initial value used by Stream_Manager
    FRONT = 0
    BACK = 1

# Possible unused flags:
#  -fast
#  -sync video

# ffplay options that precede the filter chain (see CMD for the full command)
INPUT_FLAGS = [
    # Only errors are reported
    "-loglevel", "error",

    # Latency-oriented settings: no input buffering, low-delay decoding,
    # and dropping of late frames
    "-fflags", "nobuffer",
    "-flags", "low_delay",
    "-framedrop",

    # Input format and rate of the incoming stream
    "-f", "h264",
    "-framerate", str(FRAMERATE),

    # Disable the audio (streamed separately)
    "-an",

    # Window appearance
    "-noborder",
    "-alwaysontop",

    # Position the stream in the center of the screen
    "-left", str((SCREEN_WIDTH - SCALED_WIDTH) // 2),
    "-top", "0",

    # Listen for the stream on every local interface
    "-i", "udp://0.0.0.0:" + str(UDP_PORT),
]

# Video filters, one list element per filter. CMD joins them with "," and
# passes the result as the single argument of ffplay's -vf option.
# Adjacent string literals inside one element are concatenated by Python,
# so each multi-line group below is ONE filter description.
VIDEO_FILTERS = [
    # 90° rotation clockwise
    "transpose=1",

    # Scale the stream to fit the entire screen height
    f"scale={SCALED_WIDTH}:{SCREEN_HEIGHT}",

    # Auto-updating FRONT/BACK camera text.
    # The text comes from /tmp/camera_source.ffplayvf, the same path that
    # Stream_Manager writes when it is created and on every switch().
    # NOTE(review): the option string ends with a trailing ":" after
    # "reload=5" — presumably tolerated by ffplay; confirm before changing.
    f"drawtext=fontfile={FONT_BOLD}:"
    "textfile=/tmp/camera_source.ffplayvf:"
    "fontsize=h/25:"
    "fontcolor=white:"
    "bordercolor=black:"
    "borderw=1:"
    "x=(w-text_w)/2:"
    "y=20:"
    "reload=5:",

    # Auto-updating timestamp.
    # The single quotes and the "\:" are part of the filter text itself; the
    # raw string keeps the backslash literal on the Python side.
    f"drawtext='fontfile={FONT_NORMAL}:"
    r"text=%{localtime\:%-d %b %Y %X}':"  # strftime-style format string
    "fontsize=h/30:"
    "fontcolor=white:"
    "x=w-text_w-10:"
    "y=h-text_h-10",
]

# Complete ffplay argument vector: executable, input/window flags, then the
# whole filter chain as one comma-separated -vf argument.
CMD = ["ffplay", *INPUT_FLAGS, "-vf", ",".join(VIDEO_FILTERS)]

class Stream_Manager:
    """Launch, relabel and stop the ffplay window that displays the stream.

    The player is started from the module-level ``CMD``. The FRONT/BACK
    overlay label is handed to ffplay through a text file that the
    ``drawtext`` filter re-reads while playing.
    """

    def __init__(self) -> None:
        """Start with no player process and publish the FRONT label."""
        self.background_process = None
        self.lens_facing = LENS_FACING.FRONT

        # Initialize the files referenced in the video filters
        self.camera_source_file = Path("/tmp/camera_source.ffplayvf")
        self._write_camera_source()

    def _write_camera_source(self) -> None:
        """Atomically write the current lens label to the overlay text file."""
        # ffplay re-reads this file while it is running. Writing a sibling
        # temporary file and renaming it over the target guarantees the
        # reader never observes an empty or half-written label.
        target = self.camera_source_file
        staging = target.with_name(target.name + ".tmp")
        staging.write_text(f"{self.lens_facing.name} CAMERA")
        staging.replace(target)

    def play(self) -> None:
        """Start ffplay unless a previously started player is still running.

        A player that has exited on its own (window closed, stream error) is
        not considered running, so the stream can be launched again.
        Launch failures are reported through ``perror``.
        """
        running = self.background_process
        if running is not None and running.poll() is None:
            print("Stream already launched")
            return
        try:
            self.background_process = subprocess.Popen(CMD, stderr=subprocess.PIPE, text=True)
            monitor_stderr(self.background_process, "FFPLAY")
        except Exception as e:
            perror(f"Failed to play the stream: {e}")

    def switch(self) -> None:
        """Toggle the FRONT/BACK label and publish it to the overlay file.

        Only the on-screen label is updated here; nothing is sent to the
        player process or to the camera.
        """
        if self.lens_facing == LENS_FACING.BACK:
            self.lens_facing = LENS_FACING.FRONT
        else:
            self.lens_facing = LENS_FACING.BACK
        self._write_camera_source()

    def close(self) -> None:
        """Kill the player (and any child processes) and forget its handle.

        Safe to call repeatedly and after the player has already exited.
        """
        process = self.background_process
        if process is None:
            print("No stream process to terminate")
            return

        # Drop the handle first so play() can relaunch even if cleanup fails.
        self.background_process = None

        # poll() also reaps a player that has already exited; in that case
        # there is nothing left to kill.
        if process.poll() is None:
            try:
                parent = psutil.Process(process.pid)
                for child in parent.children(recursive=True):
                    try:
                        child.kill()
                    except psutil.NoSuchProcess:
                        # The child exited between listing and killing.
                        pass
                parent.kill()
            except psutil.NoSuchProcess:
                # The player vanished on its own while we were cleaning up.
                pass
            # Reap the killed process so it does not linger as a zombie.
            process.wait()
        print("Stream process terminated")