# aegisai/backend/services/video_processor.py
"""
AegisAI Video Processor Service
Handles webcam ingestion and demo scenarios using Gemini 3 Vision.
"""

import asyncio
import time
import logging
import sys
from typing import Optional, Dict, Any
from datetime import datetime
from pathlib import Path

import cv2
import numpy as np

# Internal AegisAI imports
from config.settings import settings
from agents.vision_agent import VisionAgent
from agents.planner_agent import PlannerAgent
from services.action_executor import action_executor
from services.database_service import db_service

# Logging setup: set the root logger to INFO. No handler arguments are given,
# so basicConfig installs its default stream handler (no file output is
# configured here). Note that basicConfig does nothing if the root logger
# already has handlers.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

class VideoProcessor:
    """Main pipeline for live video processing.

    Reads frames from an OpenCV capture source, passes a time-sampled subset
    of them to the vision agent and, for every frame the agent flags as an
    incident, stores an evidence image, asks the planner agent for a plan,
    saves the incident through ``db_service`` and hands the plan to
    ``action_executor``.
    """

    def __init__(self, source: int = 0, frame_rate: Optional[int] = None):
        """Create the processor and its agents.

        Args:
            source: Value handed to ``cv2.VideoCapture`` when the capture is
                opened.
            frame_rate: Minimum number of seconds between two analysed
                frames. Falls back to ``settings.FRAME_SAMPLE_RATE`` when
                omitted (or falsy).
        """
        self.source = source
        self.frame_rate = frame_rate or settings.FRAME_SAMPLE_RATE
        self.cap: Optional[cv2.VideoCapture] = None

        # Initialize Gemini 3 Agents (both share the same configuration)
        agent_config = {
            "model_name": settings.GEMINI_MODEL,
            "api_key": settings.GEMINI_API_KEY,
            "temperature": settings.TEMPERATURE,
            "max_output_tokens": settings.MAX_OUTPUT_TOKENS,
        }
        self.vision_agent = VisionAgent(**agent_config)
        self.planner_agent = PlannerAgent(**agent_config)

        self.running = False
        self.frame_count = 0

    def initialize_capture(self) -> bool:
        """Open the capture source and request the configured resolution.

        Returns:
            True when the source was opened, False otherwise.
        """
        self.cap = cv2.VideoCapture(self.source)
        if not self.cap.isOpened():
            logger.error("Failed to open source: %s", self.source)
            # Release the handle right away; nothing else will clean it up
            # because process_stream() returns before entering its try block.
            self.cap.release()
            return False
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, settings.VIDEO_RESOLUTION_WIDTH)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, settings.VIDEO_RESOLUTION_HEIGHT)
        return True

    async def process_stream(self, display: bool = True):
        """Run the capture loop until the stream ends or it is stopped.

        The loop ends when a frame cannot be read, when ``self.running`` is
        set to False, or (with ``display`` enabled) when ESC is pressed in
        the preview window. ``cleanup()`` always runs on exit.

        Args:
            display: Show every frame in an OpenCV window when True.
        """
        if not self.initialize_capture():
            return
        self.running = True
        # -inf guarantees the very first frame is analysed regardless of the
        # monotonic clock's starting value.
        last_process_time = float("-inf")

        try:
            while self.running:
                ret, frame = self.cap.read()
                if not ret:
                    logger.info(
                        "No frame received from source %s; stopping stream",
                        self.source,
                    )
                    break

                self.frame_count += 1
                # Monotonic clock: the sampling interval must not be affected
                # by wall-clock adjustments.
                now = time.monotonic()

                if now - last_process_time >= self.frame_rate:
                    await self._process_frame(frame)
                    last_process_time = now

                if display:
                    cv2.imshow('AegisAI Monitor', frame)
                    # 27 is the ESC key code.
                    if cv2.waitKey(1) & 0xFF == 27:
                        break

                # Yield to the event loop so other tasks can run.
                await asyncio.sleep(0.01)
        finally:
            self.cleanup()

    async def _process_frame(self, frame: np.ndarray):
        """Analyse one frame and react if the vision agent reports an incident.

        Args:
            frame: Image as returned by ``cv2.VideoCapture.read``.
        """
        analysis = await self.vision_agent._safe_process(
            frame=frame, frame_number=self.frame_count
        )
        if not (analysis and analysis.get('incident')):
            return

        # Evidence storage. cv2.imwrite reports a missing directory only via
        # a False return value, so create the directory and check the result.
        evidence_dir = Path(settings.EVIDENCE_DIR)
        try:
            evidence_dir.mkdir(parents=True, exist_ok=True)
        except OSError:
            logger.exception("Could not create evidence directory: %s", evidence_dir)
        # NOTE(review): the name has one-second resolution, so two incidents
        # within the same second share a file name.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        path = evidence_dir / f"ev_{timestamp}.jpg"
        if not cv2.imwrite(str(path), frame):
            # Keep going: responding to the incident matters more than the
            # snapshot, but the failure must be visible.
            logger.error("Failed to write evidence image: %s", path)

        # Planning & Action
        plan = await self.planner_agent._safe_process(analysis)
        incident_id = db_service.save_incident(
            {**analysis, "evidence": str(path), "plan": plan}
        )
        if plan:
            await action_executor.execute_plan(plan, incident_id, str(path))

    def cleanup(self):
        """Stop the loop, release the capture device and close OpenCV windows."""
        self.running = False
        if self.cap:
            self.cap.release()
        try:
            cv2.destroyAllWindows()
        except cv2.error:
            # OpenCV builds without GUI support raise here; cleanup runs in a
            # ``finally`` block and must not mask the original exception.
            logger.debug("cv2.destroyAllWindows() unavailable; skipping")

