- Add pluggable notification system (webhook, Slack, Teams) with retry
- Add alert deduplication: same rule + actor within 15 min = one alert
- Add 10 pre-built admin-ops rule templates seeded on startup:
- Failed Conditional Access, After-Hours Admin Activity
- New Application Registration, Admin Role Assignment
- License Change, Bulk User Deletion
- Device Compliance Failure, Exchange Transport Rule Change
- Service Principal Credential Added, External Sharing Enabled
- Add /api/alerts, /api/alerts/{id}/status, /api/alerts/summary endpoints
- Add alert dashboard to frontend with status filters and ack/resolve buttons
- Add alert summary badge in hero header (high/medium/low counts)
- New env vars: ALERT_WEBHOOK_URL, ALERT_WEBHOOK_FORMAT, ALERT_DEDUPE_MINUTES
68 lines
2.1 KiB
Python
68 lines
2.1 KiB
Python
from datetime import UTC, datetime
|
|
|
|
|
|
def test_matches_equals():
|
|
rule = {"conditions": [{"field": "operation", "op": "eq", "value": "Add user"}]}
|
|
event = {"operation": "Add user", "timestamp": datetime.now(UTC).isoformat()}
|
|
from rules import _matches
|
|
|
|
assert _matches(rule, event) is True
|
|
|
|
|
|
def test_matches_not_equals():
|
|
rule = {"conditions": [{"field": "operation", "op": "neq", "value": "Delete user"}]}
|
|
event = {"operation": "Add user", "timestamp": datetime.now(UTC).isoformat()}
|
|
from rules import _matches
|
|
|
|
assert _matches(rule, event) is True
|
|
|
|
|
|
def test_matches_contains():
|
|
rule = {"conditions": [{"field": "actor_display", "op": "contains", "value": "Admin"}]}
|
|
event = {"actor_display": "Admin (admin@example.com)", "timestamp": datetime.now(UTC).isoformat()}
|
|
from rules import _matches
|
|
|
|
assert _matches(rule, event) is True
|
|
|
|
|
|
def test_matches_after_hours():
|
|
rule = {"conditions": [{"field": "timestamp", "op": "after_hours", "value": None}]}
|
|
event = {"timestamp": "2024-01-01T22:00:00Z"}
|
|
from rules import _matches
|
|
|
|
assert _matches(rule, event) is True
|
|
|
|
event2 = {"timestamp": "2024-01-01T10:00:00Z"}
|
|
assert _matches(rule, event2) is False
|
|
|
|
|
|
def test_evaluate_event_creates_alert(monkeypatch):
|
|
from rules import alerts_collection, evaluate_event
|
|
|
|
monkeypatch.setattr(
|
|
"rules.load_rules",
|
|
lambda: [
|
|
{
|
|
"_id": "r1",
|
|
"name": "Test rule",
|
|
"enabled": True,
|
|
"severity": "high",
|
|
"conditions": [{"field": "operation", "op": "eq", "value": "Add user"}],
|
|
"message": "Alert!",
|
|
}
|
|
],
|
|
)
|
|
|
|
inserted = {}
|
|
|
|
def mock_insert(doc):
|
|
inserted["doc"] = doc
|
|
|
|
monkeypatch.setattr(alerts_collection, "insert_one", mock_insert)
|
|
monkeypatch.setattr(alerts_collection, "count_documents", lambda *args, **kwargs: 0)
|
|
|
|
event = {"id": "e1", "operation": "Add user", "timestamp": datetime.now(UTC).isoformat(), "dedupe_key": "dk1"}
|
|
triggered = evaluate_event(event)
|
|
assert len(triggered) == 1
|
|
assert inserted["doc"]["rule_name"] == "Test rule"
|