All checks were successful
CI / lint-and-test (push) Successful in 21s
- Replace broad service-level hiding with fine-grained operation-level gating
- PRIVACY_SENSITIVE_OPERATIONS config: hide specific operations across ALL services
- PRIVACY_SERVICES still works for broad service-level blocking (optional)
- Users without PRIVACY_SERVICE_ROLES:
* Don't see sensitive operations in /api/filter-options
* Can't query sensitive operations via /api/events or /api/ask
* Get 403 on /api/events/{id}/explain for sensitive events
- Exchange/Teams services remain visible; only privacy ops are hidden
- Update .env.example with new operation-level config docs
581 lines
19 KiB
Python
581 lines
19 KiB
Python
from datetime import UTC, datetime
|
|
|
|
|
|
def test_config_features(client):
|
|
response = client.get("/api/config/features")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "ai_features_enabled" in data
|
|
assert isinstance(data["ai_features_enabled"], bool)
|
|
|
|
|
|
def test_ask_disabled_when_ai_features_off():
|
|
import subprocess
|
|
import sys
|
|
|
|
code = """
|
|
import sys
|
|
sys.path.insert(0, '.')
|
|
import os
|
|
os.environ['AI_FEATURES_ENABLED'] = 'false'
|
|
|
|
# Re-import config with the env override
|
|
import importlib
|
|
import config
|
|
importlib.reload(config)
|
|
|
|
# Now import main; it will pick up the new AI_FEATURES_ENABLED
|
|
import main
|
|
ask_paths = [r.path for r in main.app.routes if hasattr(r, 'path') and 'ask' in r.path]
|
|
print('ASK_PATHS:', ask_paths)
|
|
assert len(ask_paths) == 0, f"Expected no ask routes, found: {ask_paths}"
|
|
print('OK')
|
|
"""
|
|
result = subprocess.run([sys.executable, "-c", code], capture_output=True, text=True, cwd=".")
|
|
assert result.returncode == 0, f"Subprocess failed: {result.stdout}\n{result.stderr}"
|
|
assert "OK" in result.stdout
|
|
|
|
|
|
def test_mcp_sse_mount_exists():
|
|
from main import app
|
|
|
|
mcp_mounts = [r for r in app.routes if getattr(r, "path", "") == "/mcp"]
|
|
assert len(mcp_mounts) == 1, "MCP mount not found in app routes"
|
|
|
|
|
|
def test_mcp_messages_no_session(client):
|
|
response = client.post("/mcp/messages/")
|
|
# MCP transport returns 400 when session_id is missing, 404 when session not found
|
|
assert response.status_code in (400, 404)
|
|
|
|
|
|
def test_mcp_sse_auth_required_when_enabled(client, monkeypatch):
|
|
monkeypatch.setattr("routes.mcp.AUTH_ENABLED", True)
|
|
response = client.get("/mcp/sse")
|
|
assert response.status_code == 401
|
|
|
|
|
|
def test_explain_event_not_found(client):
|
|
response = client.post("/api/events/nonexistent/explain")
|
|
assert response.status_code == 404
|
|
|
|
|
|
def test_explain_event_no_llm_key(client, mock_events_collection, monkeypatch):
|
|
monkeypatch.setattr("routes.ask.LLM_API_KEY", "")
|
|
mock_events_collection.insert_one(
|
|
{
|
|
"id": "evt-explain",
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"service": "Directory",
|
|
"operation": "Add user",
|
|
"result": "success",
|
|
"actor_display": "Alice",
|
|
"raw_text": "",
|
|
}
|
|
)
|
|
response = client.post("/api/events/evt-explain/explain")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "explanation" in data
|
|
assert data["llm_used"] is False
|
|
assert "LLM_API_KEY" in (data.get("llm_error") or "")
|
|
|
|
|
|
def test_explain_event_with_llm_mock(client, mock_events_collection, monkeypatch):
|
|
monkeypatch.setattr("routes.ask.LLM_API_KEY", "test-key")
|
|
|
|
async def fake_explain(event, related):
|
|
return "This is a test explanation."
|
|
|
|
monkeypatch.setattr("routes.ask._explain_event", fake_explain)
|
|
|
|
mock_events_collection.insert_one(
|
|
{
|
|
"id": "evt-explain2",
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"service": "Directory",
|
|
"operation": "Add user",
|
|
"result": "success",
|
|
"actor_display": "Alice",
|
|
"raw_text": "",
|
|
}
|
|
)
|
|
response = client.post("/api/events/evt-explain2/explain")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["explanation"] == "This is a test explanation."
|
|
assert data["llm_used"] is True
|
|
|
|
|
|
def test_saved_searches_crud(client, monkeypatch):
|
|
monkeypatch.setattr("auth.AUTH_ENABLED", False)
|
|
|
|
# Create
|
|
response = client.post(
|
|
"/api/saved-searches", json={"name": "Test search", "filters": {"actor": "alice", "result": "success"}}
|
|
)
|
|
assert response.status_code == 200
|
|
created = response.json()
|
|
assert created["name"] == "Test search"
|
|
assert created["filters"]["actor"] == "alice"
|
|
search_id = created["id"]
|
|
|
|
# List
|
|
response2 = client.get("/api/saved-searches")
|
|
assert response2.status_code == 200
|
|
items = response2.json()
|
|
assert len(items) == 1
|
|
assert items[0]["name"] == "Test search"
|
|
|
|
# Delete
|
|
response3 = client.delete(f"/api/saved-searches/{search_id}")
|
|
assert response3.status_code == 200
|
|
|
|
# List empty
|
|
response4 = client.get("/api/saved-searches")
|
|
assert response4.status_code == 200
|
|
assert len(response4.json()) == 0
|
|
|
|
|
|
def test_saved_searches_delete_not_found(client, monkeypatch):
|
|
monkeypatch.setattr("auth.AUTH_ENABLED", False)
|
|
response = client.delete("/api/saved-searches/nonexistent")
|
|
assert response.status_code == 404
|
|
|
|
|
|
def test_saved_searches_create_validation(client, monkeypatch):
|
|
monkeypatch.setattr("auth.AUTH_ENABLED", False)
|
|
response = client.post("/api/saved-searches", json={"name": " ", "filters": {}})
|
|
assert response.status_code == 400
|
|
|
|
|
|
def test_privacy_filtering_events_by_operation(client, mock_events_collection, monkeypatch):
|
|
monkeypatch.setattr("config.PRIVACY_SENSITIVE_OPERATIONS", {"MailItemsAccessed", "Send"})
|
|
monkeypatch.setattr("routes.events.PRIVACY_SENSITIVE_OPERATIONS", {"MailItemsAccessed", "Send"})
|
|
monkeypatch.setattr("auth.PRIVACY_SERVICE_ROLES", {"SecurityAdmin"})
|
|
monkeypatch.setattr("auth.user_can_access_privacy_services", lambda claims: False)
|
|
monkeypatch.setattr("routes.events.user_can_access_privacy_services", lambda claims: False)
|
|
|
|
mock_events_collection.insert_one(
|
|
{
|
|
"id": "evt-safe",
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"service": "Exchange",
|
|
"operation": "Add-MailboxPermission",
|
|
"result": "success",
|
|
"actor_display": "Alice",
|
|
"raw_text": "",
|
|
}
|
|
)
|
|
mock_events_collection.insert_one(
|
|
{
|
|
"id": "evt-priv",
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"service": "Exchange",
|
|
"operation": "Send",
|
|
"result": "success",
|
|
"actor_display": "Bob",
|
|
"raw_text": "",
|
|
}
|
|
)
|
|
|
|
response = client.get("/api/events")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
ids = [e["id"] for e in data["items"]]
|
|
assert "evt-safe" in ids
|
|
assert "evt-priv" not in ids
|
|
|
|
|
|
def test_privacy_filter_options_shows_service_hides_ops(client, mock_events_collection, monkeypatch):
|
|
monkeypatch.setattr("config.PRIVACY_SENSITIVE_OPERATIONS", {"MailItemsAccessed"})
|
|
monkeypatch.setattr("routes.events.PRIVACY_SENSITIVE_OPERATIONS", {"MailItemsAccessed"})
|
|
monkeypatch.setattr("auth.PRIVACY_SERVICE_ROLES", {"SecurityAdmin"})
|
|
monkeypatch.setattr("auth.user_can_access_privacy_services", lambda claims: False)
|
|
monkeypatch.setattr("routes.events.user_can_access_privacy_services", lambda claims: False)
|
|
|
|
mock_events_collection.insert_one(
|
|
{
|
|
"id": "evt-1",
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"service": "Exchange",
|
|
"operation": "MailItemsAccessed",
|
|
"result": "success",
|
|
"actor_display": "Alice",
|
|
"raw_text": "",
|
|
}
|
|
)
|
|
mock_events_collection.insert_one(
|
|
{
|
|
"id": "evt-2",
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"service": "Exchange",
|
|
"operation": "Add-MailboxPermission",
|
|
"result": "success",
|
|
"actor_display": "Bob",
|
|
"raw_text": "",
|
|
}
|
|
)
|
|
|
|
response = client.get("/api/filter-options")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "Exchange" in data["services"]
|
|
assert "MailItemsAccessed" not in data["operations"]
|
|
assert "Add-MailboxPermission" in data["operations"]
|
|
|
|
|
|
def test_privacy_explain_forbidden_by_operation(client, mock_events_collection, monkeypatch):
|
|
monkeypatch.setattr("config.PRIVACY_SENSITIVE_OPERATIONS", {"Send"})
|
|
monkeypatch.setattr("routes.ask.PRIVACY_SENSITIVE_OPERATIONS", {"Send"})
|
|
monkeypatch.setattr("auth.PRIVACY_SERVICE_ROLES", {"SecurityAdmin"})
|
|
monkeypatch.setattr("auth.user_can_access_privacy_services", lambda claims: False)
|
|
monkeypatch.setattr("routes.ask.user_can_access_privacy_services", lambda claims: False)
|
|
|
|
mock_events_collection.insert_one(
|
|
{
|
|
"id": "evt-send",
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"service": "Exchange",
|
|
"operation": "Send",
|
|
"result": "success",
|
|
"actor_display": "Bob",
|
|
"raw_text": "",
|
|
}
|
|
)
|
|
response = client.post("/api/events/evt-send/explain")
|
|
assert response.status_code == 403
|
|
|
|
|
|
def test_health(client):
|
|
response = client.get("/health")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["status"] == "ok"
|
|
assert data["database"] == "connected"
|
|
|
|
|
|
def test_metrics(client):
|
|
response = client.get("/metrics")
|
|
assert response.status_code == 200
|
|
assert "aoc_request_duration_seconds" in response.text
|
|
|
|
|
|
def test_list_events_empty(client):
|
|
response = client.get("/api/events")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["items"] == []
|
|
assert data["next_cursor"] is None
|
|
|
|
|
|
def test_list_events_cursor_pagination(client, mock_events_collection):
|
|
for i in range(5):
|
|
mock_events_collection.insert_one(
|
|
{
|
|
"id": f"evt-{i}",
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"service": "Directory",
|
|
"operation": "Add user",
|
|
"result": "success",
|
|
"actor_display": f"Actor {i}",
|
|
"raw_text": "",
|
|
}
|
|
)
|
|
response = client.get("/api/events?page_size=2")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert len(data["items"]) == 2
|
|
assert data["next_cursor"] is not None
|
|
|
|
response2 = client.get(f"/api/events?page_size=2&cursor={data['next_cursor']}")
|
|
assert response2.status_code == 200
|
|
data2 = response2.json()
|
|
assert len(data2["items"]) == 2
|
|
assert data2["next_cursor"] is not None
|
|
|
|
|
|
def test_list_events_filter_by_service(client, mock_events_collection):
|
|
mock_events_collection.insert_one(
|
|
{
|
|
"id": "evt-1",
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"service": "Exchange",
|
|
"operation": "Update",
|
|
"result": "success",
|
|
"actor_display": "Alice",
|
|
"raw_text": "",
|
|
}
|
|
)
|
|
mock_events_collection.insert_one(
|
|
{
|
|
"id": "evt-2",
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"service": "Directory",
|
|
"operation": "Add",
|
|
"result": "success",
|
|
"actor_display": "Bob",
|
|
"raw_text": "",
|
|
}
|
|
)
|
|
response = client.get("/api/events?service=Exchange")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert len(data["items"]) == 1
|
|
assert data["items"][0]["service"] == "Exchange"
|
|
|
|
|
|
def test_list_events_filter_by_services(client, mock_events_collection):
|
|
mock_events_collection.insert_one(
|
|
{
|
|
"id": "evt-1",
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"service": "Exchange",
|
|
"operation": "Update",
|
|
"result": "success",
|
|
"actor_display": "Alice",
|
|
"raw_text": "",
|
|
}
|
|
)
|
|
mock_events_collection.insert_one(
|
|
{
|
|
"id": "evt-2",
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"service": "Directory",
|
|
"operation": "Add",
|
|
"result": "success",
|
|
"actor_display": "Bob",
|
|
"raw_text": "",
|
|
}
|
|
)
|
|
mock_events_collection.insert_one(
|
|
{
|
|
"id": "evt-3",
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"service": "Teams",
|
|
"operation": "Delete",
|
|
"result": "success",
|
|
"actor_display": "Charlie",
|
|
"raw_text": "",
|
|
}
|
|
)
|
|
response = client.get("/api/events?services=Exchange&services=Directory")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert len(data["items"]) == 2
|
|
returned_services = {item["service"] for item in data["items"]}
|
|
assert returned_services == {"Exchange", "Directory"}
|
|
|
|
|
|
def test_list_events_page_size_validation(client):
|
|
response = client.get("/api/events?page_size=0")
|
|
assert response.status_code == 422
|
|
response = client.get("/api/events?page_size=501")
|
|
assert response.status_code == 422
|
|
|
|
|
|
def test_filter_options(client, mock_events_collection):
|
|
mock_events_collection.insert_one(
|
|
{
|
|
"id": "evt-1",
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"service": "Intune",
|
|
"operation": "Assign",
|
|
"result": "failure",
|
|
"actor_display": "Charlie",
|
|
"actor_upn": "charlie@example.com",
|
|
"raw_text": "",
|
|
}
|
|
)
|
|
response = client.get("/api/filter-options")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "Intune" in data["services"]
|
|
assert "Assign" in data["operations"]
|
|
assert "failure" in data["results"]
|
|
assert "Charlie" in data["actors"]
|
|
assert "charlie@example.com" in data["actor_upns"]
|
|
|
|
|
|
def test_fetch_audit_logs_validation(client):
|
|
response = client.get("/api/fetch-audit-logs?hours=0")
|
|
assert response.status_code == 422
|
|
response = client.get("/api/fetch-audit-logs?hours=721")
|
|
assert response.status_code == 422
|
|
|
|
|
|
def test_graph_webhook_validation(client):
|
|
token = "test-validation-token-123"
|
|
response = client.post("/api/webhooks/graph?validationToken=" + token)
|
|
assert response.status_code == 200
|
|
assert response.text == token
|
|
assert response.headers["content-type"] == "text/plain; charset=utf-8"
|
|
|
|
|
|
def test_graph_webhook_notification(client):
|
|
payload = {
|
|
"value": [
|
|
{
|
|
"changeType": "updated",
|
|
"resource": "auditLogs/directoryAudits",
|
|
"clientState": "secret",
|
|
}
|
|
]
|
|
}
|
|
response = client.post("/api/webhooks/graph", json=payload)
|
|
assert response.status_code == 200
|
|
assert response.json()["status"] == "accepted"
|
|
|
|
|
|
def test_update_tags(client, mock_events_collection):
|
|
mock_events_collection.insert_one(
|
|
{
|
|
"id": "evt-tags",
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"service": "Directory",
|
|
"operation": "Add user",
|
|
"result": "success",
|
|
"actor_display": "Alice",
|
|
"raw_text": "",
|
|
}
|
|
)
|
|
response = client.patch("/api/events/evt-tags/tags", json={"tags": ["investigating", "urgent"]})
|
|
assert response.status_code == 200
|
|
assert response.json()["tags"] == ["investigating", "urgent"]
|
|
doc = mock_events_collection.find_one({"id": "evt-tags"})
|
|
assert doc["tags"] == ["investigating", "urgent"]
|
|
|
|
|
|
def test_add_comment(client, mock_events_collection):
|
|
mock_events_collection.insert_one(
|
|
{
|
|
"id": "evt-comment",
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"service": "Directory",
|
|
"operation": "Add user",
|
|
"result": "success",
|
|
"actor_display": "Alice",
|
|
"raw_text": "",
|
|
}
|
|
)
|
|
response = client.post("/api/events/evt-comment/comments", json={"text": "Looks suspicious"})
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["text"] == "Looks suspicious"
|
|
doc = mock_events_collection.find_one({"id": "evt-comment"})
|
|
assert len(doc["comments"]) == 1
|
|
assert doc["comments"][0]["text"] == "Looks suspicious"
|
|
|
|
|
|
def test_source_health(client, mock_watermarks_collection):
|
|
mock_watermarks_collection.insert_one({"source": "directory", "last_fetch_time": "2024-01-01T00:00:00Z"})
|
|
response = client.get("/api/source-health")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
directory = next((x for x in data if x["source"] == "directory"), None)
|
|
assert directory["status"] == "healthy"
|
|
assert directory["last_fetch_time"] == "2024-01-01T00:00:00Z"
|
|
|
|
|
|
def test_rules_crud(client):
|
|
rule = {
|
|
"name": "After-hours admin",
|
|
"enabled": True,
|
|
"severity": "high",
|
|
"conditions": [{"field": "operation", "op": "eq", "value": "Add user"}],
|
|
"message": "Admin action outside business hours",
|
|
}
|
|
res = client.post("/api/rules", json=rule)
|
|
assert res.status_code == 200
|
|
created = res.json()
|
|
assert created["name"] == "After-hours admin"
|
|
|
|
res2 = client.get("/api/rules")
|
|
assert res2.status_code == 200
|
|
assert len(res2.json()) == 1
|
|
|
|
updated = {**rule, "name": "After-hours admin updated"}
|
|
res3 = client.put(f"/api/rules/{created['id']}", json=updated)
|
|
assert res3.status_code == 200
|
|
assert res3.json()["name"] == "After-hours admin updated"
|
|
|
|
res4 = client.delete(f"/api/rules/{created['id']}")
|
|
assert res4.status_code == 200
|
|
|
|
res5 = client.get("/api/rules")
|
|
assert res5.status_code == 200
|
|
assert len(res5.json()) == 0
|
|
|
|
|
|
def test_list_events_filter_by_include_tags(client, mock_events_collection):
|
|
mock_events_collection.insert_one(
|
|
{
|
|
"id": "evt-tagged",
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"service": "Directory",
|
|
"operation": "Add user",
|
|
"result": "success",
|
|
"actor_display": "Alice",
|
|
"raw_text": "",
|
|
"tags": ["backup", "auto"],
|
|
}
|
|
)
|
|
mock_events_collection.insert_one(
|
|
{
|
|
"id": "evt-untagged",
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"service": "Directory",
|
|
"operation": "Remove user",
|
|
"result": "success",
|
|
"actor_display": "Bob",
|
|
"raw_text": "",
|
|
"tags": [],
|
|
}
|
|
)
|
|
response = client.get("/api/events?include_tags=backup")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert len(data["items"]) == 1
|
|
assert data["items"][0]["id"] == "evt-tagged"
|
|
|
|
|
|
def test_bulk_tags_append(client, mock_events_collection):
|
|
mock_events_collection.insert_one(
|
|
{
|
|
"id": "evt-bulk",
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"service": "Exchange",
|
|
"operation": "Update",
|
|
"result": "success",
|
|
"actor_display": "Alice",
|
|
"raw_text": "",
|
|
"tags": ["existing"],
|
|
}
|
|
)
|
|
response = client.post("/api/events/bulk-tags?service=Exchange", json={"tags": ["backup"], "mode": "append"})
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["matched"] == 1
|
|
doc = mock_events_collection.find_one({"id": "evt-bulk"})
|
|
assert "backup" in doc["tags"]
|
|
assert "existing" in doc["tags"]
|
|
|
|
|
|
def test_bulk_tags_replace(client, mock_events_collection):
|
|
mock_events_collection.insert_one(
|
|
{
|
|
"id": "evt-bulk2",
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"service": "Exchange",
|
|
"operation": "Update",
|
|
"result": "success",
|
|
"actor_display": "Alice",
|
|
"raw_text": "",
|
|
"tags": ["old"],
|
|
}
|
|
)
|
|
response = client.post("/api/events/bulk-tags?service=Exchange", json={"tags": ["backup"], "mode": "replace"})
|
|
assert response.status_code == 200
|
|
doc = mock_events_collection.find_one({"id": "evt-bulk2"})
|
|
assert doc["tags"] == ["backup"]
|