import time
import cv2
import mediapipe as mp
import os
from PyQt5 import QtCore, QtGui
import random
from config import (
CAM_INDEX, MIN_DET_CONF, MIN_TRACK_CONF,
OUTER_LIPS, IMG_WIDTH, IMG_HEIGHT,
LIP_PAD_W_RATIO, LIP_PAD_H_RATIO,
PRESCALE_RANGE_1080x1920, PRESCALE_RANGE_OTHER,
PRESCALE_SEED,RECORD_SECONDS
)
# Shared CLAHE (contrast-limited adaptive histogram equalisation) operator.
# Built once at import time so every extracted ROI is equalised with the same
# settings and no per-frame construction cost is paid.
_CLAHE = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))

# Random source for prescale sampling: a dedicated, seeded generator when
# PRESCALE_SEED is set (reproducible runs), otherwise the global `random`
# module.
_rng = random.Random(PRESCALE_SEED) if PRESCALE_SEED is not None else random

# Short alias for the MediaPipe face-mesh solution module.
mp_face_mesh = mp.solutions.face_mesh

# Capture resolutions (width, height) probed when the camera is opened.
common_resolutions = [
    (1280, 720),
    (1920, 1080),
]
def _choose_prescale_range(frame_w: int, frame_h: int):
    """Pick the prescale range that matches the frame dimensions.

    Frames that are exactly 1080 wide by 1920 high get
    PRESCALE_RANGE_1080x1920; every other size gets PRESCALE_RANGE_OTHER.
    """
    is_1080x1920 = (frame_w, frame_h) == (1080, 1920)
    return PRESCALE_RANGE_1080x1920 if is_1080x1920 else PRESCALE_RANGE_OTHER
def _sample_scale(scale_range):
    """Draw a uniformly distributed scale factor from *scale_range*.

    *scale_range* is indexed at [0] and [1]; both entries are converted to
    float and put in ascending order before sampling, so a reversed pair is
    accepted. The result is not clamped here.
    """
    first = float(scale_range[0])
    second = float(scale_range[1])
    # Order the endpoints so the lower bound always comes first.
    lower, upper = (second, first) if second < first else (first, second)
    return _rng.uniform(lower, upper)
def _extract_lip_roi_gray(frame_bgr, face_landmarks, prescale: float = None):
    """Crop the lip region from a frame and return it as an equalised gray image.

    Steps:
      1. Build the bounding box of the OUTER_LIPS landmarks in pixel
         coordinates (landmark x/y are multiplied by the frame width/height).
      2. Grow the box by LIP_PAD_W_RATIO / LIP_PAD_H_RATIO of its own size
         and clip it to the frame.
      3. Convert the crop to grayscale and shrink it by a scale factor:
         *prescale* clamped to [0.05, 1.0] when given, otherwise a value
         sampled from the range chosen for this frame size.
      4. Resize to (IMG_WIDTH, IMG_HEIGHT) and apply CLAHE.

    Args:
        frame_bgr: BGR image array; only ``shape[:2]`` and slicing are used.
        face_landmarks: object exposing ``landmark[idx].x`` / ``.y``.
        prescale: fixed shrink factor, or None to sample one.

    Returns:
        The processed single-channel image, or None when the clipped lip
        box is empty (for example when the lips lie outside the frame).
    """
    orig_h, orig_w = frame_bgr.shape[:2]

    # Pixel coordinates of the outer-lip landmarks.
    lip_points = [face_landmarks.landmark[idx] for idx in OUTER_LIPS]
    xs = [int(pt.x * orig_w) for pt in lip_points]
    ys = [int(pt.y * orig_h) for pt in lip_points]
    min_x, max_x = min(xs), max(xs)
    min_y, max_y = min(ys), max(ys)

    # Padding is relative to the box size (at least 1 px so it never vanishes).
    bw = max(1, max_x - min_x)
    bh = max(1, max_y - min_y)
    pad_w = int(round(LIP_PAD_W_RATIO * bw))
    pad_h = int(round(LIP_PAD_H_RATIO * bh))

    # Clamp BOTH ends of each axis into [0, frame size]. Without the lower
    # clamp on the max side, landmarks lying off the top/left of the frame
    # yield a negative slice end, which numpy interprets as "from the end"
    # and silently returns a large, wrong crop instead of an empty one.
    x0 = min(max(min_x - pad_w, 0), orig_w)
    x1 = min(max(max_x + pad_w, 0), orig_w)
    y0 = min(max(min_y - pad_h, 0), orig_h)
    y1 = min(max(max_y + pad_h, 0), orig_h)

    lip_roi = frame_bgr[y0:y1, x0:x1]
    if lip_roi.size == 0:
        return None
    lip_gray = cv2.cvtColor(lip_roi, cv2.COLOR_BGR2GRAY)

    if prescale is None:
        scale = _sample_scale(_choose_prescale_range(orig_w, orig_h))
    else:
        # Caller-supplied factor: clamp to safe bounds.
        scale = max(0.05, min(1.00, float(prescale)))

    h0, w0 = lip_gray.shape[:2]
    # Never let a dimension collapse to zero pixels.
    new_w = max(1, int(round(w0 * scale)))
    new_h = max(1, int(round(h0 * scale)))
    shrunk = cv2.resize(lip_gray, (new_w, new_h), interpolation=cv2.INTER_AREA)
    resized = cv2.resize(
        shrunk, (IMG_WIDTH, IMG_HEIGHT), interpolation=cv2.INTER_LINEAR
    )
    return _CLAHE.apply(resized)
