Some checks failed
CI / lint-and-test (push) Has been cancelled
- Migrate frontend to Alpine.js for reactive state management
- Add source health dashboard in UI and /api/source-health endpoint
- Add event tagging (PATCH /api/events/{id}/tags) and commenting (POST /api/events/{id}/comments)
- Add CSV/JSON export from the UI
- Add rule-based alerting engine (rules.py) with CRUD endpoints (/api/rules)
- Add SIEM export via webhook (siem.py)
- Add AOC audit-trail middleware that logs all mutations to the aoc_audit collection
- Update config with SIEM_ENABLED, SIEM_WEBHOOK_URL, ALERTS_ENABLED
- Add tests for rules engine, tags, comments, and source health
50 lines
1.8 KiB
Python
50 lines
1.8 KiB
Python
from datetime import UTC, datetime
|
|
|
|
from rules import _matches, evaluate_event
|
|
|
|
|
|
def test_matches_equals():
    """Check that ``_matches`` returns True for an ``eq`` condition.

    The rule's single condition compares the ``operation`` field against
    ``"Add user"``, and the event carries exactly that operation.
    """
    condition = {"field": "operation", "op": "eq", "value": "Add user"}
    add_user_event = {
        "operation": "Add user",
        "timestamp": datetime.now(UTC).isoformat(),
    }

    outcome = _matches({"conditions": [condition]}, add_user_event)

    assert outcome is True
|
|
|
|
|
|
def test_matches_not_equals():
    """Check that ``_matches`` returns True for a ``neq`` condition.

    The rule's condition value is ``"Delete user"`` while the event's
    ``operation`` is ``"Add user"``, so the two differ.
    """
    condition = {"field": "operation", "op": "neq", "value": "Delete user"}
    add_user_event = {
        "operation": "Add user",
        "timestamp": datetime.now(UTC).isoformat(),
    }

    outcome = _matches({"conditions": [condition]}, add_user_event)

    assert outcome is True
|
|
|
|
|
|
def test_matches_contains():
    """Check that ``_matches`` returns True for a ``contains`` condition.

    The event's ``actor_display`` string includes the rule value ``"Admin"``.
    """
    condition = {"field": "actor_display", "op": "contains", "value": "Admin"}
    admin_event = {
        "actor_display": "Admin (admin@example.com)",
        "timestamp": datetime.now(UTC).isoformat(),
    }

    outcome = _matches({"conditions": [condition]}, admin_event)

    assert outcome is True
|
|
|
|
|
|
def test_matches_after_hours():
    """Check the ``after_hours`` operator against two fixed timestamps.

    A 22:00Z event is expected to match and a 10:00Z event is expected not
    to match; the exact hour window is defined in ``rules`` and is not
    visible from this test.
    """
    after_hours_rule = {
        "conditions": [{"field": "timestamp", "op": "after_hours", "value": None}],
    }

    late_event = {"timestamp": "2024-01-01T22:00:00Z"}
    assert _matches(after_hours_rule, late_event) is True

    daytime_event = {"timestamp": "2024-01-01T10:00:00Z"}
    assert _matches(after_hours_rule, daytime_event) is False
|
|
|
|
|
|
def test_evaluate_event_creates_alert(monkeypatch):
    """Check that ``evaluate_event`` reports one triggered rule and inserts an alert.

    ``rules.load_rules`` is replaced with a stub returning a single enabled
    rule, and ``alerts_collection.insert_one`` is replaced with a recorder so
    the inserted document can be inspected.
    """
    from rules import alerts_collection

    def fake_load_rules():
        # One enabled rule that matches events whose operation is "Add user".
        return [
            {
                "_id": "r1",
                "name": "Test rule",
                "enabled": True,
                "severity": "high",
                "conditions": [
                    {"field": "operation", "op": "eq", "value": "Add user"},
                ],
                "message": "Alert!",
            },
        ]

    monkeypatch.setattr("rules.load_rules", fake_load_rules)

    captured = {}

    def record_insert(doc):
        # Keep the document handed to insert_one for the assertion below.
        captured["doc"] = doc

    monkeypatch.setattr(alerts_collection, "insert_one", record_insert)

    add_user_event = {
        "id": "e1",
        "operation": "Add user",
        "timestamp": datetime.now(UTC).isoformat(),
        "dedupe_key": "dk1",
    }
    fired = evaluate_event(add_user_event)

    assert len(fired) == 1
    assert captured["doc"]["rule_name"] == "Test rule"
|