"""
AegisAI Video Processor Service
Handles webcam ingestion and demo scenarios using Gemini 3 Vision.
"""
import asyncio
import time
import logging
import sys
from typing import Optional, Dict, Any
from datetime import datetime
from pathlib import Path
import cv2
import numpy as np
# Internal AegisAI imports
from config.settings import settings
from agents.vision_agent import VisionAgent
from agents.planner_agent import PlannerAgent
from services.action_executor import action_executor
from services.database_service import db_service
# Logging setup: configure the root logger at INFO level and create the
# module-level logger used throughout this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class VideoProcessor:
    """Main pipeline for live video processing.

    Reads frames from an OpenCV capture, periodically hands a frame to the
    vision agent and, when the analysis reports an incident, stores the frame
    as evidence, asks the planner agent for a plan, records the incident via
    ``db_service`` and executes the plan via ``action_executor``.
    """

    def __init__(self, source: int = 0, frame_rate: Optional[int] = None):
        """Create the processor and its agents.

        Args:
            source: Value passed to ``cv2.VideoCapture``.
            frame_rate: Minimum number of seconds between two analysed
                frames. A falsy value (``None`` or ``0``) selects
                ``settings.FRAME_SAMPLE_RATE``.
        """
        self.source = source
        self.frame_rate = frame_rate or settings.FRAME_SAMPLE_RATE
        self.cap: Optional[cv2.VideoCapture] = None

        # Both agents are built from the same model configuration.
        agent_config = {
            "model_name": settings.GEMINI_MODEL,
            "api_key": settings.GEMINI_API_KEY,
            "temperature": settings.TEMPERATURE,
            "max_output_tokens": settings.MAX_OUTPUT_TOKENS,
        }
        self.vision_agent = VisionAgent(**agent_config)
        self.planner_agent = PlannerAgent(**agent_config)

        self.running = False
        self.frame_count = 0

    def initialize_capture(self) -> bool:
        """Open the capture source and apply the configured resolution.

        Returns:
            ``True`` when the source was opened, ``False`` otherwise. On
            failure the capture handle is released and ``self.cap`` is reset
            to ``None``.
        """
        self.cap = cv2.VideoCapture(self.source)
        if not self.cap.isOpened():
            logger.error("Failed to open source: %s", self.source)
            # Do not keep a dead handle around.
            self.cap.release()
            self.cap = None
            return False

        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, settings.VIDEO_RESOLUTION_WIDTH)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, settings.VIDEO_RESOLUTION_HEIGHT)
        return True

    async def process_stream(self, display: bool = True):
        """Read frames until the source ends, ESC is pressed or ``running`` is cleared.

        Args:
            display: When true, every frame is shown in an OpenCV window and
                the ESC key stops the loop.

        ``cleanup()`` is always called on exit, including when frame
        processing raises.
        """
        if not self.initialize_capture():
            return

        self.running = True
        # -inf guarantees that the very first frame is analysed, whatever the
        # starting value of the monotonic clock is.
        last_process_time = float("-inf")
        try:
            while self.running:
                # NOTE: read() is a synchronous call; the loop only yields to
                # other tasks at the awaits below.
                ret, frame = self.cap.read()
                if not ret:
                    logger.info("No frame returned by source %s; stopping", self.source)
                    break
                self.frame_count += 1

                # A monotonic clock keeps the sampling interval stable when
                # the system clock is adjusted.
                now = time.monotonic()
                if now - last_process_time >= self.frame_rate:
                    await self._process_frame(frame)
                    last_process_time = now

                if display:
                    cv2.imshow('AegisAI Monitor', frame)
                    # 27 is the ESC key code.
                    if cv2.waitKey(1) & 0xFF == 27:
                        break

                await asyncio.sleep(0.01)
        finally:
            self.cleanup()

    async def _process_frame(self, frame: np.ndarray):
        """Analyse one frame and run the incident pipeline when needed.

        Nothing happens unless the vision agent returns a truthy result whose
        ``'incident'`` entry is truthy.
        """
        analysis = await self.vision_agent._safe_process(
            frame=frame, frame_number=self.frame_count
        )
        if not analysis or not analysis.get('incident'):
            return

        # Evidence storage
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        path = settings.EVIDENCE_DIR / f"ev_{timestamp}.jpg"
        try:
            Path(path).parent.mkdir(parents=True, exist_ok=True)
        except OSError:
            logger.exception("Could not create evidence directory for %s", path)
        # imwrite reports failure through its return value, not an exception.
        if not cv2.imwrite(str(path), frame):
            logger.error("Failed to write evidence image: %s", path)

        # Planning & Action: the incident is recorded even when no plan (or no
        # evidence file) could be produced.
        plan = await self.planner_agent._safe_process(analysis)
        incident_id = db_service.save_incident(
            {**analysis, "evidence": str(path), "plan": plan}
        )
        if plan:
            await action_executor.execute_plan(plan, incident_id, str(path))

    def cleanup(self):
        """Stop the loop, release the capture and close any OpenCV windows.

        Safe to call more than once.
        """
        self.running = False
        if self.cap:
            self.cap.release()
            self.cap = None
        try:
            cv2.destroyAllWindows()
        except cv2.error:
            # OpenCV builds without GUI support raise here; that must not
            # mask whatever ended the stream.
            logger.debug("cv2.destroyAllWindows() is unavailable", exc_info=True)
