import os
import random
import time
from typing import Optional

import cv2
import mediapipe as mp
from PyQt5 import QtCore, QtGui

from config import (
    CAM_INDEX,
    MIN_DET_CONF,
    MIN_TRACK_CONF,
    OUTER_LIPS,
    IMG_WIDTH,
    IMG_HEIGHT,
    LIP_PAD_W_RATIO,
    LIP_PAD_H_RATIO,
    PRESCALE_RANGE_1080x1920,
    PRESCALE_RANGE_OTHER,
    PRESCALE_SEED,
    RECORD_SECONDS,
)

# Create once (fast & consistent)
_CLAHE = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))

# Optional deterministic randomness: a seeded generator when PRESCALE_SEED is
# set, otherwise the shared `random` module.
_rng = random.Random(PRESCALE_SEED) if PRESCALE_SEED is not None else random

mp_face_mesh = mp.solutions.face_mesh

# Resolutions probed on the capture device when a CameraWorker is created.
common_resolutions = [
    (1280, 720),
    (1920, 1080),
]


def _choose_prescale_range(frame_w: int, frame_h: int):
    """Return the prescale range configured for the given frame size.

    Frames that are exactly 1080 wide and 1920 high use
    PRESCALE_RANGE_1080x1920; every other size uses PRESCALE_RANGE_OTHER.
    """
    if frame_w == 1080 and frame_h == 1920:
        return PRESCALE_RANGE_1080x1920
    return PRESCALE_RANGE_OTHER


def _sample_scale(scale_range):
    """Draw a uniform random scale from ``scale_range``.

    Args:
        scale_range: Indexable pair ``(lo, hi)``; the bounds are swapped if
            they are given in descending order.

    Returns:
        A float sampled with ``_rng.uniform(lo, hi)``.
    """
    lo, hi = float(scale_range[0]), float(scale_range[1])
    if hi < lo:
        lo, hi = hi, lo
    # NOTE: the sampled value is not clamped here; an explicit prescale is
    # clamped to [0.05, 1.0] inside _extract_lip_roi_gray.
    return _rng.uniform(lo, hi)


def _extract_lip_roi_gray(frame_bgr, face_landmarks, prescale: Optional[float] = None):
    """Extract a normalised grayscale lip crop from a BGR frame.

    The crop is the bounding box of the OUTER_LIPS landmarks, grown by
    LIP_PAD_W_RATIO / LIP_PAD_H_RATIO of the box size and clipped to the
    frame. It is converted to grayscale, shrunk by a prescale factor, resized
    to (IMG_WIDTH, IMG_HEIGHT) and passed through CLAHE.

    Args:
        frame_bgr: Image array; ``shape[:2]`` is read as (height, width) and
            it is converted with ``cv2.COLOR_BGR2GRAY``.
        face_landmarks: Object exposing ``landmark[idx].x`` / ``.y``; the
            values are multiplied by the frame width / height.
        prescale: Shrink factor, clamped to [0.05, 1.0]. When ``None`` a
            factor is sampled from the range chosen for the frame size.

    Returns:
        The CLAHE-equalised crop, or ``None`` if the clipped crop is empty.
    """
    orig_h, orig_w = frame_bgr.shape[:2]

    # Pixel coordinates of the outer-lip landmarks.
    landmarks = face_landmarks.landmark
    xs = [int(landmarks[idx].x * orig_w) for idx in OUTER_LIPS]
    ys = [int(landmarks[idx].y * orig_h) for idx in OUTER_LIPS]

    min_x, max_x = min(xs), max(xs)
    min_y, max_y = min(ys), max(ys)

    # Relative padding based on bbox size
    bw = max(1, max_x - min_x)
    bh = max(1, max_y - min_y)
    pad_w = int(round(LIP_PAD_W_RATIO * bw))
    pad_h = int(round(LIP_PAD_H_RATIO * bh))

    min_x = max(min_x - pad_w, 0)
    max_x = min(max_x + pad_w, orig_w)
    min_y = max(min_y - pad_h, 0)
    max_y = min(max_y + pad_h, orig_h)

    lip_roi = frame_bgr[min_y:max_y, min_x:max_x]
    if lip_roi.size == 0:
        return None

    lip_gray = cv2.cvtColor(lip_roi, cv2.COLOR_BGR2GRAY)

    if prescale is None:
        scale_range = _choose_prescale_range(orig_w, orig_h)
        scale = _sample_scale(scale_range)
    else:
        # clamp to safe bounds
        scale = float(prescale)
        scale = max(0.05, min(1.00, scale))

    h0, w0 = lip_gray.shape[:2]
    new_w = max(1, int(round(w0 * scale)))
    new_h = max(1, int(round(h0 * scale)))

    # Shrink first, then resize to the fixed output size.
    shrunk = cv2.resize(lip_gray, (new_w, new_h), interpolation=cv2.INTER_AREA)
    resized = cv2.resize(shrunk, (IMG_WIDTH, IMG_HEIGHT), interpolation=cv2.INTER_LINEAR)
    return _CLAHE.apply(resized)


