"""
Action Executor - Executes real-world responses to incidents
"""
import logging
import asyncio
from typing import Dict, List, Any
from datetime import datetime
from pathlib import Path
from config.settings import settings
from services.database_service import db_service
# Module-level logger, named after this module, used by every method below.
logger = logging.getLogger(__name__)
class ActionExecutor:
    """Executes security response actions.

    A response plan is a list of step dicts. Each step is dispatched to a
    private handler coroutine by its ``"action"`` name, and the outcome of
    every step is persisted through ``db_service.save_action`` and kept in
    ``executed_actions``.
    """

    def __init__(self):
        # In-memory log of every action record produced by execute_plan,
        # oldest first. Nothing in this class trims it.
        self.executed_actions: List[Dict[str, Any]] = []

    async def execute_plan(
        self,
        plan: List[Dict[str, Any]],
        incident_id: int,
        evidence_path: str = ""
    ):
        """Execute all actions in a response plan.

        Steps run sequentially, ordered by priority ("immediate", "high",
        "medium", "low"; unknown or missing labels rank as "medium") and
        then by their ``"step"`` value (missing values sort last).

        A failing step does not stop the plan: it is recorded with status
        ``"failed"`` and the next step runs.

        Args:
            plan: Step dicts read via the keys ``"action"``,
                ``"parameters"``, ``"priority"`` and ``"step"``.
            incident_id: Incident the plan responds to.
            evidence_path: Path handed unchanged to every handler.
        """
        logger.info("Executing response plan for incident #%s", incident_id)
        priority_order = {"immediate": 0, "high": 1, "medium": 2, "low": 3}
        sorted_plan = sorted(
            plan,
            key=lambda x: (
                priority_order.get(x.get("priority", "medium"), 2),
                x.get("step", 999)
            )
        )
        for step in sorted_plan:
            action_type = step.get("action")
            # ``or {}`` also covers an explicit ``"parameters": None``, which
            # would otherwise break handlers that call ``params.get``.
            params = step.get("parameters") or {}
            # Only the action itself sits inside the try, so a persistence
            # problem can never relabel a successful action as failed.
            try:
                await self._execute_action(
                    action_type,
                    incident_id,
                    evidence_path,
                    params
                )
            except Exception as e:
                # NOTE(review): utcnow() yields a naive timestamp and is
                # deprecated since Python 3.12; kept so stored timestamps
                # keep their current format.
                error_record = {
                    "incident_id": incident_id,
                    "action": action_type,
                    "status": "failed",
                    "error": str(e),
                    "timestamp": datetime.utcnow().isoformat()
                }
                await self._record_action(error_record)
                logger.error(
                    "[FAIL] Action '%s' failed for incident #%s: %s",
                    action_type,
                    incident_id,
                    e
                )
                continue
            record = {
                "incident_id": incident_id,
                "action": action_type,
                "status": "completed",
                "parameters": params,
                "timestamp": datetime.utcnow().isoformat()
            }
            await self._record_action(record)
            logger.info(
                "[OK] Executed action '%s' for incident #%s",
                action_type,
                incident_id
            )

    async def _record_action(self, record: Dict[str, Any]) -> None:
        """Persist *record* via ``db_service.save_action`` and remember it.

        A persistence failure is logged instead of raised so the remaining
        steps of the plan still run; the record is kept in
        ``executed_actions`` either way.
        """
        try:
            await asyncio.to_thread(
                db_service.save_action,
                record["incident_id"],
                record["action"],
                record
            )
        except Exception as e:
            logger.error(
                "Could not persist action '%s' for incident #%s: %s",
                record["action"],
                record["incident_id"],
                e
            )
        self.executed_actions.append(record)

    async def _execute_action(
        self,
        action_type: str,
        incident_id: int,
        evidence_path: str,
        params: Dict[str, Any]
    ):
        """Dispatch a single action to its handler coroutine.

        Raises:
            ValueError: If ``action_type`` has no registered handler, so the
                caller records the step as failed rather than completed.
        """
        action_map = {
            "save_evidence": self._save_evidence,
            "send_alert": self._send_alert,
            "log_incident": self._log_incident,
            "lock_door": self._lock_door,
            "sound_alarm": self._sound_alarm,
            "contact_authorities": self._contact_authorities,
            "monitor": self._monitor,
            "escalate": self._escalate,
        }
        handler = action_map.get(action_type)
        if not handler:
            raise ValueError(f"Unknown action type: {action_type}")
        await handler(incident_id, evidence_path, params)

    async def _save_evidence(
        self,
        incident_id: int,
        evidence_path: str,
        params: Dict
    ):
        """Check that the evidence file exists and log the result.

        Nothing is copied or moved here; the method only verifies that
        ``evidence_path`` is non-empty and present on disk.
        """
        if evidence_path and Path(evidence_path).exists():
            logger.info("Evidence saved at %s", evidence_path)
        else:
            logger.warning("Evidence path not found")

    async def _send_alert(
        self,
        incident_id: int,
        evidence_path: str,
        params: Dict
    ):
        """Send alert notifications.

        ``params["channels"]`` (default ``["console"]``) selects the
        channels. Email and SMS are attempted only when both requested and
        enabled in settings; the console warning is always logged.
        """
        channels = params.get("channels", ["console"])
        if "email" in channels and settings.ENABLE_EMAIL_ALERTS:
            await self._send_email_alert(incident_id)
        if "sms" in channels and settings.ENABLE_SMS_ALERTS:
            await self._send_sms_alert(incident_id)
        logger.warning(
            "[ALERT] SECURITY INCIDENT #%s | Evidence: %s",
            incident_id,
            evidence_path
        )

    async def _send_email_alert(self, incident_id: int):
        """Send an email alert via SMTP.

        Best effort: incomplete SMTP settings, a missing incident, or any
        error while sending is logged and the method returns normally.
        """
        if not all([
            settings.SMTP_HOST,
            settings.SMTP_USER,
            settings.SMTP_PASSWORD,
            settings.ALERT_EMAIL,
        ]):
            logger.warning("Email configuration incomplete")
            return
        incident = await asyncio.to_thread(
            db_service.get_incident_by_id,
            incident_id
        )
        if not incident:
            logger.error("Incident %s not found for email alert", incident_id)
            return
        try:
            # Imported lazily, inside the try, so a missing aiosmtplib
            # package is logged like any other send failure.
            import aiosmtplib
            from email.message import EmailMessage
            message = EmailMessage()
            message["From"] = settings.SMTP_USER
            message["To"] = settings.ALERT_EMAIL
            message["Subject"] = f"AegisAI Alert: Incident #{incident_id}"
            message.set_content(
                f"""
AegisAI Security Alert
Incident ID: {incident_id}
Type: {incident['type']}
Severity: {incident['severity']}
Confidence: {incident['confidence']}%
Timestamp: {incident['timestamp']}
Details:
{incident['reasoning']}
"""
            )
            await aiosmtplib.send(
                message,
                hostname=settings.SMTP_HOST,
                port=settings.SMTP_PORT,
                username=settings.SMTP_USER,
                password=settings.SMTP_PASSWORD,
                use_tls=True,
            )
            logger.info("Email alert sent to %s", settings.ALERT_EMAIL)
        except Exception as e:
            logger.error("Email alert failed: %s", e)

    async def _send_sms_alert(self, incident_id: int):
        """Send an SMS alert via Twilio.

        Best effort: incomplete Twilio settings, a missing incident, or any
        error while sending is logged and the method returns normally.
        """
        if not all([
            settings.TWILIO_ACCOUNT_SID,
            settings.TWILIO_AUTH_TOKEN,
            settings.TWILIO_PHONE,
            settings.ALERT_PHONE,
        ]):
            logger.warning("SMS configuration incomplete")
            return
        incident = await asyncio.to_thread(
            db_service.get_incident_by_id,
            incident_id
        )
        if not incident:
            logger.error("Incident %s not found for SMS", incident_id)
            return
        try:
            # Imported lazily, inside the try, so a missing twilio package
            # is logged like any other send failure.
            from twilio.rest import Client
            client = Client(
                settings.TWILIO_ACCOUNT_SID,
                settings.TWILIO_AUTH_TOKEN,
            )
            # messages.create is a synchronous call; run it in a worker
            # thread so the event loop is not blocked while it completes.
            await asyncio.to_thread(
                client.messages.create,
                body=(
                    f"AegisAI Alert: {incident['type']} | "
                    f"Severity: {incident['severity']} | "
                    f"Confidence: {incident['confidence']}%"
                ),
                from_=settings.TWILIO_PHONE,
                to=settings.ALERT_PHONE,
            )
            logger.info("SMS alert sent to %s", settings.ALERT_PHONE)
        except Exception as e:
            logger.error("SMS alert failed: %s", e)

    async def _log_incident(
        self,
        incident_id: int,
        evidence_path: str,
        params: Dict
    ):
        """Log a confirmation line for the incident; nothing is written here."""
        logger.info("Incident #%s confirmed logged", incident_id)

    async def _lock_door(
        self,
        incident_id: int,
        evidence_path: str,
        params: Dict
    ):
        """Log a door-lock action, marked simulated when IoT actions are off.

        No device call is made in this method; both branches only log.
        """
        if not settings.ENABLE_IOT_ACTIONS:
            logger.info("[SIMULATED] Doors locked")
            return
        logger.info("Doors locked via IoT")

    async def _sound_alarm(
        self,
        incident_id: int,
        evidence_path: str,
        params: Dict
    ):
        """Log an alarm action, marked simulated when IoT actions are off.

        No device call is made in this method; both branches only log.
        """
        if not settings.ENABLE_IOT_ACTIONS:
            logger.info("[SIMULATED] Alarm activated")
            return
        logger.info("Alarm activated via IoT")

    async def _contact_authorities(
        self,
        incident_id: int,
        evidence_path: str,
        params: Dict
    ):
        """Log a simulated call to the authorities; no one is contacted."""
        logger.info("[SIMULATED] Authorities contacted for incident #%s", incident_id)

    async def _monitor(
        self,
        incident_id: int,
        evidence_path: str,
        params: Dict
    ):
        """Log the requested monitoring duration (``params["duration"]``, default 300).

        The method returns immediately; it does not wait for the duration.
        """
        duration = params.get("duration", 300)
        logger.info(
            "Monitoring incident #%s for %s seconds",
            incident_id,
            duration
        )

    async def _escalate(
        self,
        incident_id: int,
        evidence_path: str,
        params: Dict
    ):
        """Set the incident's status to ``"escalated"`` through db_service."""
        await asyncio.to_thread(
            db_service.update_incident_status,
            incident_id,
            "escalated"
        )
        logger.info("Incident #%s escalated", incident_id)

    def get_execution_history(self, limit: int = 10) -> List[Dict]:
        """Return up to ``limit`` of the most recent action records, oldest first.

        A ``limit`` of zero or less yields an empty list.
        """
        # Guard needed because ``lst[-0:]`` is the whole list, not an empty one.
        if limit <= 0:
            return []
        return self.executed_actions[-limit:]
# Shared module-level instance, created once when this module is imported.
# Its executed_actions history is held in memory on this object only.
action_executor = ActionExecutor()