# ============================================================================
class DemoScenarioRunner:
    """Runs high-visibility demo scenarios with terminal output.

    Each scenario renders its description onto a synthetic frame, passes the
    frame to the vision agent and prints the outcome; when an incident is
    reported, the planner agent is asked for a plan as well.
    """

    def __init__(self):
        """Build the vision and planner agents from the shared settings."""
        # NOTE(review): unlike VideoProcessor, "max_output_tokens" is not
        # passed here -- confirm that this difference is intentional.
        agent_config = {
            "model_name": settings.GEMINI_MODEL,
            "api_key": settings.GEMINI_API_KEY,
            "temperature": settings.TEMPERATURE,
        }
        self.vision_agent = VisionAgent(**agent_config)
        self.planner_agent = PlannerAgent(**agent_config)

    async def run_demo_sequence(self):
        """Run the three built-in scenarios in order, pausing 2 s after each."""
        print("\n" + "?? Starting AegisAI Demo Scenario".center(60))
        print("="*60 + "\n")

        # (title, text drawn on the frame, colour tuple passed to cv2.putText)
        scenarios = [
            ("Normal Activity", "A person walking calmly through a hallway", (0, 255, 0)),
            ("Suspicious Behavior", "Someone wearing a mask and looking into windows", (0, 165, 255)),
            ("Critical Threat", "A physical altercation breaking out between two people", (0, 0, 255)),
        ]
        for title, desc, color in scenarios:
            await self._run_scenario(title, desc, color)
            await asyncio.sleep(2)

        print("\n? Demo sequence completed\n")

    async def _run_scenario(self, title: str, description: str, color: tuple):
        """Analyse one synthetic frame and print the result.

        Args:
            title: Heading printed for the scenario.
            description: Text drawn onto the demo frame.
            color: Colour tuple used for the drawn text.
        """
        print(f"=== SCENARIO: {title} ===")
        frame = self._create_demo_frame(description, color)

        # Analyze using VisionAgent
        result = await self.vision_agent._safe_process(frame=frame, frame_number=1)

        if result and result.get('incident'):
            print(f"? Detected: {result.get('type', 'Unknown')}")
            print(f" Severity: {result.get('severity', 'N/A')}")
            print(f" Confidence: {result.get('confidence', 0)}%")

            # Use PlannerAgent if incident is found
            plan = await self.planner_agent._safe_process(result)
            if plan:
                print(f" Response plan executed: {len(plan)} actions")
        else:
            print("? No incident detected - Normal operation")

        # Blank line between scenarios.
        print()

    def _create_demo_frame(self, text: str, color: tuple) -> np.ndarray:
        """Return a black 1280x720 three-channel frame with ``text`` drawn on it."""
        frame = np.zeros((720, 1280, 3), dtype=np.uint8)
        cv2.putText(frame, text, (50, 360), cv2.FONT_HERSHEY_SIMPLEX, 1.2, color, 3)
        return frame
# ============================================================================
async def main():
    """Entry coroutine.

    Runs the demo sequence when the first command-line argument is ``demo``;
    otherwise processes the video source named by ``settings.VIDEO_SOURCE``.
    """
    if len(sys.argv) > 1 and sys.argv[1] == "demo":
        runner = DemoScenarioRunner()
        await runner.run_demo_sequence()
    else:
        processor = VideoProcessor(source=settings.VIDEO_SOURCE)
        await processor.process_stream()
if __name__ == "__main__":
    # Script entry point: Ctrl+C ends the run with a short message instead of
    # a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n?? Shutting down...")