# ============================================================================

class DemoScenarioRunner:
    """Runs high-visibility demo scenarios with terminal output.

    Each scenario renders a text description onto a blank frame, sends that
    frame to the vision agent and prints the outcome. When an incident is
    reported the planner agent is asked for a plan and its size is printed;
    the plan is not handed to any executor here.
    """

    def __init__(self):
        """Create the vision and planner agents from the shared settings."""
        # Same configuration as the live pipeline, including the output-token
        # limit, so the demo exercises the agents under identical settings.
        agent_config = {
            "model_name": settings.GEMINI_MODEL,
            "api_key": settings.GEMINI_API_KEY,
            "temperature": settings.TEMPERATURE,
            "max_output_tokens": settings.MAX_OUTPUT_TOKENS,
        }
        self.vision_agent = VisionAgent(**agent_config)
        self.planner_agent = PlannerAgent(**agent_config)

    async def run_demo_sequence(self):
        """Run the three built-in scenarios in order, pausing 2 s after each."""
        print("\n" + "?? Starting AegisAI Demo Scenario".center(60))
        print("="*60 + "\n")

        # (title, text drawn on the frame, text colour passed to cv2.putText)
        scenarios = [
            ("Normal Activity", "A person walking calmly through a hallway", (0, 255, 0)),
            ("Suspicious Behavior", "Someone wearing a mask and looking into windows", (0, 165, 255)),
            ("Critical Threat", "A physical altercation breaking out between two people", (0, 0, 255)),
        ]

        for title, desc, color in scenarios:
            await self._run_scenario(title, desc, color)
            await asyncio.sleep(2)

        print("\n? Demo sequence completed\n")

    async def _run_scenario(self, title: str, description: str, color: tuple):
        """Analyse one synthetic frame and print the result.

        Args:
            title: Heading printed for the scenario.
            description: Text rendered onto the synthetic frame.
            color: Colour tuple used to draw the text.
        """
        print(f"=== SCENARIO: {title} ===")
        frame = self._create_demo_frame(description, color)

        # Analyze using VisionAgent; every scenario is submitted as frame 1.
        result = await self.vision_agent._safe_process(frame=frame, frame_number=1)

        if result and result.get('incident'):
            print(f"? Detected: {result.get('type', 'Unknown')}")
            print(f"  Severity: {result.get('severity', 'N/A')}")
            print(f"  Confidence: {result.get('confidence', 0)}%")

            # Use PlannerAgent if incident is found
            plan = await self.planner_agent._safe_process(result)
            if plan:
                print(f"  Response plan executed: {len(plan)} actions")
        else:
            print("? No incident detected - Normal operation")
        print()

    def _create_demo_frame(self, text: str, color: tuple) -> np.ndarray:
        """Return a black 1280x720 3-channel uint8 frame with ``text`` drawn on it."""
        frame = np.zeros((720, 1280, 3), dtype=np.uint8)
        cv2.putText(frame, text, (50, 360), cv2.FONT_HERSHEY_SIMPLEX, 1.2, color, 3)
        return frame

# ============================================================================

async def main():
    """Entry coroutine: run the demo when the first CLI argument is ``demo``,
    otherwise process the video source configured in ``settings``.
    """
    # Slicing avoids an explicit length check: an absent argument yields [].
    demo_requested = sys.argv[1:2] == ["demo"]

    if not demo_requested:
        live = VideoProcessor(source=settings.VIDEO_SOURCE)
        await live.process_stream()
        return

    demo = DemoScenarioRunner()
    await demo.run_demo_sequence()

# Script entry point: only runs when the file is executed directly, not when
# it is imported.
if __name__ == "__main__":
    try:
        # Start an event loop and run main() to completion.
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C: print a short notice instead of a traceback.
        print("\n?? Shutting down...")