Files
aoc/backend/tests/test_api.py
Tomas Kracmar b35cac42e0
Some checks failed
CI / lint-and-test (push) Has been cancelled
feat: implement Phase 4 enhancements
- Migrate frontend to Alpine.js for reactive state management
- Add source health dashboard in UI and /api/source-health endpoint
- Add event tagging (PATCH /api/events/{id}/tags) and commenting (POST /api/events/{id}/comments)
- Add CSV/JSON export from the UI
- Add rule-based alerting engine (rules.py) with CRUD endpoints (/api/rules)
- Add SIEM export via webhook (siem.py)
- Add AOC audit trail middleware logging all mutations to aoc_audit collection
- Update config with SIEM_ENABLED, SIEM_WEBHOOK_URL, ALERTS_ENABLED
- Add tests for rules engine, tags, comments, and source health
2026-04-14 15:38:39 +02:00

208 lines
6.8 KiB
Python

from datetime import UTC, datetime
def test_health(client):
response = client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert data["database"] == "connected"
def test_metrics(client):
response = client.get("/metrics")
assert response.status_code == 200
assert "aoc_request_duration_seconds" in response.text
def test_list_events_empty(client):
response = client.get("/api/events")
assert response.status_code == 200
data = response.json()
assert data["items"] == []
assert data["next_cursor"] is None
def test_list_events_cursor_pagination(client, mock_events_collection):
for i in range(5):
mock_events_collection.insert_one({
"id": f"evt-{i}",
"timestamp": datetime.now(UTC).isoformat(),
"service": "Directory",
"operation": "Add user",
"result": "success",
"actor_display": f"Actor {i}",
"raw_text": "",
})
response = client.get("/api/events?page_size=2")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 2
assert data["next_cursor"] is not None
response2 = client.get(f"/api/events?page_size=2&cursor={data['next_cursor']}")
assert response2.status_code == 200
data2 = response2.json()
assert len(data2["items"]) == 2
assert data2["next_cursor"] is not None
def test_list_events_filter_by_service(client, mock_events_collection):
mock_events_collection.insert_one({
"id": "evt-1",
"timestamp": datetime.now(UTC).isoformat(),
"service": "Exchange",
"operation": "Update",
"result": "success",
"actor_display": "Alice",
"raw_text": "",
})
mock_events_collection.insert_one({
"id": "evt-2",
"timestamp": datetime.now(UTC).isoformat(),
"service": "Directory",
"operation": "Add",
"result": "success",
"actor_display": "Bob",
"raw_text": "",
})
response = client.get("/api/events?service=Exchange")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1
assert data["items"][0]["service"] == "Exchange"
def test_list_events_page_size_validation(client):
response = client.get("/api/events?page_size=0")
assert response.status_code == 422
response = client.get("/api/events?page_size=501")
assert response.status_code == 422
def test_filter_options(client, mock_events_collection):
mock_events_collection.insert_one({
"id": "evt-1",
"timestamp": datetime.now(UTC).isoformat(),
"service": "Intune",
"operation": "Assign",
"result": "failure",
"actor_display": "Charlie",
"actor_upn": "charlie@example.com",
"raw_text": "",
})
response = client.get("/api/filter-options")
assert response.status_code == 200
data = response.json()
assert "Intune" in data["services"]
assert "Assign" in data["operations"]
assert "failure" in data["results"]
assert "Charlie" in data["actors"]
assert "charlie@example.com" in data["actor_upns"]
def test_fetch_audit_logs_validation(client):
response = client.get("/api/fetch-audit-logs?hours=0")
assert response.status_code == 422
response = client.get("/api/fetch-audit-logs?hours=721")
assert response.status_code == 422
def test_graph_webhook_validation(client):
token = "test-validation-token-123"
response = client.post("/api/webhooks/graph?validationToken=" + token)
assert response.status_code == 200
assert response.text == token
assert response.headers["content-type"] == "text/plain; charset=utf-8"
def test_graph_webhook_notification(client):
payload = {
"value": [
{
"changeType": "updated",
"resource": "auditLogs/directoryAudits",
"clientState": "secret",
}
]
}
response = client.post("/api/webhooks/graph", json=payload)
assert response.status_code == 200
assert response.json()["status"] == "accepted"
def test_update_tags(client, mock_events_collection):
mock_events_collection.insert_one({
"id": "evt-tags",
"timestamp": datetime.now(UTC).isoformat(),
"service": "Directory",
"operation": "Add user",
"result": "success",
"actor_display": "Alice",
"raw_text": "",
})
response = client.patch("/api/events/evt-tags/tags", json={"tags": ["investigating", "urgent"]})
assert response.status_code == 200
assert response.json()["tags"] == ["investigating", "urgent"]
doc = mock_events_collection.find_one({"id": "evt-tags"})
assert doc["tags"] == ["investigating", "urgent"]
def test_add_comment(client, mock_events_collection):
mock_events_collection.insert_one({
"id": "evt-comment",
"timestamp": datetime.now(UTC).isoformat(),
"service": "Directory",
"operation": "Add user",
"result": "success",
"actor_display": "Alice",
"raw_text": "",
})
response = client.post("/api/events/evt-comment/comments", json={"text": "Looks suspicious"})
assert response.status_code == 200
data = response.json()
assert data["text"] == "Looks suspicious"
doc = mock_events_collection.find_one({"id": "evt-comment"})
assert len(doc["comments"]) == 1
assert doc["comments"][0]["text"] == "Looks suspicious"
def test_source_health(client, mock_watermarks_collection):
mock_watermarks_collection.insert_one({"source": "directory", "last_fetch_time": "2024-01-01T00:00:00Z"})
response = client.get("/api/source-health")
assert response.status_code == 200
data = response.json()
directory = next((x for x in data if x["source"] == "directory"), None)
assert directory["status"] == "healthy"
assert directory["last_fetch_time"] == "2024-01-01T00:00:00Z"
def test_rules_crud(client):
rule = {
"name": "After-hours admin",
"enabled": True,
"severity": "high",
"conditions": [{"field": "operation", "op": "eq", "value": "Add user"}],
"message": "Admin action outside business hours",
}
res = client.post("/api/rules", json=rule)
assert res.status_code == 200
created = res.json()
assert created["name"] == "After-hours admin"
res2 = client.get("/api/rules")
assert res2.status_code == 200
assert len(res2.json()) == 1
updated = {**rule, "name": "After-hours admin updated"}
res3 = client.put(f"/api/rules/{created['id']}", json=updated)
assert res3.status_code == 200
assert res3.json()["name"] == "After-hours admin updated"
res4 = client.delete(f"/api/rules/{created['id']}")
assert res4.status_code == 200
res5 = client.get("/api/rules")
assert res5.status_code == 200
assert len(res5.json()) == 0