Files
aoc/backend/siem.py
Tomas Kracmar 8d951fc335
All checks were successful
CI / lint-and-test (push) Successful in 22s
Release / build-and-push (push) Successful in 1m7s
v1.7.14: LLM/SIEM domain allowlists, SRI hashes, auth misconfig warning, Azure Key Vault integration
2026-04-27 16:45:06 +02:00

46 lines
1.8 KiB
Python

import ipaddress
import requests
import structlog
from config import SIEM_ALLOWED_DOMAINS, SIEM_ENABLED, SIEM_WEBHOOK_URL
logger = structlog.get_logger("aoc.siem")
def _validate_siem_url(url: str):
"""Prevent SSRF by rejecting internal/reserved addresses and enforcing domain allowlist."""
from urllib.parse import urlparse
parsed = urlparse(url)
if parsed.scheme != "https":
raise RuntimeError("SIEM_WEBHOOK_URL must use HTTPS")
hostname = (parsed.hostname or "").lower()
if not hostname:
raise RuntimeError("SIEM_WEBHOOK_URL must have a valid hostname")
blocked = {"localhost", "127.0.0.1", "0.0.0.0", "::1", "169.254.169.254"}
if hostname in blocked:
raise RuntimeError(f"SIEM_WEBHOOK_URL hostname '{hostname}' is not allowed")
try:
ip = ipaddress.ip_address(hostname)
if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved:
raise RuntimeError(f"SIEM_WEBHOOK_URL IP '{hostname}' is not allowed")
except ValueError:
pass
if SIEM_ALLOWED_DOMAINS:
allowed = any(hostname == d or (d.startswith("*.") and hostname.endswith(d[1:])) for d in SIEM_ALLOWED_DOMAINS)
if not allowed:
raise RuntimeError(f"SIEM_WEBHOOK_URL domain '{hostname}' is not in SIEM_ALLOWED_DOMAINS")
def forward_event(event: dict):
"""Forward a normalized event to the configured SIEM webhook."""
if not SIEM_ENABLED or not SIEM_WEBHOOK_URL:
return
try:
_validate_siem_url(SIEM_WEBHOOK_URL)
res = requests.post(SIEM_WEBHOOK_URL, json=event, timeout=10)
res.raise_for_status()
logger.debug("Event forwarded to SIEM", event_id=event.get("id"))
except Exception as exc:
logger.warning("SIEM forward failed", error=str(exc))