# AI-SPEAK / LipReadingApp / camera_worker.py
import time 
import cv2 
import mediapipe as mp 
import os
from PyQt5 import QtCore, QtGui 
import random

from config import (
    CAM_INDEX, MIN_DET_CONF, MIN_TRACK_CONF,
    OUTER_LIPS, IMG_WIDTH, IMG_HEIGHT,
    LIP_PAD_W_RATIO, LIP_PAD_H_RATIO,
    PRESCALE_RANGE_1080x1920, PRESCALE_RANGE_OTHER,
    PRESCALE_SEED,RECORD_SECONDS
)

# Shared CLAHE (contrast-limited adaptive histogram equalisation) object,
# created once at import time so every lip ROI is equalised with the same
# parameters.
_CLAHE = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))

# Random source used for prescale sampling: a dedicated seeded generator when
# PRESCALE_SEED is set (reproducible draws), otherwise the global `random`
# module.
_rng = random.Random(PRESCALE_SEED) if PRESCALE_SEED is not None else random

mp_face_mesh = mp.solutions.face_mesh

# Resolutions requested from the camera, in this order, when it is opened.
common_resolutions = [
    (1280, 720),
    (1920, 1080),
]


def _choose_prescale_range(frame_w: int, frame_h: int):
    """Select the configured prescale range for a frame of the given size.

    Args:
        frame_w: Frame width in pixels.
        frame_h: Frame height in pixels.

    Returns:
        PRESCALE_RANGE_1080x1920 when the frame is exactly 1080 wide and
        1920 high, otherwise PRESCALE_RANGE_OTHER.
    """
    # NOTE(review): this matches a portrait 1080x1920 frame only; a landscape
    # 1920x1080 frame falls through to the "other" range - confirm intended.
    is_1080x1920 = (frame_w, frame_h) == (1080, 1920)
    return PRESCALE_RANGE_1080x1920 if is_1080x1920 else PRESCALE_RANGE_OTHER


def _sample_scale(scale_range):
    """Draw a uniformly distributed scale factor from ``scale_range``.

    Args:
        scale_range: Indexable whose first two items are the bounds; each is
            converted to ``float``. The bounds may be given in either order.

    Returns:
        A float drawn with ``_rng.uniform`` between the two bounds. No
        clamping is applied here.
    """
    first = float(scale_range[0])
    second = float(scale_range[1])
    # Accept reversed bounds by ordering them before sampling.
    low, high = (second, first) if second < first else (first, second)
    return _rng.uniform(low, high)


