import asyncio from datetime import UTC, datetime, timedelta from jobs import set_cached_ask from routes.ask import _build_event_query, _extract_entity, _extract_time_range # --------------------------------------------------------------------------- # Unit tests: time-range extraction # --------------------------------------------------------------------------- class TestExtractTimeRange: def test_last_n_days(self): start, end = _extract_time_range("What happened in the last 3 days?") assert start is not None assert end is not None # Start should be roughly 3 days before end start_dt = datetime.fromisoformat(start.replace("Z", "+00:00")) end_dt = datetime.fromisoformat(end.replace("Z", "+00:00")) delta = end_dt - start_dt assert delta.days == 3 def test_last_n_hours(self): start, end = _extract_time_range("Show me events in the last 24 hours") start_dt = datetime.fromisoformat(start.replace("Z", "+00:00")) end_dt = datetime.fromisoformat(end.replace("Z", "+00:00")) delta = end_dt - start_dt assert delta.total_seconds() == 24 * 3600 def test_last_week(self): start, end = _extract_time_range("What happened last week?") start_dt = datetime.fromisoformat(start.replace("Z", "+00:00")) end_dt = datetime.fromisoformat(end.replace("Z", "+00:00")) assert (end_dt - start_dt).days == 7 def test_yesterday(self): start, end = _extract_time_range("Show me yesterday's events") start_dt = datetime.fromisoformat(start.replace("Z", "+00:00")) end_dt = datetime.fromisoformat(end.replace("Z", "+00:00")) assert (end_dt - start_dt).days == 1 def test_today(self): start, end = _extract_time_range("What happened today?") start_dt = datetime.fromisoformat(start.replace("Z", "+00:00")) # end_dt is not needed for this assertion # Should be from midnight today to now assert start_dt.hour == 0 assert start_dt.minute == 0 assert start_dt.second == 0 def test_no_time_pattern_returns_none(self): start, end = _extract_time_range("What happened to device ABC?") assert start is None assert end is None def test_last_n_minutes(self): start, end = _extract_time_range("Show me events in the last 15 minutes") start_dt = datetime.fromisoformat(start.replace("Z", "+00:00")) end_dt = datetime.fromisoformat(end.replace("Z", "+00:00")) assert (end_dt - start_dt).total_seconds() == 15 * 60 # --------------------------------------------------------------------------- # Unit tests: entity extraction # --------------------------------------------------------------------------- class TestExtractEntity: def test_device_hint(self): assert _extract_entity("What happened to device LAPTOP-001?") == "LAPTOP-001" def test_user_hint(self): assert _extract_entity("Show me user alice@example.com") == "alice@example.com" def test_laptop_hint(self): assert _extract_entity("What did laptop HR-Desk-04 do?") == "HR-Desk-04" def test_server_hint(self): assert _extract_entity("Check server WEB-01") == "WEB-01" def test_quoted_string(self): assert _extract_entity('What happened to "Surface-Pro-7"?') == "Surface-Pro-7" def test_single_quoted_string(self): assert _extract_entity("What happened to 'VM-WEB-01' today?") == "VM-WEB-01" def test_email_address(self): assert _extract_entity("What did tomas.svensson@contoso.com do?") == "tomas.svensson@contoso.com" def test_no_entity_returns_none(self): assert _extract_entity("What happened in the last 3 days?") is None def test_vm_hint(self): assert _extract_entity("Show me vm APP-SERVER-02") == "APP-SERVER-02" def test_computer_hint(self): assert _extract_entity("What happened to computer DESK-123?") == "DESK-123" # --------------------------------------------------------------------------- # Unit tests: query builder # --------------------------------------------------------------------------- class TestBuildEventQuery: def test_entity_only(self): q = _build_event_query("ABC123", None, None) assert "$and" in q or_clause = q["$and"][0]["$or"] assert any("target_displays" in c for c in or_clause) assert any("actor_display" in c for c in or_clause) assert any("raw_text" in c for c in or_clause) def test_time_only(self): q = _build_event_query(None, "2024-01-01T00:00:00Z", "2024-01-02T00:00:00Z") assert q["$and"][0]["timestamp"]["$gte"] == "2024-01-01T00:00:00Z" assert q["$and"][0]["timestamp"]["$lte"] == "2024-01-02T00:00:00Z" def test_entity_and_time(self): q = _build_event_query("DEV-01", "2024-01-01T00:00:00Z", "2024-01-02T00:00:00Z") assert len(q["$and"]) == 2 assert "timestamp" in q["$and"][0] or "timestamp" in q["$and"][1] def test_empty_returns_empty(self): q = _build_event_query(None, None, None) assert q == {} def test_entity_is_escaped_for_regex(self): q = _build_event_query("DEV.01", None, None) # The dot should be escaped in the regex or_clause = q["$and"][0]["$or"] raw_regex = or_clause[-1]["raw_text"]["$regex"] assert raw_regex == "DEV\\.01" # --------------------------------------------------------------------------- # Integration tests: /api/ask endpoint # --------------------------------------------------------------------------- class TestAskEndpoint: def test_ask_empty_question(self, client): response = client.post("/api/ask", json={"question": ""}) assert response.status_code == 400 def test_ask_no_events(self, client): response = client.post("/api/ask", json={"question": "What happened to device NONEXISTENT in the last 3 days?"}) assert response.status_code == 200 data = response.json() assert data["answer"] != "" assert data["events"] == [] assert data["llm_used"] is False assert data["query_info"]["entity"] == "NONEXISTENT" def test_ask_with_events_fallback(self, client, mock_events_collection): now = datetime.now(UTC) mock_events_collection.insert_one( { "id": "evt-ask-1", "timestamp": now.isoformat(), "service": "Device", "operation": "Update device", "result": "success", "actor_display": "Admin Bob", "actor_upn": "bob@example.com", "target_displays": ["LAPTOP-001"], "display_summary": "Update device | device: LAPTOP-001 by Admin Bob", "raw_text": "LAPTOP-001 something", } ) response = client.post("/api/ask", json={"question": "What happened to device LAPTOP-001 in the last 3 days?"}) assert response.status_code == 200 data = response.json() assert data["llm_used"] is False assert len(data["events"]) == 1 assert data["events"][0]["id"] == "evt-ask-1" assert "LAPTOP-001" in data["answer"] assert data["query_info"]["entity"] == "LAPTOP-001" assert data["query_info"]["event_count"] == 1 def test_ask_defaults_to_7_days_when_no_time(self, client, mock_events_collection): # Insert an event from 5 days ago five_days_ago = datetime.now(UTC) - timedelta(days=5) mock_events_collection.insert_one( { "id": "evt-ask-old", "timestamp": five_days_ago.isoformat(), "service": "Directory", "operation": "Add user", "result": "success", "actor_display": "Alice", "target_displays": ["DESKTOP-999"], "display_summary": "summary", "raw_text": "raw", } ) response = client.post("/api/ask", json={"question": "What happened to DESKTOP-999?"}) assert response.status_code == 200 data = response.json() assert data["query_info"]["event_count"] == 1 assert data["events"][0]["id"] == "evt-ask-old" def test_ask_event_outside_time_window(self, client, mock_events_collection): # Event from 10 days ago — outside default 7-day window old = datetime.now(UTC) - timedelta(days=10) mock_events_collection.insert_one( { "id": "evt-too-old", "timestamp": old.isoformat(), "service": "Directory", "operation": "Add user", "result": "success", "actor_display": "Alice", "target_displays": ["OLD-DEVICE"], "display_summary": "summary", "raw_text": "raw", } ) response = client.post("/api/ask", json={"question": "What happened to OLD-DEVICE?"}) assert response.status_code == 200 data = response.json() # Default is 7 days, so 10-day-old event should not match assert data["query_info"]["event_count"] == 0 def test_ask_with_llm(self, client, mock_events_collection, monkeypatch): now = datetime.now(UTC) mock_events_collection.insert_one( { "id": "evt-llm", "timestamp": now.isoformat(), "service": "Device", "operation": "Wipe device", "result": "failure", "actor_display": "System", "target_displays": ["PHONE-001"], "display_summary": "Wipe device | device: PHONE-001 by System", "raw_text": "PHONE-001 wipe failed", } ) async def fake_llm(question, events, total=None, excluded_services=None): return "The device had a failed wipe attempt." monkeypatch.setattr("routes.ask.LLM_API_KEY", "fake-key") monkeypatch.setattr("routes.ask._call_llm", fake_llm) response = client.post("/api/ask", json={"question": "What happened to PHONE-001 in the last day?"}) assert response.status_code == 200 data = response.json() assert data["llm_used"] is True assert data["answer"] == "The device had a failed wipe attempt." assert len(data["events"]) == 1 def test_ask_falls_back_when_llm_errors(self, client, mock_events_collection, monkeypatch): now = datetime.now(UTC) mock_events_collection.insert_one( { "id": "evt-fallback", "timestamp": now.isoformat(), "service": "Directory", "operation": "Add user", "result": "success", "actor_display": "Alice", "target_displays": ["USER-001"], "display_summary": "summary", "raw_text": "raw", } ) async def failing_llm(question, events, total=None): raise RuntimeError("LLM service down") monkeypatch.setattr("routes.ask.LLM_API_KEY", "fake-key") monkeypatch.setattr("routes.ask._call_llm", failing_llm) response = client.post("/api/ask", json={"question": "What happened to USER-001?"}) assert response.status_code == 200 data = response.json() assert data["llm_used"] is False # Falls back assert len(data["events"]) == 1 assert "Found 1 event" in data["answer"] def test_ask_with_explicit_filters(self, client, mock_events_collection): now = datetime.now(UTC) mock_events_collection.insert_one( { "id": "evt-exchange", "timestamp": now.isoformat(), "service": "Exchange", "operation": "Update", "result": "failure", "actor_display": "Alice", "target_displays": ["LAPTOP-001"], "display_summary": "summary", "raw_text": "raw", } ) mock_events_collection.insert_one( { "id": "evt-directory", "timestamp": now.isoformat(), "service": "Directory", "operation": "Add user", "result": "success", "actor_display": "Alice", "target_displays": ["LAPTOP-001"], "display_summary": "summary", "raw_text": "raw", } ) response = client.post( "/api/ask", json={"question": "What happened to LAPTOP-001?", "services": ["Exchange"], "result": "failure"}, ) assert response.status_code == 200 data = response.json() assert data["query_info"]["event_count"] == 1 assert data["events"][0]["id"] == "evt-exchange" def test_ask_with_explicit_actor_filter(self, client, mock_events_collection): now = datetime.now(UTC) mock_events_collection.insert_one( { "id": "evt-bob", "timestamp": now.isoformat(), "service": "Directory", "operation": "Add user", "result": "success", "actor_display": "Bob", "actor_upn": "bob@example.com", "target_displays": ["USER-001"], "display_summary": "summary", "raw_text": "raw", } ) mock_events_collection.insert_one( { "id": "evt-alice", "timestamp": now.isoformat(), "service": "Directory", "operation": "Remove user", "result": "success", "actor_display": "Alice", "actor_upn": "alice@example.com", "target_displays": ["USER-001"], "display_summary": "summary", "raw_text": "raw", } ) response = client.post("/api/ask", json={"question": "What happened to USER-001?", "actor": "bob"}) assert response.status_code == 200 data = response.json() assert data["query_info"]["event_count"] == 1 assert data["events"][0]["id"] == "evt-bob" class TestAskCaching: def test_ask_cache_hit_returns_cached_answer(self, client, mock_events_collection, monkeypatch): """If the answer is cached, the LLM should not be called.""" now = datetime.now(UTC) mock_events_collection.insert_one( { "id": "evt-cache", "timestamp": now.isoformat(), "service": "Directory", "operation": "Add user", "result": "success", "actor_display": "Alice", "target_displays": ["USER-001"], "display_summary": "summary", "raw_text": "raw", } ) llm_called = False async def fake_llm(question, events, total=None, excluded_services=None): nonlocal llm_called llm_called = True return "This should NOT appear." monkeypatch.setattr("routes.ask.LLM_API_KEY", "fake-key") monkeypatch.setattr("routes.ask._call_llm", fake_llm) # Pre-populate cache with a specific answer class CachingFakeRedis: def __init__(self): self.store = {} async def get(self, key): return self.store.get(key) async def setex(self, key, ttl, value): self.store[key] = value redis = CachingFakeRedis() # Seed cache with the exact filters the endpoint will generate filters_snapshot = { "services": None, "actor": None, "operation": None, "result": None, "start": None, "end": None, "include_tags": None, "exclude_tags": None, } asyncio.run(set_cached_ask(redis, "What happened to USER-001?", filters_snapshot, [{"id": "evt-cache"}], {"answer": "Cached answer!", "llm_used": True, "llm_error": None})) async def fake_get_arq_pool(): return redis monkeypatch.setattr("routes.ask.get_arq_pool", fake_get_arq_pool) response = client.post("/api/ask", json={"question": "What happened to USER-001?"}) assert response.status_code == 200 data = response.json() assert data["answer"] == "Cached answer!" assert data["llm_used"] is True assert llm_called is False def test_ask_async_mode_returns_job_id(self, client, mock_events_collection, monkeypatch): """Async mode should return immediately with a job_id.""" now = datetime.now(UTC) mock_events_collection.insert_one( { "id": "evt-async", "timestamp": now.isoformat(), "service": "Directory", "operation": "Add user", "result": "success", "actor_display": "Alice", "target_displays": ["USER-001"], "display_summary": "summary", "raw_text": "raw", } ) monkeypatch.setattr("routes.ask.LLM_API_KEY", "fake-key") # Mock arq pool to capture enqueue_job call class FakeArqPool: def __init__(self): self.enqueued = [] async def get(self, key): return None async def setex(self, key, ttl, value): pass async def enqueue_job(self, func, *args, **kwargs): from unittest.mock import MagicMock job = MagicMock() job.job_id = "job-12345" self.enqueued.append((func, args, kwargs)) return job pool = FakeArqPool() async def fake_get_arq_pool(): return pool monkeypatch.setattr("routes.ask.get_arq_pool", fake_get_arq_pool) response = client.post("/api/ask", json={"question": "What happened to USER-001?", "async_mode": True}) assert response.status_code == 200 data = response.json() assert data["job_id"] == "job-12345" assert data["llm_used"] is False assert "being processed" in data["answer"] assert len(pool.enqueued) == 1 assert pool.enqueued[0][0] == "process_ask_question"