""" Action Executor - Executes real-world responses to incidents """ import logging import asyncio from typing import Dict, List, Any from datetime import datetime from pathlib import Path from config.settings import settings from services.database_service import db_service logger = logging.getLogger(__name__) class ActionExecutor: """Executes security response actions""" def __init__(self): self.executed_actions: List[Dict[str, Any]] = [] async def execute_plan( self, plan: List[Dict[str, Any]], incident_id: int, evidence_path: str = "" ): """ Execute all actions in response plan """ logger.info("Executing response plan for incident #%s", incident_id) priority_order = {"immediate": 0, "high": 1, "medium": 2, "low": 3} sorted_plan = sorted( plan, key=lambda x: ( priority_order.get(x.get("priority", "medium"), 2), x.get("step", 999) ) ) for step in sorted_plan: action_type = step.get("action") params = step.get("parameters", {}) try: await self._execute_action( action_type, incident_id, evidence_path, params ) record = { "incident_id": incident_id, "action": action_type, "status": "completed", "parameters": params, "timestamp": datetime.utcnow().isoformat() } await asyncio.to_thread( db_service.save_action, incident_id, action_type, record ) self.executed_actions.append(record) logger.info( "[OK] Executed action '%s' for incident #%s", action_type, incident_id ) except Exception as e: error_record = { "incident_id": incident_id, "action": action_type, "status": "failed", "error": str(e), "timestamp": datetime.utcnow().isoformat() } await asyncio.to_thread( db_service.save_action, incident_id, action_type, error_record ) self.executed_actions.append(error_record) logger.error( "[FAIL] Action '%s' failed for incident #%s: %s", action_type, incident_id, e ) async def _execute_action( self, action_type: str, incident_id: int, evidence_path: str, params: Dict[str, Any] ): """Execute individual action""" action_map = { "save_evidence": self._save_evidence, "send_alert": self._send_alert, "log_incident": self._log_incident, "lock_door": self._lock_door, "sound_alarm": self._sound_alarm, "contact_authorities": self._contact_authorities, "monitor": self._monitor, "escalate": self._escalate, } handler = action_map.get(action_type) if not handler: logger.warning("Unknown action type: %s", action_type) return await handler(incident_id, evidence_path, params) async def _save_evidence( self, incident_id: int, evidence_path: str, params: Dict ): """Save evidence to permanent storage""" if evidence_path and Path(evidence_path).exists(): logger.info("Evidence saved at %s", evidence_path) else: logger.warning("Evidence path not found") async def _send_alert( self, incident_id: int, evidence_path: str, params: Dict ): """Send alert notifications""" channels = params.get("channels", ["console"]) if "email" in channels and settings.ENABLE_EMAIL_ALERTS: await self._send_email_alert(incident_id) if "sms" in channels and settings.ENABLE_SMS_ALERTS: await self._send_sms_alert(incident_id) logger.warning( "[ALERT] SECURITY INCIDENT #%s | Evidence: %s", incident_id, evidence_path ) async def _send_email_alert(self, incident_id: int): """Send email alert via SMTP""" if not all([ settings.SMTP_HOST, settings.SMTP_USER, settings.SMTP_PASSWORD, settings.ALERT_EMAIL, ]): logger.warning("Email configuration incomplete") return incident = await asyncio.to_thread( db_service.get_incident_by_id, incident_id ) if not incident: logger.error("Incident %s not found for email alert", incident_id) return try: import aiosmtplib from email.message import EmailMessage message = EmailMessage() message["From"] = settings.SMTP_USER message["To"] = settings.ALERT_EMAIL message["Subject"] = f"AegisAI Alert: Incident #{incident_id}" message.set_content( f""" AegisAI Security Alert Incident ID: {incident_id} Type: {incident['type']} Severity: {incident['severity']} Confidence: {incident['confidence']}% Timestamp: {incident['timestamp']} Details: {incident['reasoning']} """ ) await aiosmtplib.send( message, hostname=settings.SMTP_HOST, port=settings.SMTP_PORT, username=settings.SMTP_USER, password=settings.SMTP_PASSWORD, use_tls=True, ) logger.info("Email alert sent to %s", settings.ALERT_EMAIL) except Exception as e: logger.error("Email alert failed: %s", e) async def _send_sms_alert(self, incident_id: int): """Send SMS alert via Twilio""" if not all([ settings.TWILIO_ACCOUNT_SID, settings.TWILIO_AUTH_TOKEN, settings.TWILIO_PHONE, settings.ALERT_PHONE, ]): logger.warning("SMS configuration incomplete") return incident = await asyncio.to_thread( db_service.get_incident_by_id, incident_id ) if not incident: logger.error("Incident %s not found for SMS", incident_id) return try: from twilio.rest import Client client = Client( settings.TWILIO_ACCOUNT_SID, settings.TWILIO_AUTH_TOKEN, ) client.messages.create( body=( f"AegisAI Alert: {incident['type']} | " f"Severity: {incident['severity']} | " f"Confidence: {incident['confidence']}%" ), from_=settings.TWILIO_PHONE, to=settings.ALERT_PHONE, ) logger.info("SMS alert sent to %s", settings.ALERT_PHONE) except Exception as e: logger.error("SMS alert failed: %s", e) async def _log_incident( self, incident_id: int, evidence_path: str, params: Dict ): logger.info("Incident #%s confirmed logged", incident_id) async def _lock_door( self, incident_id: int, evidence_path: str, params: Dict ): if not settings.ENABLE_IOT_ACTIONS: logger.info("[SIMULATED] Doors locked") return logger.info("Doors locked via IoT") async def _sound_alarm( self, incident_id: int, evidence_path: str, params: Dict ): if not settings.ENABLE_IOT_ACTIONS: logger.info("[SIMULATED] Alarm activated") return logger.info("Alarm activated via IoT") async def _contact_authorities( self, incident_id: int, evidence_path: str, params: Dict ): logger.info("[SIMULATED] Authorities contacted for incident #%s", incident_id) async def _monitor( self, incident_id: int, evidence_path: str, params: Dict ): duration = params.get("duration", 300) logger.info( "Monitoring incident #%s for %s seconds", incident_id, duration ) async def _escalate( self, incident_id: int, evidence_path: str, params: Dict ): await asyncio.to_thread( db_service.update_incident_status, incident_id, "escalated" ) logger.info("Incident #%s escalated", incident_id) def get_execution_history(self, limit: int = 10) -> List[Dict]: return self.executed_actions[-limit:] # Singleton instance action_executor = ActionExecutor()