def _extract_lip_roi_gray(frame_bgr, face_landmarks, prescale: float = None):
    """Crop the lip region from a frame and return it as an equalised gray image.

    Steps: take the bounding box of the OUTER_LIPS landmarks, pad it by
    LIP_PAD_W_RATIO / LIP_PAD_H_RATIO of the box size, clip it to the frame,
    convert the crop to grayscale, shrink it by a scale factor, resize it to
    (IMG_WIDTH, IMG_HEIGHT) and apply CLAHE.

    Args:
        frame_bgr: Image array; its first two dimensions are read as
            (height, width) and it is converted with COLOR_BGR2GRAY.
        face_landmarks: Object exposing ``.landmark[idx].x`` / ``.y``; the
            values are multiplied by the frame width / height to obtain pixel
            coordinates.
        prescale: Scale factor for the intermediate shrink. When given it is
            clamped to [0.05, 1.0]. When ``None`` a factor is sampled from the
            range chosen by ``_choose_prescale_range`` and used unclamped.

    Returns:
        The result of ``_CLAHE.apply`` on the resized crop, or ``None`` when
        the clipped crop is empty (for example when the lips lie entirely
        outside the frame).
    """
    orig_h, orig_w = frame_bgr.shape[:2]

    # Pixel coordinates of every outer-lip landmark, gathered in one pass.
    landmarks = face_landmarks.landmark
    points = [
        (int(landmarks[idx].x * orig_w), int(landmarks[idx].y * orig_h))
        for idx in OUTER_LIPS
    ]
    xs = [px for px, _ in points]
    ys = [py for _, py in points]

    min_x, max_x = min(xs), max(xs)
    min_y, max_y = min(ys), max(ys)

    # Padding is relative to the bounding-box size (at least 1 px per side).
    bw = max(1, max_x - min_x)
    bh = max(1, max_y - min_y)
    pad_w = int(round(LIP_PAD_W_RATIO * bw))
    pad_h = int(round(LIP_PAD_H_RATIO * bh))

    # Clamp BOTH ends of each axis to the frame. Without the lower clamp on
    # the max side, a negative value would be treated by slicing as an offset
    # from the end and select a large, unrelated part of the frame.
    min_x = min(max(min_x - pad_w, 0), orig_w)
    max_x = max(min(max_x + pad_w, orig_w), 0)
    min_y = min(max(min_y - pad_h, 0), orig_h)
    max_y = max(min(max_y + pad_h, orig_h), 0)

    lip_roi = frame_bgr[min_y:max_y, min_x:max_x]
    if lip_roi.size == 0:
        return None

    lip_gray = cv2.cvtColor(lip_roi, cv2.COLOR_BGR2GRAY)

    if prescale is None:
        scale = _sample_scale(_choose_prescale_range(orig_w, orig_h))
    else:
        # Caller-supplied factor: clamp to safe bounds.
        scale = max(0.05, min(1.00, float(prescale)))

    h0, w0 = lip_gray.shape[:2]
    new_w = max(1, int(round(w0 * scale)))
    new_h = max(1, int(round(h0 * scale)))

    # Shrink first, then bring the crop to the fixed output size.
    shrunk = cv2.resize(lip_gray, (new_w, new_h), interpolation=cv2.INTER_AREA)
    resized = cv2.resize(shrunk, (IMG_WIDTH, IMG_HEIGHT), interpolation=cv2.INTER_LINEAR)

    return _CLAHE.apply(resized)



