""" AegisAI Video Processor Service Handles webcam ingestion and demo scenarios using Gemini 3 Vision. """ import asyncio import time import logging import sys from typing import Optional, Dict, Any from datetime import datetime from pathlib import Path import cv2 import numpy as np # Internal AegisAI imports from config.settings import settings from agents.vision_agent import VisionAgent from agents.planner_agent import PlannerAgent from services.action_executor import action_executor from services.database_service import db_service # Logging setup (Redirects to file/console based on level) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class VideoProcessor: """Main pipeline for live video processing.""" def __init__(self, source: int = 0, frame_rate: int = None): self.source = source self.frame_rate = frame_rate or settings.FRAME_SAMPLE_RATE self.cap: Optional[cv2.VideoCapture] = None # Initialize Gemini 3 Agents agent_config = { "model_name": settings.GEMINI_MODEL, "api_key": settings.GEMINI_API_KEY, "temperature": settings.TEMPERATURE, "max_output_tokens": settings.MAX_OUTPUT_TOKENS } self.vision_agent = VisionAgent(**agent_config) self.planner_agent = PlannerAgent(**agent_config) self.running = False self.frame_count = 0 def initialize_capture(self) -> bool: self.cap = cv2.VideoCapture(self.source) if not self.cap.isOpened(): logger.error(f"Failed to open source: {self.source}") return False self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, settings.VIDEO_RESOLUTION_WIDTH) self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, settings.VIDEO_RESOLUTION_HEIGHT) return True async def process_stream(self, display: bool = True): if not self.initialize_capture(): return self.running = True last_process_time = 0 try: while self.running: ret, frame = self.cap.read() if not ret: break self.frame_count += 1 now = time.time() if now - last_process_time >= self.frame_rate: await self._process_frame(frame) last_process_time = now if display: cv2.imshow('AegisAI Monitor', frame) if cv2.waitKey(1) & 0xFF == 27: break await asyncio.sleep(0.01) finally: self.cleanup() async def _process_frame(self, frame: np.ndarray): analysis = await self.vision_agent._safe_process(frame=frame, frame_number=self.frame_count) if analysis and analysis.get('incident'): # Evidence storage timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") path = settings.EVIDENCE_DIR / f"ev_{timestamp}.jpg" cv2.imwrite(str(path), frame) # Planning & Action plan = await self.planner_agent._safe_process(analysis) incident_id = db_service.save_incident({**analysis, "evidence": str(path), "plan": plan}) if plan: await action_executor.execute_plan(plan, incident_id, str(path)) def cleanup(self): if self.cap: self.cap.release() cv2.destroyAllWindows() # ============================================================================ class DemoScenarioRunner: """Runs high-visibility demo scenarios with terminal output.""" def __init__(self): agent_config = { "model_name": settings.GEMINI_MODEL, "api_key": settings.GEMINI_API_KEY, "temperature": settings.TEMPERATURE } self.vision_agent = VisionAgent(**agent_config) self.planner_agent = PlannerAgent(**agent_config) async def run_demo_sequence(self): print("\n" + "?? Starting AegisAI Demo Scenario".center(60)) print("="*60 + "\n") scenarios = [ ("Normal Activity", "A person walking calmly through a hallway", (0, 255, 0)), ("Suspicious Behavior", "Someone wearing a mask and looking into windows", (0, 165, 255)), ("Critical Threat", "A physical altercation breaking out between two people", (0, 0, 255)) ] for title, desc, color in scenarios: await self._run_scenario(title, desc, color) await asyncio.sleep(2) print("\n? Demo sequence completed\n") async def _run_scenario(self, title: str, description: str, color: tuple): print(f"=== SCENARIO: {title} ===") frame = self._create_demo_frame(description, color) # Analyze using VisionAgent result = await self.vision_agent._safe_process(frame=frame, frame_number=1) if result and result.get('incident'): print(f"? Detected: {result.get('type', 'Unknown')}") print(f" Severity: {result.get('severity', 'N/A')}") print(f" Confidence: {result.get('confidence', 0)}%") # Use PlannerAgent if incident is found plan = await self.planner_agent._safe_process(result) if plan: print(f" Response plan executed: {len(plan)} actions") else: print("? No incident detected - Normal operation") print() def _create_demo_frame(self, text: str, color: tuple) -> np.ndarray: frame = np.zeros((720, 1280, 3), dtype=np.uint8) cv2.putText(frame, text, (50, 360), cv2.FONT_HERSHEY_SIMPLEX, 1.2, color, 3) return frame # ============================================================================ async def main(): if len(sys.argv) > 1 and sys.argv[1] == "demo": runner = DemoScenarioRunner() await runner.run_demo_sequence() else: processor = VideoProcessor(source=settings.VIDEO_SOURCE) await processor.process_stream() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("\n?? Shutting down...")