from datetime import UTC, datetime import structlog from database import db logger = structlog.get_logger("aoc.rules") rules_collection = db["alert_rules"] alerts_collection = db["alerts"] def load_rules() -> list[dict]: return list(rules_collection.find({"enabled": True})) def evaluate_event(event: dict) -> list[dict]: """Evaluate a normalized event against stored alert rules.""" triggered = [] rules = load_rules() for rule in rules: if _matches(rule, event): triggered.append(rule) _create_alert(rule, event) return triggered def _matches(rule: dict, event: dict) -> bool: conditions = rule.get("conditions", []) if not conditions: return False for cond in conditions: field = cond.get("field") op = cond.get("op", "eq") value = cond.get("value") event_value = _get_nested(event, field) if op == "eq" and event_value != value: return False if op == "neq" and event_value == value: return False if op == "contains" and (not isinstance(event_value, str) or value not in event_value): return False if op == "in" and event_value not in (value if isinstance(value, list) else [value]): return False if op == "after_hours": try: ts = datetime.fromisoformat(event.get("timestamp", "").replace("Z", "+00:00")) hour = ts.hour if 9 <= hour < 17: return False except Exception: return False return True def _get_nested(obj: dict, path: str): parts = path.split(".") val = obj for p in parts: if isinstance(val, dict): val = val.get(p) else: return None return val def _create_alert(rule: dict, event: dict): alert = { "timestamp": datetime.now(UTC).isoformat(), "rule_id": str(rule.get("_id")), "rule_name": rule.get("name", "Unnamed rule"), "severity": rule.get("severity", "medium"), "event_id": event.get("id"), "event_dedupe_key": event.get("dedupe_key"), "message": rule.get("message", f"Rule '{rule.get('name')}' triggered"), } try: alerts_collection.insert_one(alert) logger.info("Alert created", rule=rule.get("name"), event_id=event.get("id")) except Exception as exc: logger.warning("Failed to create alert", error=str(exc))