import shutil
from printer import perror
# Report through perror, at import time, when no ffplay executable is on PATH
if not shutil.which("ffplay"):
    perror("ffplay is not installed or not found in PATH")
import subprocess
import psutil
from enum import Enum
from pathlib import Path
from printer import monitor_stderr
# Stream parameters: geometry, rate and UDP port of the incoming H.264 feed
ORIGINAL_STREAM_WIDTH, ORIGINAL_STREAM_HEIGHT = 320, 240
FRAMERATE = 30
UDP_PORT = 8001

# Device parameters: display resolution and the fonts used by the overlays
SCREEN_WIDTH, SCREEN_HEIGHT = 1440, 900
FONT_BOLD = "/Users/fra/Library/Fonts/Mx437_IBM_VGA_8x16.ttf"
FONT_NORMAL = "/Users/fra/Library/Fonts/Mx437_IBM_DOS_ISO8.ttf"

# Width of the picture on screen. The video filters rotate the stream by 90°
# (transpose=1) before scaling it to the full screen height, so the original
# width is the side that gets stretched to SCREEN_HEIGHT and the original
# height becomes the displayed width. True division: the result is a float.
SCALED_WIDTH = (SCREEN_HEIGHT / ORIGINAL_STREAM_WIDTH) * ORIGINAL_STREAM_HEIGHT
class LENS_FACING(Enum):
    """Camera the stream is labelled with: the front or the back lens."""

    FRONT = 0
    BACK = 1
# Possible unused flags:
#   -fast
#   -sync video
#
# Command-line options placed before the -vf filter chain when ffplay is
# launched (see CMD).
INPUT_FLAGS = [
    # Only print errors
    "-loglevel", "error",
    # Keep latency low: no input buffering, low-delay decoding, and drop
    # frames instead of falling behind
    "-fflags", "nobuffer",
    "-flags", "low_delay",
    "-framedrop",
    # Force the h264 input format and tell it the frame rate
    "-f", "h264",
    "-framerate", f"{FRAMERATE}",
    # Disable the audio (streamed separately)
    "-an",
    "-noborder",
    "-alwaysontop",
    # Position the stream in the center of the screen.
    # SCALED_WIDTH is a float, so the floor division yields e.g. 382.0;
    # -left is an integer option, hence the explicit int() so that the
    # argument is rendered as "382" rather than "382.0".
    "-left", f"{int((SCREEN_WIDTH - SCALED_WIDTH) // 2)}",
    "-top", "0",
    # Listen for the stream on every interface
    "-i", f"udp://0.0.0.0:{UDP_PORT}",
]
# Video filters, applied in the order listed. CMD joins the entries with ","
# and passes the result to ffplay as a single -vf argument, so every entry
# must be one complete filter description. Adjacent string literals inside an
# entry are concatenated by Python; only the commas at the end of an entry
# separate list elements.
VIDEO_FILTERS = [
    # 90° rotation clockwise
    "transpose=1",
    # Scale the stream to fit the entire screen height
    f"scale={SCALED_WIDTH}:{SCREEN_HEIGHT}",
    # Auto-updating FRONT/BACK camera text.
    # The text is read from /tmp/camera_source.ffplayvf, the same file that
    # Stream_Manager rewrites on every camera switch; "reload" asks drawtext
    # to re-read that file while playing (see the drawtext documentation for
    # the exact meaning of the value).
    f"drawtext=fontfile={FONT_BOLD}:"
    "textfile=/tmp/camera_source.ffplayvf:"
    "fontsize=h/25:"
    "fontcolor=white:"
    "bordercolor=black:"
    "borderw=1:"
    "x=(w-text_w)/2:"
    "y=20:"
    "reload=5:",
    # Auto-updating timestamp, drawn in the bottom-right corner.
    # The single quotes wrap the fontfile and text options, and the "\:"
    # escapes the colon that separates "localtime" from its format argument;
    # the raw string keeps that backslash intact.
    f"drawtext='fontfile={FONT_NORMAL}:"
    r"text=%{localtime\:%-d %b %Y %X}':" # strftime style
    "fontsize=h/30:"
    "fontcolor=white:"
    "x=w-text_w-10:"
    "y=h-text_h-10",
]
# Complete ffplay invocation: the binary, the input flags, then every video
# filter joined with "," into the single argument expected by -vf
CMD = ["ffplay", *INPUT_FLAGS, "-vf", ",".join(VIDEO_FILTERS)]
class Stream_Manager:
    """Start, relabel and stop the ffplay process that displays the stream.

    Attributes:
        background_process: the ``subprocess.Popen`` handle of the ffplay
            process started by ``play()``, or ``None`` when no stream is
            being tracked.
        lens_facing: the ``LENS_FACING`` member currently shown in the
            on-screen camera label.
        camera_source_file: the text file holding that label; it is the file
            the drawtext filter in ``VIDEO_FILTERS`` reads.
    """

    def __init__(self):
        """Start with no process and write the initial FRONT camera label."""
        self.background_process = None
        self.lens_facing = LENS_FACING.FRONT
        # Initialize the files referenced in the video filters
        self.camera_source_file = Path("/tmp/camera_source.ffplayvf")
        self._write_camera_label()

    def _write_camera_label(self):
        """Write the current lens name to the file read by the drawtext filter."""
        self.camera_source_file.write_text(f"{self.lens_facing.name} CAMERA")

    def play(self):
        """Launch ffplay with ``CMD`` unless a stream is already running.

        A previously started process that has exited on its own is not
        counted as running, so the stream can be launched again. Any
        exception raised while starting is reported through ``perror``.
        """
        running = self.background_process
        if running is not None and running.poll() is None:
            print("Stream already launched")
            return
        # Nothing started yet, or the previous ffplay is gone: drop the stale
        # handle so a failed launch below leaves the manager in the idle state
        self.background_process = None
        try:
            self.background_process = subprocess.Popen(CMD, stderr=subprocess.PIPE, text=True)
            monitor_stderr(self.background_process, "FFPLAY")
        except Exception as e:
            perror(f"Failed to play the stream: {e}")

    def switch(self):
        """Toggle between the FRONT and BACK lens and update the label file."""
        if self.lens_facing == LENS_FACING.BACK:
            self.lens_facing = LENS_FACING.FRONT
        else:
            self.lens_facing = LENS_FACING.BACK
        self._write_camera_label()

    def close(self):
        """Kill the ffplay process tree, if any, and forget the handle.

        After this call ``background_process`` is ``None`` again, so
        ``play()`` can start a fresh stream.
        """
        process = self.background_process
        if process is None:
            print("No stream process to terminate")
            return
        # Forget the handle first: whatever happens below, the old process
        # must not keep blocking play()
        self.background_process = None
        # Only go through psutil while the process is still alive. Once it
        # has exited its PID is either gone (NoSuchProcess) or may have been
        # reused by an unrelated process that must not be killed.
        if process.poll() is None:
            try:
                parent = psutil.Process(process.pid)
                for child in parent.children(recursive=True):
                    try:
                        child.kill()
                    except psutil.NoSuchProcess:
                        # The child exited between the listing and the kill
                        pass
                parent.kill()
            except psutil.NoSuchProcess:
                # ffplay exited on its own in the meantime
                pass
            # Reap the killed process so it does not linger as a zombie
            process.wait()
        print("Stream process terminated")