from datetime import UTC, datetime def test_config_features(client): response = client.get("/api/config/features") assert response.status_code == 200 data = response.json() assert "ai_features_enabled" in data assert isinstance(data["ai_features_enabled"], bool) def test_ask_disabled_when_ai_features_off(): import subprocess import sys code = """ import sys sys.path.insert(0, '.') import os os.environ['AI_FEATURES_ENABLED'] = 'false' # Re-import config with the env override import importlib import config importlib.reload(config) # Now import main; it will pick up the new AI_FEATURES_ENABLED import main ask_paths = [r.path for r in main.app.routes if hasattr(r, 'path') and 'ask' in r.path] print('ASK_PATHS:', ask_paths) assert len(ask_paths) == 0, f"Expected no ask routes, found: {ask_paths}" print('OK') """ result = subprocess.run([sys.executable, "-c", code], capture_output=True, text=True, cwd=".") assert result.returncode == 0, f"Subprocess failed: {result.stdout}\n{result.stderr}" assert "OK" in result.stdout def test_mcp_sse_mount_exists(): from main import app mcp_mounts = [r for r in app.routes if getattr(r, "path", "") == "/mcp"] assert len(mcp_mounts) == 1, "MCP mount not found in app routes" def test_mcp_messages_no_session(client): response = client.post("/mcp/messages/") # MCP transport returns 400 when session_id is missing, 404 when session not found assert response.status_code in (400, 404) def test_mcp_sse_auth_required_when_enabled(client, monkeypatch): monkeypatch.setattr("routes.mcp.AUTH_ENABLED", True) response = client.get("/mcp/sse") assert response.status_code == 401 def test_explain_event_not_found(client): response = client.post("/api/events/nonexistent/explain") assert response.status_code == 404 def test_explain_event_no_llm_key(client, mock_events_collection, monkeypatch): monkeypatch.setattr("routes.ask.LLM_API_KEY", "") mock_events_collection.insert_one( { "id": "evt-explain", "timestamp": datetime.now(UTC).isoformat(), "service": "Directory", "operation": "Add user", "result": "success", "actor_display": "Alice", "raw_text": "", } ) response = client.post("/api/events/evt-explain/explain") assert response.status_code == 200 data = response.json() assert "explanation" in data assert data["llm_used"] is False assert "LLM_API_KEY" in (data.get("llm_error") or "") def test_explain_event_with_llm_mock(client, mock_events_collection, monkeypatch): monkeypatch.setattr("routes.ask.LLM_API_KEY", "test-key") async def fake_explain(event, related): return "This is a test explanation." monkeypatch.setattr("routes.ask._explain_event", fake_explain) class FakeRedis: async def get(self, key): return None async def setex(self, key, ttl, value): pass async def fake_get_arq_pool(): return FakeRedis() monkeypatch.setattr("routes.ask.get_arq_pool", fake_get_arq_pool) mock_events_collection.insert_one( { "id": "evt-explain2", "timestamp": datetime.now(UTC).isoformat(), "service": "Directory", "operation": "Add user", "result": "success", "actor_display": "Alice", "raw_text": "", } ) response = client.post("/api/events/evt-explain2/explain") assert response.status_code == 200 data = response.json() assert data["explanation"] == "This is a test explanation." assert data["llm_used"] is True def test_saved_searches_crud(client, monkeypatch): monkeypatch.setattr("auth.AUTH_ENABLED", False) # Create response = client.post( "/api/saved-searches", json={"name": "Test search", "filters": {"actor": "alice", "result": "success"}} ) assert response.status_code == 200 created = response.json() assert created["name"] == "Test search" assert created["filters"]["actor"] == "alice" search_id = created["id"] # List response2 = client.get("/api/saved-searches") assert response2.status_code == 200 items = response2.json() assert len(items) == 1 assert items[0]["name"] == "Test search" # Delete response3 = client.delete(f"/api/saved-searches/{search_id}") assert response3.status_code == 200 # List empty response4 = client.get("/api/saved-searches") assert response4.status_code == 200 assert len(response4.json()) == 0 def test_saved_searches_delete_not_found(client, monkeypatch): monkeypatch.setattr("auth.AUTH_ENABLED", False) response = client.delete("/api/saved-searches/nonexistent") assert response.status_code == 404 def test_saved_searches_create_validation(client, monkeypatch): monkeypatch.setattr("auth.AUTH_ENABLED", False) response = client.post("/api/saved-searches", json={"name": " ", "filters": {}}) assert response.status_code == 400 def test_privacy_filtering_events_by_operation(client, mock_events_collection, monkeypatch): monkeypatch.setattr("config.PRIVACY_SENSITIVE_OPERATIONS", {"MailItemsAccessed", "Send"}) monkeypatch.setattr("routes.events.PRIVACY_SENSITIVE_OPERATIONS", {"MailItemsAccessed", "Send"}) monkeypatch.setattr("auth.PRIVACY_SERVICE_ROLES", {"SecurityAdmin"}) monkeypatch.setattr("auth.user_can_access_privacy_services", lambda claims: False) monkeypatch.setattr("routes.events.user_can_access_privacy_services", lambda claims: False) mock_events_collection.insert_one( { "id": "evt-safe", "timestamp": datetime.now(UTC).isoformat(), "service": "Exchange", "operation": "Add-MailboxPermission", "result": "success", "actor_display": "Alice", "raw_text": "", } ) mock_events_collection.insert_one( { "id": "evt-priv", "timestamp": datetime.now(UTC).isoformat(), "service": "Exchange", "operation": "Send", "result": "success", "actor_display": "Bob", "raw_text": "", } ) response = client.get("/api/events") assert response.status_code == 200 data = response.json() ids = [e["id"] for e in data["items"]] assert "evt-safe" in ids assert "evt-priv" not in ids def test_privacy_filter_options_shows_service_hides_ops(client, mock_events_collection, monkeypatch): monkeypatch.setattr("config.PRIVACY_SENSITIVE_OPERATIONS", {"MailItemsAccessed"}) monkeypatch.setattr("routes.events.PRIVACY_SENSITIVE_OPERATIONS", {"MailItemsAccessed"}) monkeypatch.setattr("auth.PRIVACY_SERVICE_ROLES", {"SecurityAdmin"}) monkeypatch.setattr("auth.user_can_access_privacy_services", lambda claims: False) monkeypatch.setattr("routes.events.user_can_access_privacy_services", lambda claims: False) mock_events_collection.insert_one( { "id": "evt-1", "timestamp": datetime.now(UTC).isoformat(), "service": "Exchange", "operation": "MailItemsAccessed", "result": "success", "actor_display": "Alice", "raw_text": "", } ) mock_events_collection.insert_one( { "id": "evt-2", "timestamp": datetime.now(UTC).isoformat(), "service": "Exchange", "operation": "Add-MailboxPermission", "result": "success", "actor_display": "Bob", "raw_text": "", } ) response = client.get("/api/filter-options") assert response.status_code == 200 data = response.json() assert "Exchange" in data["services"] assert "MailItemsAccessed" not in data["operations"] assert "Add-MailboxPermission" in data["operations"] def test_privacy_explain_forbidden_by_operation(client, mock_events_collection, monkeypatch): monkeypatch.setattr("config.PRIVACY_SENSITIVE_OPERATIONS", {"Send"}) monkeypatch.setattr("routes.ask.PRIVACY_SENSITIVE_OPERATIONS", {"Send"}) monkeypatch.setattr("auth.PRIVACY_SERVICE_ROLES", {"SecurityAdmin"}) monkeypatch.setattr("auth.user_can_access_privacy_services", lambda claims: False) monkeypatch.setattr("routes.ask.user_can_access_privacy_services", lambda claims: False) mock_events_collection.insert_one( { "id": "evt-send", "timestamp": datetime.now(UTC).isoformat(), "service": "Exchange", "operation": "Send", "result": "success", "actor_display": "Bob", "raw_text": "", } ) response = client.post("/api/events/evt-send/explain") assert response.status_code == 403 def test_health(client): response = client.get("/health") assert response.status_code == 200 data = response.json() assert data["status"] == "ok" assert data["database"] == "connected" def test_metrics(client): response = client.get("/metrics", headers={"X-Forwarded-For": "127.0.0.1"}) assert response.status_code == 200 assert "aoc_request_duration_seconds" in response.text def test_list_events_empty(client): response = client.get("/api/events") assert response.status_code == 200 data = response.json() assert data["items"] == [] assert data["next_cursor"] is None def test_list_events_cursor_pagination(client, mock_events_collection): for i in range(5): mock_events_collection.insert_one( { "id": f"evt-{i}", "timestamp": datetime.now(UTC).isoformat(), "service": "Directory", "operation": "Add user", "result": "success", "actor_display": f"Actor {i}", "raw_text": "", } ) response = client.get("/api/events?page_size=2") assert response.status_code == 200 data = response.json() assert len(data["items"]) == 2 assert data["next_cursor"] is not None response2 = client.get(f"/api/events?page_size=2&cursor={data['next_cursor']}") assert response2.status_code == 200 data2 = response2.json() assert len(data2["items"]) == 2 assert data2["next_cursor"] is not None def test_list_events_filter_by_service(client, mock_events_collection): mock_events_collection.insert_one( { "id": "evt-1", "timestamp": datetime.now(UTC).isoformat(), "service": "Exchange", "operation": "Update", "result": "success", "actor_display": "Alice", "raw_text": "", } ) mock_events_collection.insert_one( { "id": "evt-2", "timestamp": datetime.now(UTC).isoformat(), "service": "Directory", "operation": "Add", "result": "success", "actor_display": "Bob", "raw_text": "", } ) response = client.get("/api/events?service=Exchange") assert response.status_code == 200 data = response.json() assert len(data["items"]) == 1 assert data["items"][0]["service"] == "Exchange" def test_list_events_filter_by_services(client, mock_events_collection): mock_events_collection.insert_one( { "id": "evt-1", "timestamp": datetime.now(UTC).isoformat(), "service": "Exchange", "operation": "Update", "result": "success", "actor_display": "Alice", "raw_text": "", } ) mock_events_collection.insert_one( { "id": "evt-2", "timestamp": datetime.now(UTC).isoformat(), "service": "Directory", "operation": "Add", "result": "success", "actor_display": "Bob", "raw_text": "", } ) mock_events_collection.insert_one( { "id": "evt-3", "timestamp": datetime.now(UTC).isoformat(), "service": "Teams", "operation": "Delete", "result": "success", "actor_display": "Charlie", "raw_text": "", } ) response = client.get("/api/events?services=Exchange&services=Directory") assert response.status_code == 200 data = response.json() assert len(data["items"]) == 2 returned_services = {item["service"] for item in data["items"]} assert returned_services == {"Exchange", "Directory"} def test_list_events_page_size_validation(client): response = client.get("/api/events?page_size=0") assert response.status_code == 422 response = client.get("/api/events?page_size=501") assert response.status_code == 422 def test_filter_options(client, mock_events_collection): mock_events_collection.insert_one( { "id": "evt-1", "timestamp": datetime.now(UTC).isoformat(), "service": "Intune", "operation": "Assign", "result": "failure", "actor_display": "Charlie", "actor_upn": "charlie@example.com", "raw_text": "", } ) response = client.get("/api/filter-options") assert response.status_code == 200 data = response.json() assert "Intune" in data["services"] assert "Assign" in data["operations"] assert "failure" in data["results"] assert "Charlie" in data["actors"] assert "charlie@example.com" in data["actor_upns"] def test_fetch_audit_logs_validation(client): response = client.get("/api/fetch-audit-logs?hours=0") assert response.status_code == 422 response = client.get("/api/fetch-audit-logs?hours=721") assert response.status_code == 422 def test_graph_webhook_validation(client): token = "test-validation-token-123" response = client.post("/api/webhooks/graph?validationToken=" + token) assert response.status_code == 200 assert response.text == token assert response.headers["content-type"] == "text/plain; charset=utf-8" def test_graph_webhook_notification(client): payload = { "value": [ { "changeType": "updated", "resource": "auditLogs/directoryAudits", "clientState": "secret", } ] } response = client.post("/api/webhooks/graph", json=payload) assert response.status_code == 200 assert response.json()["status"] == "accepted" def test_update_tags(client, mock_events_collection): mock_events_collection.insert_one( { "id": "evt-tags", "timestamp": datetime.now(UTC).isoformat(), "service": "Directory", "operation": "Add user", "result": "success", "actor_display": "Alice", "raw_text": "", } ) response = client.patch("/api/events/evt-tags/tags", json={"tags": ["investigating", "urgent"]}) assert response.status_code == 200 assert response.json()["tags"] == ["investigating", "urgent"] doc = mock_events_collection.find_one({"id": "evt-tags"}) assert doc["tags"] == ["investigating", "urgent"] def test_add_comment(client, mock_events_collection): mock_events_collection.insert_one( { "id": "evt-comment", "timestamp": datetime.now(UTC).isoformat(), "service": "Directory", "operation": "Add user", "result": "success", "actor_display": "Alice", "raw_text": "", } ) response = client.post("/api/events/evt-comment/comments", json={"text": "Looks suspicious"}) assert response.status_code == 200 data = response.json() assert data["text"] == "Looks suspicious" doc = mock_events_collection.find_one({"id": "evt-comment"}) assert len(doc["comments"]) == 1 assert doc["comments"][0]["text"] == "Looks suspicious" def test_source_health(client, mock_watermarks_collection): mock_watermarks_collection.insert_one({"source": "directory", "last_fetch_time": "2024-01-01T00:00:00Z"}) response = client.get("/api/source-health") assert response.status_code == 200 data = response.json() directory = next((x for x in data if x["source"] == "directory"), None) assert directory["status"] == "healthy" assert directory["last_fetch_time"] == "2024-01-01T00:00:00Z" def test_rules_crud(client): rule = { "name": "After-hours admin", "enabled": True, "severity": "high", "conditions": [{"field": "operation", "op": "eq", "value": "Add user"}], "message": "Admin action outside business hours", } res = client.post("/api/rules", json=rule) assert res.status_code == 200 created = res.json() assert created["name"] == "After-hours admin" res2 = client.get("/api/rules") assert res2.status_code == 200 assert len(res2.json()) == 1 updated = {**rule, "name": "After-hours admin updated"} res3 = client.put(f"/api/rules/{created['id']}", json=updated) assert res3.status_code == 200 assert res3.json()["name"] == "After-hours admin updated" res4 = client.delete(f"/api/rules/{created['id']}") assert res4.status_code == 200 res5 = client.get("/api/rules") assert res5.status_code == 200 assert len(res5.json()) == 0 def test_list_events_filter_by_include_tags(client, mock_events_collection): mock_events_collection.insert_one( { "id": "evt-tagged", "timestamp": datetime.now(UTC).isoformat(), "service": "Directory", "operation": "Add user", "result": "success", "actor_display": "Alice", "raw_text": "", "tags": ["backup", "auto"], } ) mock_events_collection.insert_one( { "id": "evt-untagged", "timestamp": datetime.now(UTC).isoformat(), "service": "Directory", "operation": "Remove user", "result": "success", "actor_display": "Bob", "raw_text": "", "tags": [], } ) response = client.get("/api/events?include_tags=backup") assert response.status_code == 200 data = response.json() assert len(data["items"]) == 1 assert data["items"][0]["id"] == "evt-tagged" def test_bulk_tags_append(client, mock_events_collection): mock_events_collection.insert_one( { "id": "evt-bulk", "timestamp": datetime.now(UTC).isoformat(), "service": "Exchange", "operation": "Update", "result": "success", "actor_display": "Alice", "raw_text": "", "tags": ["existing"], } ) response = client.post("/api/events/bulk-tags?service=Exchange", json={"tags": ["backup"], "mode": "append"}) assert response.status_code == 200 data = response.json() assert data["matched"] == 1 doc = mock_events_collection.find_one({"id": "evt-bulk"}) assert "backup" in doc["tags"] assert "existing" in doc["tags"] def test_bulk_tags_replace(client, mock_events_collection): mock_events_collection.insert_one( { "id": "evt-bulk2", "timestamp": datetime.now(UTC).isoformat(), "service": "Exchange", "operation": "Update", "result": "success", "actor_display": "Alice", "raw_text": "", "tags": ["old"], } ) response = client.post("/api/events/bulk-tags?service=Exchange", json={"tags": ["backup"], "mode": "replace"}) assert response.status_code == 200 doc = mock_events_collection.find_one({"id": "evt-bulk2"}) assert doc["tags"] == ["backup"]