class CameraWorker(QtCore.QObject):
    """Timer-driven camera reader that previews frames and records lip crops.

    Signals:
        frame_ready(QImage): emitted for every successfully read frame.
        recording_done(object): emitted when a recording ends, carrying the
            tuple ``(frames, (width, height))``.
        status(str): human-readable progress messages.
        recording_state_changed(bool): ``True`` when a recording starts,
            ``False`` when it ends or the worker is closed.
    """

    frame_ready = QtCore.pyqtSignal(QtGui.QImage)
    recording_done = QtCore.pyqtSignal(object)
    status = QtCore.pyqtSignal(str)
    recording_state_changed = QtCore.pyqtSignal(bool)

    def __init__(self, cam_index=CAM_INDEX, parent=None):
        """Open the camera, probe resolutions and start the capture timer.

        Args:
            cam_index: Device index passed to ``cv2.VideoCapture``.
            parent: Optional parent QObject.

        Raises:
            RuntimeError: If the camera cannot be opened.
        """
        super().__init__(parent)
        self.cap = cv2.VideoCapture(cam_index)
        self.record_prescale = None
        if not self.cap.isOpened():
            # Do not leak the capture handle when construction fails.
            self.cap.release()
            raise RuntimeError("Could not open camera.")

        # Request each candidate resolution in turn; the capture is left at
        # whatever the last request resulted in.
        supported = []
        for w, h in common_resolutions:
            self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, w)
            self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, h)
            # read back what actually applied
            rw = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            rh = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            if (rw, rh) == (w, h):
                supported.append((w, h))
            print(f"Requested {w}x{h} → got {rw}x{rh}")
        print("Supported (likely):", supported)

        #self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1080)
        #self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1920)
        #self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

        self.face_mesh = mp_face_mesh.FaceMesh(
            static_image_mode=False,
            max_num_faces=1,
            refine_landmarks=True,
            min_detection_confidence=MIN_DET_CONF,
            min_tracking_confidence=MIN_TRACK_CONF,
        )

        self._running = True
        self.recording = False
        self.record_frames = []
        self.record_start_t = None
        # (width, height) of the most recently read frame.
        self._last_wh = (0, 0)

        self.video_writer = None
        self.video_path = os.path.abspath("tmp.mp4")
        self.video_fps = 25  # nominal fps for writer

        self.timer = QtCore.QTimer()
        self.timer.timeout.connect(self._capture_step)
        self.timer.start(1)  # as fast as possible

    def _capture_step(self):
        """Read one frame, emit the preview and, if recording, process it."""
        if not self._running:
            return
        ok, frame = self.cap.read()
        if not ok:
            return

        # --- write to file if recording ---
        if self.recording and self.video_writer is not None:
            self.video_writer.write(frame)
        self._last_wh = (int(frame.shape[1]), int(frame.shape[0]))

        # --- preview ---
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        h, w, ch = rgb.shape
        # QImage does not own a buffer passed to its constructor; copy so the
        # emitted image stays valid after `rgb` is released.
        qimg = QtGui.QImage(
            rgb.data, w, h, ch * w, QtGui.QImage.Format_RGB888
        ).copy()
        self.frame_ready.emit(qimg)

        # --- process if recording ---
        if not self.recording:
            return

        results = self.face_mesh.process(rgb)
        if results.multi_face_landmarks:
            fl = results.multi_face_landmarks[0]

            # Sample the prescale once per recording so all frames share it.
            if self.record_prescale is None:
                scale_range = _choose_prescale_range(frame.shape[1], frame.shape[0])
                self.record_prescale = _sample_scale(scale_range)
                self.status.emit(
                    f"Recording prescale fixed at {self.record_prescale:.3f}"
                )

            lip_gray = _extract_lip_roi_gray(frame, fl, prescale=self.record_prescale)
            if lip_gray is not None:
                self.record_frames.append(lip_gray)

        # stop after RECORD_SECONDS
        if time.monotonic() - self.record_start_t >= RECORD_SECONDS:
            self._finish_recording()

    def _finish_recording(self):
        """End the current recording, close the writer and emit the results."""
        self.recording = False
        self.recording_state_changed.emit(False)
        frames = self.record_frames
        self.record_frames = []

        # --- close writer ---
        if self.video_writer is not None:
            self.video_writer.release()
            self.video_writer = None
            self.status.emit(f"Video saved to {self.video_path}")

        self.status.emit(f"Recording finished. Frames captured: {len(frames)}")
        self.recording_done.emit((frames, self._last_wh))

    @QtCore.pyqtSlot()
    def start_recording(self):
        """Begin a recording of RECORD_SECONDS; no-op if already recording."""
        if self.recording:
            self.status.emit("Already recording…")
            return

        self.record_frames = []
        self.record_prescale = None

        # --- create VideoWriter ---
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        writer = cv2.VideoWriter(self.video_path, fourcc, self.video_fps, (w, h))
        if writer.isOpened():
            self.video_writer = writer
        else:
            # Keep capturing lip frames even if the file cannot be written.
            writer.release()
            self.video_writer = None
            self.status.emit(f"Could not open video writer for {self.video_path}")

        # Monotonic clock: elapsed time is unaffected by wall-clock changes.
        self.record_start_t = time.monotonic()
        self.recording = True

        self.recording_state_changed.emit(True)
        self.status.emit(
            f"Recording started for {RECORD_SECONDS:.2f}s -> {self.video_path}"
        )

    def close(self):
        """Stop capturing and release the timer, face mesh, writer and camera."""
        self._running = False
        self.recording = False
        self.recording_state_changed.emit(False)
        self.timer.stop()
        self.face_mesh.close()
        if self.video_writer is not None:
            self.video_writer.release()
            self.video_writer = None
        self.cap.release()