import base64
from collections import deque
from typing import Any, Dict, Optional

import numpy as np
from google.genai import types
from google.genai.types import GenerateContentConfig, Content, Part

from agents.base_agent import BaseAgent
from config.settings import settings, VISION_AGENT_PROMPT


class VisionAgent(BaseAgent):
    """Performs real-time security analysis on video frames using Gemini.

    This agent maintains a temporal history of previous detections to improve
    contextual awareness and reduce false positives across a video stream.
    """

    def __init__(self, **kwargs):
        """Initializes the VisionAgent with a 10-frame sliding window history.

        Args:
            **kwargs: Forwarded unchanged to ``BaseAgent.__init__``.
        """
        super().__init__(**kwargs)
        self.max_history = 10
        # A bounded deque silently discards the oldest summary once full.
        self.frame_history = deque(maxlen=self.max_history)

    async def process(
        self,
        frame: Optional[np.ndarray] = None,
        base64_image: Optional[str] = None,
        frame_number: int = 0,
    ) -> Dict[str, Any]:
        """Analyzes visual input for security threats and returns a structured report.

        Args:
            frame: Optional numpy array (OpenCV format) of the video frame.
                Takes precedence over ``base64_image`` when both are given.
            base64_image: Optional base64 encoded string of the image.
            frame_number: Current frame index used for temporal tracking.

        Returns:
            Dict[str, Any]: Structured analysis containing incident status,
            severity, confidence, and reasoning. On any failure a
            non-incident result of type ``"error"`` is returned instead of
            raising.
        """
        try:
            image_bytes = self._prepare_image_bytes(frame, base64_image)

            context = self._build_context()
            user_prompt = "Analyze the input based on the security protocol."
            if context:
                user_prompt += f"\n\nTEMPORAL CONTEXT:\n{context}"

            response = await self.client.aio.models.generate_content(
                model=self.model_name,
                contents=[
                    types.Content(
                        role="user",
                        parts=[
                            types.Part.from_text(text=user_prompt),
                            types.Part.from_bytes(
                                data=image_bytes, mime_type="image/jpeg"
                            ),
                        ],
                    )
                ],
                config=GenerateContentConfig(
                    system_instruction=VISION_AGENT_PROMPT,
                    temperature=settings.TEMPERATURE,
                    response_mime_type="application/json",
                ),
            )
            # NOTE(review): _parse_json_response is not defined in this class;
            # presumably inherited from BaseAgent -- confirm its return type.
            result = self._parse_json_response(response.text)
        except Exception as e:
            # Deliberate catch-all boundary: a single bad frame (invalid
            # image input, network failure, malformed reply) must not stop
            # the caller, so every failure becomes a safe default result.
            self.logger.error(f"Vision API Error: {e}")
            return self._default_result(f"API Error: {e}")

        if not result:
            return self._default_result("JSON parsing failed")
        if not isinstance(result, dict):
            # Valid JSON that is not an object (e.g. a top-level array)
            # cannot be validated field by field.
            return self._default_result("Unexpected response format")

        validated = self._validate_result(result)
        self._update_history(frame_number, validated)
        return validated

    def _prepare_image_bytes(
        self, frame: Optional[np.ndarray], base64_str: Optional[str]
    ) -> bytes:
        """Normalizes image input into bytes, handling OpenCV frames and Base64 strings.

        Args:
            frame: Raw image array. Used in preference to ``base64_str``.
            base64_str: Base64 string (with or without data URI prefix).

        Returns:
            bytes: JPEG bytes when encoded from ``frame``; otherwise the
            decoded base64 payload as-is (it is not re-encoded).

        Raises:
            ValueError: If the frame cannot be encoded, or if neither a
                frame nor a non-empty base64 string is supplied.
        """
        if frame is not None:
            # Imported lazily so cv2 is only needed on the raw-frame path.
            import cv2

            success, buffer = cv2.imencode(".jpg", frame)
            if not success:
                raise ValueError("Could not encode frame to JPEG")
            return buffer.tobytes()

        if base64_str:
            # Drop a "data:image/...;base64," prefix if present.
            if "," in base64_str:
                base64_str = base64_str.split(",")[-1]
            # Remove whitespace/newlines first: the decoder ignores them, so
            # they must not be counted when working out the padding.
            base64_str = "".join(base64_str.split())
            # Ensure proper padding for the base64 decoder
            missing_padding = len(base64_str) % 4
            if missing_padding:
                base64_str += "=" * (4 - missing_padding)
            return base64.b64decode(base64_str)

        raise ValueError("No valid image source provided")

    def _build_context(self) -> str:
        """Returns the newline-separated history of recent detections."""
        return "\n".join(self.frame_history)

    def _update_history(self, frame_num: int, result: Dict[str, Any]) -> None:
        """Adds the current detection to the sliding window history.

        Args:
            frame_num: Index of the frame the result belongs to.
            result: Detection dict; ``type`` and ``severity`` are read, with
                ``"normal"`` and ``"low"`` as fallbacks.
        """
        summary = (
            f"Frame {frame_num}: {result.get('type', 'normal')} "
            f"({result.get('severity', 'low')})"
        )
        self.frame_history.append(summary)

    def _validate_result(self, result: Dict[str, Any]) -> Dict[str, Any]:
        """Clamps confidence scores and ensures all required fields are present.

        Args:
            result: Parsed model output.

        Returns:
            Dict[str, Any]: A dict with the keys ``incident``, ``type``,
            ``severity``, ``confidence``, ``reasoning``, ``subjects`` and
            ``recommended_actions``.
        """
        raw_conf = result.get("confidence", 0)
        try:
            # Going through float() accepts numeric strings such as "85.5".
            confidence = int(float(raw_conf))
        except (TypeError, ValueError, OverflowError):
            # None, non-numeric text, NaN or infinity: treat as no confidence
            # rather than crashing on a malformed model reply.
            confidence = 0
        confidence = max(0, min(100, confidence))

        # Incident is only True if confidence exceeds the system threshold
        is_incident = bool(result.get("incident", False))
        if confidence < settings.CONFIDENCE_THRESHOLD:
            is_incident = False

        return {
            "incident": is_incident,
            "type": result.get("type", "unknown"),
            "severity": str(result.get("severity", "low")).lower(),
            "confidence": confidence,
            "reasoning": result.get("reasoning", "No explanation"),
            "subjects": result.get("subjects", []),
            "recommended_actions": result.get("recommended_actions", []),
        }

    def _default_result(self, error_msg: str) -> Dict[str, Any]:
        """Returns a safe, non-incident result in case of processing errors.

        Args:
            error_msg: Description of the failure, stored under ``reasoning``.

        Returns:
            Dict[str, Any]: A result with the same keys that
            ``_validate_result`` produces, so consumers can read either
            shape uniformly.
        """
        return {
            "incident": False,
            "type": "error",
            "severity": "low",
            "confidence": 0,
            "reasoning": error_msg,
            "subjects": [],
            "recommended_actions": [],
        }