class CameraWorker(QtCore.QObject):
    """Timer-driven camera reader with preview and timed lip-ROI recording.

    A QTimer repeatedly calls ``_capture_step``, which reads one frame, emits
    it for preview and, while a recording is active, writes the raw frame to
    a video file and collects processed lip ROIs.

    Signals:
        frame_ready(QImage): emitted for every successfully read frame (RGB).
        recording_done(object): emitted once per recording with the tuple
            ``(frames, (width, height))`` where ``frames`` is the list of lip
            ROIs and the size is that of the last frame read.
        status(str): human-readable progress messages.
        recording_state_changed(bool): True when a recording starts, False
            when it ends or the worker is closed.
    """

    frame_ready = QtCore.pyqtSignal(QtGui.QImage)
    recording_done = QtCore.pyqtSignal(object)
    status = QtCore.pyqtSignal(str)
    recording_state_changed = QtCore.pyqtSignal(bool)

    def __init__(self, cam_index=CAM_INDEX, parent=None):
        """Open the camera, create the face-mesh detector and start capturing.

        Args:
            cam_index: Index passed to ``cv2.VideoCapture``.
            parent: Optional parent QObject.

        Raises:
            RuntimeError: If the camera cannot be opened.
        """
        super().__init__(parent)
        self.cap = cv2.VideoCapture(cam_index)
        self.record_prescale = None
        if not self.cap.isOpened():
            # Do not leave a half-initialised capture handle behind.
            self.cap.release()
            raise RuntimeError("Could not open camera.")

        # Request each candidate resolution and read back what was applied.
        # The camera is left at whatever the last request resulted in.
        supported = []
        for w, h in common_resolutions:
            self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, w)
            self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, h)
            rw = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            rh = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            if (rw, rh) == (w, h):
                supported.append((w, h))
            print(f"Requested {w}x{h} → got {rw}x{rh}")
        print("Supported (likely):", supported)

        self.face_mesh = mp_face_mesh.FaceMesh(
            static_image_mode=False,
            max_num_faces=1,
            refine_landmarks=True,
            min_detection_confidence=MIN_DET_CONF,
            min_tracking_confidence=MIN_TRACK_CONF,
        )

        self._running = True
        self._closed = False
        self.recording = False
        self.record_frames = []
        self.record_start_t = None
        # (width, height) of the most recently read frame; (0, 0) until then.
        self._last_wh = (0, 0)

        # Raw-frame video output for the current recording.
        self.video_writer = None
        self.video_path = os.path.abspath("tmp.mp4")
        self.video_fps = 25  # nominal fps for writer

        self.timer = QtCore.QTimer()
        self.timer.timeout.connect(self._capture_step)
        self.timer.start(1)  # as fast as possible

    def _capture_step(self):
        """Read one frame; emit a preview and, if recording, process it."""
        if not self._running:
            return

        ok, frame = self.cap.read()
        if not ok:
            return

        # Write the raw frame to the video file while recording.
        if self.recording and self.video_writer is not None:
            self.video_writer.write(frame)

        self._last_wh = (int(frame.shape[1]), int(frame.shape[0]))

        # --- preview ---
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        h, w, ch = rgb.shape
        qimg = QtGui.QImage(rgb.data, w, h, ch * w, QtGui.QImage.Format_RGB888)
        # QImage built from an external buffer does not own the pixels; emit
        # a deep copy so receivers never see memory that `rgb` has released.
        self.frame_ready.emit(qimg.copy())

        if not self.recording:
            return

        # --- process while recording ---
        results = self.face_mesh.process(rgb)
        if results.multi_face_landmarks:
            fl = results.multi_face_landmarks[0]
            if self.record_prescale is None:
                # Sample once per recording so every frame uses the same scale.
                scale_range = _choose_prescale_range(frame.shape[1], frame.shape[0])
                self.record_prescale = _sample_scale(scale_range)
                self.status.emit(f"Recording prescale fixed at {self.record_prescale:.3f}")

            lip_gray = _extract_lip_roi_gray(frame, fl, prescale=self.record_prescale)
            if lip_gray is not None:
                self.record_frames.append(lip_gray)

        # Stop after RECORD_SECONDS.
        if time.time() - self.record_start_t >= RECORD_SECONDS:
            self.recording = False
            self.recording_state_changed.emit(False)
            frames = self.record_frames
            self.record_frames = []

            if self.video_writer is not None:
                self.video_writer.release()
                self.video_writer = None
                self.status.emit(f"Video saved to {self.video_path}")

            self.status.emit(f"Recording finished. Frames captured: {len(frames)}")
            self.recording_done.emit((frames, self._last_wh))

    @QtCore.pyqtSlot()
    def start_recording(self):
        """Begin a recording lasting RECORD_SECONDS; no-op if one is active."""
        if self.recording:
            self.status.emit("Already recording…")
            return
        self.record_frames = []
        self.record_start_t = time.time()
        self.recording = True
        self.record_prescale = None

        # Create the writer with the size the capture device currently reports.
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        writer = cv2.VideoWriter(self.video_path, fourcc, self.video_fps, (w, h))
        if writer.isOpened():
            self.video_writer = writer
        else:
            # Keep recording lip ROIs, but do not pretend a video is written.
            writer.release()
            self.video_writer = None
            self.status.emit(f"Could not open video writer for {self.video_path}")

        self.recording_state_changed.emit(True)
        self.status.emit(f"Recording started for {RECORD_SECONDS:.2f}s -> {self.video_path}")

    def close(self):
        """Stop capturing and release camera, detector and writer.

        Safe to call more than once; calls after the first do nothing.
        """
        if self._closed:
            return
        self._closed = True
        self._running = False
        self.recording = False
        self.recording_state_changed.emit(False)
        self.timer.stop()
        self.face_mesh.close()
        if self.video_writer is not None:
            self.video_writer.release()
            self.video_writer = None
        self.cap.release()