""" Database Service - SQLite operations with async support """ import sqlite3 import json import logging from typing import Dict, List, Optional, Any from datetime import datetime from pathlib import Path from config.settings import settings logger = logging.getLogger(__name__) class DatabaseService: """Thread-safe database operations for incident tracking""" def __init__(self, db_path: Optional[Path] = None): self.db_path = db_path or settings.DB_PATH self._ensure_database() def _ensure_database(self): """Create database and tables if they don't exist""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Incidents table cursor.execute(""" CREATE TABLE IF NOT EXISTS incidents ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, incident_type TEXT NOT NULL, severity TEXT NOT NULL, confidence REAL NOT NULL, reasoning TEXT NOT NULL, subjects TEXT, evidence_path TEXT, response_plan TEXT, status TEXT DEFAULT 'active', created_at TEXT DEFAULT CURRENT_TIMESTAMP ) """) # Actions table cursor.execute(""" CREATE TABLE IF NOT EXISTS actions ( id INTEGER PRIMARY KEY AUTOINCREMENT, incident_id INTEGER, action_type TEXT NOT NULL, action_data TEXT, status TEXT DEFAULT 'pending', executed_at TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (incident_id) REFERENCES incidents(id) ) """) # System stats table cursor.execute(""" CREATE TABLE IF NOT EXISTS system_stats ( id INTEGER PRIMARY KEY AUTOINCREMENT, component TEXT NOT NULL, metric TEXT NOT NULL, value REAL NOT NULL, timestamp TEXT DEFAULT CURRENT_TIMESTAMP ) """) # Indexes for performance cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_incidents_created ON incidents(created_at DESC) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_incidents_severity ON incidents(severity) """) conn.commit() conn.close() logger.info(f"Database initialized: {self.db_path}") def save_incident(self, incident_data: Dict[str, Any]) -> int: """ Save incident to database Args: incident_data: Incident details Returns: Incident ID """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute(""" INSERT INTO incidents ( timestamp, incident_type, severity, confidence, reasoning, subjects, evidence_path, response_plan, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( incident_data.get('timestamp', datetime.now().isoformat()), incident_data.get('type', 'unknown'), incident_data.get('severity', 'low'), incident_data.get('confidence', 0), incident_data.get('reasoning', ''), json.dumps(incident_data.get('subjects', [])), incident_data.get('evidence_path', ''), json.dumps(incident_data.get('response_plan', [])), incident_data.get('created_at', datetime.now().isoformat()) # <-- fix: allows injecting old timestamp )) incident_id = cursor.lastrowid conn.commit() logger.info(f"Saved incident #{incident_id}") return incident_id except Exception as e: logger.error(f"Failed to save incident: {e}") conn.rollback() return -1 finally: conn.close() def save_action( self, incident_id: int, action_type: str, action_data: Dict[str, Any] ) -> int: """Save executed action""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute(""" INSERT INTO actions ( incident_id, action_type, action_data, status, executed_at ) VALUES (?, ?, ?, ?, ?) """, ( incident_id, action_type, json.dumps(action_data), action_data.get('status', 'completed'), datetime.now().isoformat() )) action_id = cursor.lastrowid conn.commit() return action_id except Exception as e: logger.error(f"Failed to save action: {e}") conn.rollback() return -1 finally: conn.close() def get_recent_incidents( self, limit: int = 50, severity: Optional[str] = None ) -> List[Dict]: """Retrieve recent incidents with optional severity filter""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: query = """ SELECT id, timestamp, incident_type, severity, confidence, reasoning, subjects, evidence_path, status, created_at FROM incidents """ params = [] if severity: query += " WHERE severity = ?" params.append(severity) query += " ORDER BY timestamp DESC LIMIT ?" params.append(limit) cursor.execute(query, params) rows = cursor.fetchall() incidents = [] for row in rows: incidents.append({ 'id': row[0], 'timestamp': row[1], 'type': row[2], 'severity': row[3], 'confidence': row[4], 'reasoning': row[5], 'subjects': json.loads(row[6]) if row[6] else [], 'evidence_path': row[7], 'status': row[8], 'created_at': row[9] }) return incidents except Exception as e: logger.error(f"Failed to retrieve incidents: {e}") return [] finally: conn.close() def get_incident_by_id(self, incident_id: int) -> Optional[Dict]: """Get single incident by ID""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute(""" SELECT * FROM incidents WHERE id = ? """, (incident_id,)) row = cursor.fetchone() if row: return { 'id': row[0], 'timestamp': row[1], 'type': row[2], 'severity': row[3], 'confidence': row[4], 'reasoning': row[5], 'subjects': json.loads(row[6]) if row[6] else [], 'evidence_path': row[7], 'response_plan': json.loads(row[8]) if row[8] else [], 'status': row[9], 'created_at': row[10] } return None except Exception as e: logger.error(f"Failed to get incident: {e}") return None finally: conn.close() def get_statistics(self) -> Dict[str, Any]: """Get system statistics""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: # Total incidents cursor.execute("SELECT COUNT(*) FROM incidents") total_incidents = cursor.fetchone()[0] # Active incidents cursor.execute( "SELECT COUNT(*) FROM incidents WHERE status = 'active'" ) active_incidents = cursor.fetchone()[0] # Severity breakdown cursor.execute(""" SELECT severity, COUNT(*) FROM incidents GROUP BY severity """) severity_breakdown = dict(cursor.fetchall()) # Recent incidents (last 24h) cursor.execute(""" SELECT COUNT(*) FROM incidents WHERE datetime(created_at) > datetime('now', '-1 day') """) recent_incidents = cursor.fetchone()[0] return { 'total_incidents': total_incidents, 'active_incidents': active_incidents, 'severity_breakdown': severity_breakdown, 'recent_24h': recent_incidents } except Exception as e: logger.error(f"Failed to get statistics: {e}") return {} finally: conn.close() def update_incident_status(self, incident_id: int, status: str) -> bool: """Update incident status""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute(""" UPDATE incidents SET status = ? WHERE id = ? """, (status, incident_id)) conn.commit() return cursor.rowcount > 0 except Exception as e: logger.error(f"Failed to update incident: {e}") conn.rollback() return False finally: conn.close() def cleanup_old_incidents(self, days: int = 30) -> int: """Delete incidents older than specified days""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cutoff_modifier = f'-{days} days' cursor.execute(""" DELETE FROM incidents WHERE datetime(created_at) < datetime('now', ?) """, (cutoff_modifier,)) deleted_count = cursor.rowcount conn.commit() logger.info(f"Cleaned up {deleted_count} old incidents") return deleted_count except Exception as e: logger.error(f"Cleanup failed: {e}") conn.rollback() return 0 finally: conn.close() # Singleton instance db_service = DatabaseService()