class CameraWorker(QtCore.QObject):
    """Grab camera frames on a Qt timer, publish previews and record lip ROIs.

    Signals:
        frame_ready(QImage): RGB preview of every successfully read frame.
        recording_done(object): ``(frames, (width, height))`` emitted when a
            recording ends; ``frames`` is the list of lip ROI images and the
            size is that of the last frame read.
        status(str): human-readable progress messages.
        recording_state_changed(bool): True when recording starts, False
            when it stops or the worker is closed.
    """

    frame_ready = QtCore.pyqtSignal(QtGui.QImage)
    recording_done = QtCore.pyqtSignal(object)
    status = QtCore.pyqtSignal(str)
    recording_state_changed = QtCore.pyqtSignal(bool)

    def __init__(self, cam_index=CAM_INDEX, parent=None):
        """Open the camera, probe resolutions and start the capture timer.

        Raises:
            RuntimeError: if the camera cannot be opened.
        """
        super().__init__(parent)
        self.cap = cv2.VideoCapture(cam_index)
        self.record_prescale = None
        if not self.cap.isOpened():
            # Do not leave a half-initialised capture handle behind.
            self.cap.release()
            raise RuntimeError("Could not open camera.")

        # Request each candidate resolution and read back what the driver
        # actually applied. The camera stays in whatever mode the last
        # request produced.
        supported = []
        for w, h in common_resolutions:
            self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, w)
            self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, h)
            rw = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            rh = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            if (rw, rh) == (w, h):
                supported.append((w, h))
            print(f"Requested {w}x{h} → got {rw}x{rh}")
        print("Supported (likely):", supported)

        self.face_mesh = mp_face_mesh.FaceMesh(
            static_image_mode=False,
            max_num_faces=1,
            refine_landmarks=True,
            min_detection_confidence=MIN_DET_CONF,
            min_tracking_confidence=MIN_TRACK_CONF,
        )

        self._running = True
        self.recording = False
        self.record_frames = []
        self.record_start_t = None
        # (width, height) of the most recent frame; reported with the result.
        self._last_wh = (0, 0)

        # Raw video of each recording is also written to this file.
        self.video_writer = None
        self.video_path = os.path.abspath("tmp.mp4")
        self.video_fps = 25  # nominal fps for the writer

        self.timer = QtCore.QTimer()
        self.timer.timeout.connect(self._capture_step)
        self.timer.start(1)  # as fast as possible

    def _capture_step(self):
        """Read one frame: save it, preview it and, if recording, process it."""
        if not self._running:
            return
        ok, frame = self.cap.read()
        if not ok:
            return

        if self.recording and self.video_writer is not None:
            self.video_writer.write(frame)
        self._last_wh = (int(frame.shape[1]), int(frame.shape[0]))

        # Preview. QImage only wraps the numpy buffer, which is freed once
        # this method returns; emit a deep copy so receivers (especially
        # through queued connections) never read released memory.
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        h, w, ch = rgb.shape
        qimg = QtGui.QImage(rgb.data, w, h, ch * w, QtGui.QImage.Format_RGB888)
        self.frame_ready.emit(qimg.copy())

        if not self.recording:
            return

        results = self.face_mesh.process(rgb)
        if results.multi_face_landmarks:
            fl = results.multi_face_landmarks[0]
            if self.record_prescale is None:
                # Sample the shrink factor once so that every frame of this
                # recording uses the same value.
                scale_range = _choose_prescale_range(
                    frame.shape[1], frame.shape[0]
                )
                self.record_prescale = _sample_scale(scale_range)
                self.status.emit(
                    f"Recording prescale fixed at {self.record_prescale:.3f}"
                )
            lip_gray = _extract_lip_roi_gray(
                frame, fl, prescale=self.record_prescale
            )
            if lip_gray is not None:
                self.record_frames.append(lip_gray)

        # Stop after RECORD_SECONDS, whether or not a face was found.
        if time.time() - self.record_start_t >= RECORD_SECONDS:
            self._finish_recording()

    def _finish_recording(self):
        """End the current recording, close the video file and emit results."""
        self.recording = False
        self.recording_state_changed.emit(False)
        frames = self.record_frames
        self.record_frames = []

        if self.video_writer is not None:
            self.video_writer.release()
            self.video_writer = None
            self.status.emit(f"Video saved to {self.video_path}")

        self.status.emit(f"Recording finished. Frames captured: {len(frames)}")
        self.recording_done.emit((frames, self._last_wh))

    @QtCore.pyqtSlot()
    def start_recording(self):
        """Begin a RECORD_SECONDS-long recording unless one is already running."""
        if self.recording:
            self.status.emit("Already recording…")
            return

        self.record_frames = []
        self.record_prescale = None

        # Video file sized to the capture's currently reported resolution.
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        writer = cv2.VideoWriter(self.video_path, fourcc, self.video_fps, (w, h))
        if not writer.isOpened():
            # An unopened writer drops frames silently; report it and carry
            # on collecting lip frames without a video file.
            writer.release()
            writer = None
            self.status.emit(
                f"Could not open video writer for {self.video_path}; "
                "recording without saving video."
            )
        self.video_writer = writer

        self.record_start_t = time.time()
        self.recording = True
        self.recording_state_changed.emit(True)
        self.status.emit(
            f"Recording started for {RECORD_SECONDS:.2f}s -> {self.video_path}"
        )

    def close(self):
        """Stop capturing and release the camera, face mesh and video writer.

        Calling it again after the first call is a no-op, so the underlying
        resources are never released twice.
        """
        if not self._running:
            return
        self._running = False
        self.recording = False
        self.recording_state_changed.emit(False)
        self.timer.stop()
        self.face_mesh.close()
        if self.video_writer is not None:
            self.video_writer.release()
            self.video_writer = None
        self.cap.release()