feat: natural language query + production hardening
Some checks failed
CI / lint-and-test (push) Failing after 41s
Release / build-and-push (push) Successful in 1m33s

Features:
- Add /api/ask endpoint for plain-language audit log queries
- Regex-based time/entity extraction (no LLM required for parsing)
- LLM-powered narrative summarisation with OpenAI-compatible APIs
- Graceful fallback to structured bullet lists when LLM is unavailable
- Frontend ask panel with markdown rendering and cited events

Production:
- Harden Dockerfile: non-root user, gunicorn+uvicorn workers
- Add docker-compose.prod.yml with internal networks and health checks
- Add nginx reverse proxy with security headers
- MongoDB no longer exposed externally in production

Tests:
- 29 new tests for ask parsing, query building, and endpoint behaviour
- Fix conftest monkeypatch for routes.ask events collection

Bump version to 1.1.0
This commit is contained in:
2026-04-20 15:10:55 +02:00
parent b0eba09f0f
commit 0ef50c91f7
16 changed files with 1097 additions and 4 deletions

View File

@@ -1,6 +1,27 @@
FROM python:3.11-slim
# Security: run as non-root
RUN groupadd -r aoc && useradd -r -g aoc aoc
WORKDIR /app
# Install dependencies first for layer caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# Create directories for potential volume mounts and fix permissions
RUN mkdir -p /app/data && chown -R aoc:aoc /app
USER aoc
# Production: use gunicorn with uvicorn workers
# Workers = 2-4 x $NUM_CORES; keep it conservative for containerised workloads
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
EXPOSE 8000
CMD ["gunicorn", "main:app", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000", "--workers", "2", "--timeout", "120", "--access-logfile", "-", "--error-logfile", "-"]

View File

@@ -42,6 +42,13 @@ class Settings(BaseSettings):
# Alerting
ALERTS_ENABLED: bool = False
# LLM / Natural Language Query
LLM_API_KEY: str = ""
LLM_BASE_URL: str = "https://api.openai.com/v1"
LLM_MODEL: str = "gpt-4o-mini"
LLM_MAX_EVENTS: int = 50
LLM_TIMEOUT_SECONDS: int = 30
_settings = Settings()
@@ -68,3 +75,9 @@ CORS_ORIGINS = [o.strip() for o in _settings.CORS_ORIGINS.split(",") if o.strip(
SIEM_ENABLED = _settings.SIEM_ENABLED
SIEM_WEBHOOK_URL = _settings.SIEM_WEBHOOK_URL
ALERTS_ENABLED = _settings.ALERTS_ENABLED
LLM_API_KEY = _settings.LLM_API_KEY
LLM_BASE_URL = _settings.LLM_BASE_URL
LLM_MODEL = _settings.LLM_MODEL
LLM_MAX_EVENTS = _settings.LLM_MAX_EVENTS
LLM_TIMEOUT_SECONDS = _settings.LLM_TIMEOUT_SECONDS

View File

@@ -38,6 +38,45 @@
</div>
</section>
<section class="panel">
<h3>Ask a question</h3>
<form class="ask-form" @submit.prevent="askQuestion()">
<div class="ask-row">
<input
type="text"
placeholder="What happened to device ABC123 in the last 3 days?"
x-model="askQuestionText"
class="ask-input"
/>
<button type="submit" :disabled="askLoading" x-text="askLoading ? 'Thinking…' : 'Ask'">Ask</button>
</div>
</form>
<template x-if="askAnswer">
<div class="ask-result">
<div class="ask-answer" x-html="askAnswerHtml"></div>
<template x-if="askEvents.length">
<div class="ask-events">
<h4>Referenced events</h4>
<template x-for="(evt, idx) in askEvents" :key="evt.id || idx">
<article class="event event--compact">
<div class="event__meta">
<span class="pill" x-text="evt.display_category || evt.service || '—'"></span>
<span class="pill" :class="['success','succeeded','ok','passed'].includes((evt.result || '').toLowerCase()) ? 'pill--ok' : 'pill--warn'" x-text="evt.result || '—'"></span>
</div>
<h3 x-text="evt.operation || '—'"></h3>
<p class="event__detail" x-show="evt.display_summary"><strong>Summary:</strong> <span x-text="evt.display_summary"></span></p>
<p class="event__detail"><strong>Actor:</strong> <span x-text="evt.actor_display || '—'"></span></p>
<p class="event__detail"><strong>Target:</strong> <span x-text="Array.isArray(evt.target_displays) ? evt.target_displays.join(', ') : '—'"></span></p>
<p class="event__detail"><strong>When:</strong> <span x-text="evt.timestamp ? new Date(evt.timestamp).toLocaleString() : '—'"></span></p>
</article>
</template>
</div>
</template>
<button type="button" class="ghost" @click="clearAsk()">Clear</button>
</div>
</template>
</section>
<section class="panel">
<form id="filters" class="filters" @submit.prevent="resetPagination(); loadEvents()">
<div class="filter-row">
@@ -200,6 +239,12 @@
actor: '', selectedServices: [], search: '', operation: '', result: '', start: '', end: '', limit: 100, includeTags: '', excludeTags: '',
},
options: { actors: [], services: [], operations: [], results: [] },
askQuestionText: '',
askLoading: false,
askAnswer: '',
askAnswerHtml: '',
askEvents: [],
askLlmUsed: false,
async initApp() {
await this.initAuth();
@@ -437,6 +482,52 @@
this.loadEvents();
},
async askQuestion() {
const q = this.askQuestionText.trim();
if (!q) return;
this.askLoading = true;
this.askAnswer = '';
this.askAnswerHtml = '';
this.askEvents = [];
try {
const res = await fetch('/api/ask', {
method: 'POST',
headers: { 'Content-Type': 'application/json', ...this.authHeader() },
body: JSON.stringify({ question: q }),
});
if (!res.ok) throw new Error(await res.text());
const body = await res.json();
this.askAnswer = body.answer;
this.askAnswerHtml = this._mdToHtml(body.answer);
this.askEvents = body.events || [];
this.askLlmUsed = body.llm_used;
} catch (err) {
this.askAnswer = 'Sorry, something went wrong: ' + (err.message || 'Unknown error');
this.askAnswerHtml = this.askAnswer;
} finally {
this.askLoading = false;
}
},
clearAsk() {
this.askQuestionText = '';
this.askAnswer = '';
this.askAnswerHtml = '';
this.askEvents = [];
this.askLlmUsed = false;
},
_mdToHtml(text) {
// Very lightweight markdown-to-HTML for LLM answers
return text
.replace(/&/g, '&amp;').replace(/</g, '&lt;').replace(/>/g, '&gt;')
.replace(/\*\*(.+?)\*\*/g, '<strong>$1</strong>')
.replace(/\*(.+?)\*/g, '<em>$1</em>')
.replace(/`([^`]+)`/g, '<code>$1</code>')
.replace(/Event #(\d+)/g, '<strong>Event #$1</strong>')
.replace(/\n/g, '<br>');
},
async bulkTagMatching() {
const tag = prompt('Enter tag to apply to all matching events:');
if (!tag || !tag.trim()) return;

View File

@@ -377,6 +377,84 @@ input {
margin: 0;
}
/* Ask / Natural Language Query */
.ask-form {
margin-top: 10px;
}
.ask-row {
display: flex;
gap: 10px;
align-items: center;
}
.ask-input {
flex: 1;
padding: 12px 14px;
border-radius: 10px;
border: 1px solid var(--border);
background: rgba(255, 255, 255, 0.02);
color: var(--text);
font-size: 15px;
}
.ask-result {
margin-top: 16px;
}
.ask-answer {
background: rgba(125, 211, 252, 0.06);
border: 1px solid rgba(125, 211, 252, 0.2);
border-radius: 12px;
padding: 16px;
line-height: 1.55;
margin-bottom: 14px;
}
.ask-answer code {
background: rgba(255,255,255,0.06);
padding: 2px 6px;
border-radius: 6px;
font-size: 13px;
}
.ask-events {
margin-bottom: 14px;
}
.ask-events h4 {
margin: 0 0 10px;
color: var(--muted);
font-size: 14px;
text-transform: uppercase;
letter-spacing: 0.06em;
}
.event--compact {
padding: 12px;
margin-bottom: 10px;
}
.event--compact h3 {
font-size: 15px;
}
.source-health {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(min(200px, 100%), 1fr));
gap: 10px;
}
.health-card {
border: 1px solid var(--border);
border-radius: 10px;
padding: 10px 12px;
background: rgba(255, 255, 255, 0.02);
display: flex;
flex-direction: column;
gap: 4px;
}
@media (max-width: 640px) {
.hero {
flex-direction: column;
@@ -386,4 +464,9 @@ input {
flex-direction: column;
align-items: stretch;
}
.ask-row {
flex-direction: column;
align-items: stretch;
}
}

View File

@@ -19,6 +19,7 @@ from routes.events import router as events_router
from routes.fetch import router as fetch_router
from routes.fetch import run_fetch
from routes.health import router as health_router
from routes.ask import router as ask_router
from routes.rules import router as rules_router
from routes.webhooks import router as webhooks_router
@@ -112,6 +113,7 @@ app.include_router(events_router, prefix="/api")
app.include_router(config_router, prefix="/api")
app.include_router(webhooks_router, prefix="/api")
app.include_router(health_router, prefix="/api")
app.include_router(ask_router, prefix="/api")
app.include_router(rules_router, prefix="/api")

View File

@@ -70,3 +70,25 @@ class AlertRuleResponse(BaseModel):
severity: str
conditions: list[dict]
message: str
class AskRequest(BaseModel):
question: str
class AskEventRef(BaseModel):
id: str | None = None
timestamp: str | None = None
operation: str | None = None
actor_display: str | None = None
target_displays: list[str] | None = None
display_summary: str | None = None
service: str | None = None
result: str | None = None
class AskResponse(BaseModel):
answer: str
events: list[AskEventRef]
query_info: dict
llm_used: bool

View File

@@ -11,3 +11,5 @@ pydantic-settings
structlog
tenacity
prometheus-client
httpx
gunicorn

304
backend/routes/ask.py Normal file
View File

@@ -0,0 +1,304 @@
import json
import re
from datetime import UTC, datetime, timedelta
import httpx
import structlog
from auth import require_auth
from config import LLM_API_KEY, LLM_BASE_URL, LLM_MAX_EVENTS, LLM_MODEL, LLM_TIMEOUT_SECONDS
from database import events_collection
from fastapi import APIRouter, Depends, HTTPException
from models.api import AskRequest, AskResponse
router = APIRouter(dependencies=[Depends(require_auth)])
logger = structlog.get_logger("aoc.ask")
# ---------------------------------------------------------------------------
# Time-range extraction
# ---------------------------------------------------------------------------
_TIME_PATTERNS = [
(r"\blast\s+(\d+)\s+days?\b", "days"),
(r"\blast\s+(\d+)\s+hours?\b", "hours"),
(r"\blast\s+(\d+)\s+minutes?\b", "minutes"),
(r"\blast\s+week\b", "week"),
(r"\byesterday\b", "yesterday"),
(r"\btoday\b", "today"),
(r"\bin\s+the\s+last\s+(\d+)\s+days?\b", "days"),
(r"\bin\s+the\s+last\s+(\d+)\s+hours?\b", "hours"),
]
def _extract_time_range(question: str) -> tuple[str | None, str | None]:
"""Return (start_iso, end_iso) or (None, None) if no time detected."""
now = datetime.now(UTC)
q_lower = question.lower()
for pattern, unit in _TIME_PATTERNS:
m = re.search(pattern, q_lower)
if not m:
continue
if unit == "week":
start = now - timedelta(days=7)
elif unit == "yesterday":
start = now - timedelta(days=1)
elif unit == "today":
start = now.replace(hour=0, minute=0, second=0, microsecond=0)
else:
num = int(m.group(1))
delta = {"days": timedelta(days=num), "hours": timedelta(hours=num), "minutes": timedelta(minutes=num)}[unit]
start = now - delta
return start.isoformat().replace("+00:00", "Z"), now.isoformat().replace("+00:00", "Z")
return None, None
# ---------------------------------------------------------------------------
# Entity extraction
# ---------------------------------------------------------------------------
_ENTITY_HINTS = [
r"device\s+['\"]?([^'\"\s]+)['\"]?",
r"user\s+['\"]?([^'\"\s]+)['\"]?",
r"laptop\s+['\"]?([^'\"\s]+)['\"]?",
r"vm\s+['\"]?([^'\"\s]+)['\"]?",
r"server\s+['\"]?([^'\"\s]+)['\"]?",
r"computer\s+['\"]?([^'\"\s]+)['\"]?",
]
_EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")
def _extract_entity(question: str) -> str | None:
"""Best-effort extraction of the device / user / entity name."""
q_lower = question.lower()
# Look for explicit hints: "device ABC123"
for pattern in _ENTITY_HINTS:
m = re.search(pattern, q_lower)
if m:
# Extract from the original question to preserve case
start, end = m.span(1)
return question[start:end].strip().rstrip("?.!,;:")
# Look for quoted strings
m = re.search(r'"([^"]{2,50})"', question)
if m:
return m.group(1).strip()
m = re.search(r"'([^']{2,50})'", question)
if m:
return m.group(1).strip()
# Look for email addresses
m = _EMAIL_RE.search(question)
if m:
return m.group(0)
return None
# ---------------------------------------------------------------------------
# MongoDB query builder
# ---------------------------------------------------------------------------
def _build_event_query(entity: str | None, start: str | None, end: str | None) -> dict:
filters = []
if start or end:
time_filter = {}
if start:
time_filter["$gte"] = start
if end:
time_filter["$lte"] = end
filters.append({"timestamp": time_filter})
if entity:
entity_safe = re.escape(entity)
filters.append(
{
"$or": [
{"target_displays": {"$elemMatch": {"$regex": entity_safe, "$options": "i"}}},
{"actor_display": {"$regex": entity_safe, "$options": "i"}},
{"actor_upn": {"$regex": entity_safe, "$options": "i"}},
{"raw_text": {"$regex": entity_safe, "$options": "i"}},
]
}
)
return {"$and": filters} if filters else {}
# ---------------------------------------------------------------------------
# LLM summarisation
# ---------------------------------------------------------------------------
_SYSTEM_PROMPT = """You are an IT operations assistant. An administrator has asked a question about audit logs.
Your job is to read the list of audit events below and write a concise, plain-language answer.
Rules:
- Assume the reader is a non-expert admin.
- Group related events together and tell a coherent story.
- Highlight anything unusual, failed actions, or privilege escalations.
- Reference specific event numbers (e.g., "Event #3") when making claims so the user can verify.
- If there are no events, say so clearly.
- Keep the answer under 300 words.
- Do not invent events that are not in the list.
"""
def _format_events_for_llm(events: list[dict]) -> str:
lines = []
for i, e in enumerate(events, 1):
ts = e.get("timestamp") or "unknown time"
op = e.get("operation") or "unknown action"
actor = e.get("actor_display") or "unknown actor"
targets = ", ".join(e.get("target_displays") or []) or "unknown target"
svc = e.get("service") or "unknown service"
result = e.get("result") or "unknown result"
summary = e.get("display_summary") or ""
lines.append(
f"Event #{i}\n"
f" Time: {ts}\n"
f" Service: {svc}\n"
f" Action: {op}\n"
f" Actor: {actor}\n"
f" Target: {targets}\n"
f" Result: {result}\n"
f" Summary: {summary}\n"
)
return "\n".join(lines)
async def _call_llm(question: str, events: list[dict]) -> str:
if not LLM_API_KEY:
raise RuntimeError("LLM_API_KEY not configured")
context = _format_events_for_llm(events)
messages = [
{"role": "system", "content": _SYSTEM_PROMPT},
{
"role": "user",
"content": f"Question: {question}\n\nAudit events:\n{context}\n\nPlease answer the question based only on the events above.",
},
]
async with httpx.AsyncClient(timeout=LLM_TIMEOUT_SECONDS) as client:
resp = await client.post(
f"{LLM_BASE_URL.rstrip('/')}/chat/completions",
headers={
"Authorization": f"Bearer {LLM_API_KEY}",
"Content-Type": "application/json",
},
json={
"model": LLM_MODEL,
"messages": messages,
"temperature": 0.3,
"max_tokens": 800,
},
)
resp.raise_for_status()
data = resp.json()
return data["choices"][0]["message"]["content"].strip()
# ---------------------------------------------------------------------------
# API endpoint
# ---------------------------------------------------------------------------
def _to_event_ref(e: dict) -> dict:
return {
"id": e.get("id"),
"timestamp": e.get("timestamp"),
"operation": e.get("operation"),
"actor_display": e.get("actor_display"),
"target_displays": e.get("target_displays"),
"display_summary": e.get("display_summary"),
"service": e.get("service"),
"result": e.get("result"),
}
@router.post("/ask", response_model=AskResponse)
async def ask_question(body: AskRequest, user: dict = Depends(require_auth)):
question = body.question.strip()
if not question:
raise HTTPException(status_code=400, detail="Question is required")
start, end = _extract_time_range(question)
entity = _extract_entity(question)
# Default to last 7 days if no time range detected
if not start:
now = datetime.now(UTC)
start = (now - timedelta(days=7)).isoformat().replace("+00:00", "Z")
end = now.isoformat().replace("+00:00", "Z")
query = _build_event_query(entity, start, end)
try:
cursor = (
events_collection.find(query)
.sort([("timestamp", -1)])
.limit(LLM_MAX_EVENTS)
)
events = list(cursor)
except Exception as exc:
logger.error("Failed to query events for ask", error=str(exc))
raise HTTPException(status_code=500, detail=f"Database query failed: {exc}") from exc
for e in events:
e["_id"] = str(e.get("_id", ""))
# If no events, return early
if not events:
return AskResponse(
answer="I couldn't find any audit events matching your question. Try broadening the time range or checking the spelling of the device/user name.",
events=[],
query_info={"entity": entity, "start": start, "end": end, "event_count": 0},
llm_used=False,
)
# Try LLM summarisation
answer = ""
llm_used = False
if LLM_API_KEY:
try:
answer = await _call_llm(question, events)
llm_used = True
except Exception as exc:
logger.warning("LLM call failed, falling back to structured summary", error=str(exc))
# Fallback: structured summary if LLM unavailable or failed
if not answer:
parts = [f"Found {len(events)} event(s)"]
if entity:
parts.append(f"related to **{entity}**")
parts.append(f"between {start[:10]} and {end[:10]}.\n")
for i, e in enumerate(events[:10], 1):
ts = e.get("timestamp", "?")[:16].replace("T", " ")
op = e.get("operation", "unknown action")
actor = e.get("actor_display", "unknown")
targets = ", ".join(e.get("target_displays") or []) or ""
result = e.get("result", "")
parts.append(f"{i}. **{ts}** — {op} by {actor} on {targets} ({result})")
if len(events) > 10:
parts.append(f"\n...and {len(events) - 10} more events.")
answer = "\n".join(parts)
return AskResponse(
answer=answer,
events=[_to_event_ref(e) for e in events],
query_info={
"entity": entity,
"start": start,
"end": end,
"event_count": len(events),
"mongo_query": json.dumps(query, default=str),
},
llm_used=llm_used,
)

View File

@@ -24,6 +24,7 @@ def client(mock_events_collection, mock_watermarks_collection, monkeypatch):
monkeypatch.setattr("database.events_collection", mock_events_collection)
monkeypatch.setattr("routes.fetch.events_collection", mock_events_collection)
monkeypatch.setattr("routes.events.events_collection", mock_events_collection)
monkeypatch.setattr("routes.ask.events_collection", mock_events_collection)
monkeypatch.setattr("watermark.watermarks_collection", mock_watermarks_collection)
monkeypatch.setattr("routes.health.watermarks_collection", mock_watermarks_collection)
monkeypatch.setattr("routes.fetch.get_watermark", lambda source: None)

283
backend/tests/test_ask.py Normal file
View File

@@ -0,0 +1,283 @@
from datetime import UTC, datetime, timedelta
import pytest
from fastapi.testclient import TestClient
from routes.ask import _build_event_query, _extract_entity, _extract_time_range
# ---------------------------------------------------------------------------
# Unit tests: time-range extraction
# ---------------------------------------------------------------------------
class TestExtractTimeRange:
def test_last_n_days(self):
start, end = _extract_time_range("What happened in the last 3 days?")
assert start is not None
assert end is not None
# Start should be roughly 3 days before end
start_dt = datetime.fromisoformat(start.replace("Z", "+00:00"))
end_dt = datetime.fromisoformat(end.replace("Z", "+00:00"))
delta = end_dt - start_dt
assert delta.days == 3
def test_last_n_hours(self):
start, end = _extract_time_range("Show me events in the last 24 hours")
start_dt = datetime.fromisoformat(start.replace("Z", "+00:00"))
end_dt = datetime.fromisoformat(end.replace("Z", "+00:00"))
delta = end_dt - start_dt
assert delta.total_seconds() == 24 * 3600
def test_last_week(self):
start, end = _extract_time_range("What happened last week?")
start_dt = datetime.fromisoformat(start.replace("Z", "+00:00"))
end_dt = datetime.fromisoformat(end.replace("Z", "+00:00"))
assert (end_dt - start_dt).days == 7
def test_yesterday(self):
start, end = _extract_time_range("Show me yesterday's events")
start_dt = datetime.fromisoformat(start.replace("Z", "+00:00"))
end_dt = datetime.fromisoformat(end.replace("Z", "+00:00"))
assert (end_dt - start_dt).days == 1
def test_today(self):
start, end = _extract_time_range("What happened today?")
start_dt = datetime.fromisoformat(start.replace("Z", "+00:00"))
end_dt = datetime.fromisoformat(end.replace("Z", "+00:00"))
# Should be from midnight today to now
assert start_dt.hour == 0
assert start_dt.minute == 0
assert start_dt.second == 0
def test_no_time_pattern_returns_none(self):
start, end = _extract_time_range("What happened to device ABC?")
assert start is None
assert end is None
def test_last_n_minutes(self):
start, end = _extract_time_range("Show me events in the last 15 minutes")
start_dt = datetime.fromisoformat(start.replace("Z", "+00:00"))
end_dt = datetime.fromisoformat(end.replace("Z", "+00:00"))
assert (end_dt - start_dt).total_seconds() == 15 * 60
# ---------------------------------------------------------------------------
# Unit tests: entity extraction
# ---------------------------------------------------------------------------
class TestExtractEntity:
def test_device_hint(self):
assert _extract_entity("What happened to device LAPTOP-001?") == "LAPTOP-001"
def test_user_hint(self):
assert _extract_entity("Show me user alice@example.com") == "alice@example.com"
def test_laptop_hint(self):
assert _extract_entity("What did laptop HR-Desk-04 do?") == "HR-Desk-04"
def test_server_hint(self):
assert _extract_entity("Check server WEB-01") == "WEB-01"
def test_quoted_string(self):
assert _extract_entity('What happened to "Surface-Pro-7"?') == "Surface-Pro-7"
def test_single_quoted_string(self):
assert _extract_entity("What happened to 'VM-WEB-01' today?") == "VM-WEB-01"
def test_email_address(self):
assert _extract_entity("What did tomas.svensson@contoso.com do?") == "tomas.svensson@contoso.com"
def test_no_entity_returns_none(self):
assert _extract_entity("What happened in the last 3 days?") is None
def test_vm_hint(self):
assert _extract_entity("Show me vm APP-SERVER-02") == "APP-SERVER-02"
def test_computer_hint(self):
assert _extract_entity("What happened to computer DESK-123?") == "DESK-123"
# ---------------------------------------------------------------------------
# Unit tests: query builder
# ---------------------------------------------------------------------------
class TestBuildEventQuery:
def test_entity_only(self):
q = _build_event_query("ABC123", None, None)
assert "$and" in q
or_clause = q["$and"][0]["$or"]
assert any("target_displays" in c for c in or_clause)
assert any("actor_display" in c for c in or_clause)
assert any("raw_text" in c for c in or_clause)
def test_time_only(self):
q = _build_event_query(None, "2024-01-01T00:00:00Z", "2024-01-02T00:00:00Z")
assert q["$and"][0]["timestamp"]["$gte"] == "2024-01-01T00:00:00Z"
assert q["$and"][0]["timestamp"]["$lte"] == "2024-01-02T00:00:00Z"
def test_entity_and_time(self):
q = _build_event_query("DEV-01", "2024-01-01T00:00:00Z", "2024-01-02T00:00:00Z")
assert len(q["$and"]) == 2
assert "timestamp" in q["$and"][0] or "timestamp" in q["$and"][1]
def test_empty_returns_empty(self):
q = _build_event_query(None, None, None)
assert q == {}
def test_entity_is_escaped_for_regex(self):
q = _build_event_query("DEV.01", None, None)
# The dot should be escaped in the regex
or_clause = q["$and"][0]["$or"]
raw_regex = or_clause[-1]["raw_text"]["$regex"]
assert raw_regex == "DEV\\.01"
# ---------------------------------------------------------------------------
# Integration tests: /api/ask endpoint
# ---------------------------------------------------------------------------
class TestAskEndpoint:
def test_ask_empty_question(self, client):
response = client.post("/api/ask", json={"question": ""})
assert response.status_code == 400
def test_ask_no_events(self, client):
response = client.post("/api/ask", json={"question": "What happened to device NONEXISTENT in the last 3 days?"})
assert response.status_code == 200
data = response.json()
assert data["answer"] != ""
assert data["events"] == []
assert data["llm_used"] is False
assert data["query_info"]["entity"] == "NONEXISTENT"
def test_ask_with_events_fallback(self, client, mock_events_collection):
now = datetime.now(UTC)
mock_events_collection.insert_one(
{
"id": "evt-ask-1",
"timestamp": now.isoformat(),
"service": "Device",
"operation": "Update device",
"result": "success",
"actor_display": "Admin Bob",
"actor_upn": "bob@example.com",
"target_displays": ["LAPTOP-001"],
"display_summary": "Update device | device: LAPTOP-001 by Admin Bob",
"raw_text": "LAPTOP-001 something",
}
)
response = client.post("/api/ask", json={"question": "What happened to device LAPTOP-001 in the last 3 days?"})
assert response.status_code == 200
data = response.json()
assert data["llm_used"] is False
assert len(data["events"]) == 1
assert data["events"][0]["id"] == "evt-ask-1"
assert "LAPTOP-001" in data["answer"]
assert data["query_info"]["entity"] == "LAPTOP-001"
assert data["query_info"]["event_count"] == 1
def test_ask_defaults_to_7_days_when_no_time(self, client, mock_events_collection):
# Insert an event from 5 days ago
five_days_ago = datetime.now(UTC) - timedelta(days=5)
mock_events_collection.insert_one(
{
"id": "evt-ask-old",
"timestamp": five_days_ago.isoformat(),
"service": "Directory",
"operation": "Add user",
"result": "success",
"actor_display": "Alice",
"target_displays": ["DESKTOP-999"],
"display_summary": "summary",
"raw_text": "raw",
}
)
response = client.post("/api/ask", json={"question": "What happened to DESKTOP-999?"})
assert response.status_code == 200
data = response.json()
assert data["query_info"]["event_count"] == 1
assert data["events"][0]["id"] == "evt-ask-old"
def test_ask_event_outside_time_window(self, client, mock_events_collection):
# Event from 10 days ago — outside default 7-day window
old = datetime.now(UTC) - timedelta(days=10)
mock_events_collection.insert_one(
{
"id": "evt-too-old",
"timestamp": old.isoformat(),
"service": "Directory",
"operation": "Add user",
"result": "success",
"actor_display": "Alice",
"target_displays": ["OLD-DEVICE"],
"display_summary": "summary",
"raw_text": "raw",
}
)
response = client.post("/api/ask", json={"question": "What happened to OLD-DEVICE?"})
assert response.status_code == 200
data = response.json()
# Default is 7 days, so 10-day-old event should not match
assert data["query_info"]["event_count"] == 0
def test_ask_with_llm(self, client, mock_events_collection, monkeypatch):
now = datetime.now(UTC)
mock_events_collection.insert_one(
{
"id": "evt-llm",
"timestamp": now.isoformat(),
"service": "Device",
"operation": "Wipe device",
"result": "failure",
"actor_display": "System",
"target_displays": ["PHONE-001"],
"display_summary": "Wipe device | device: PHONE-001 by System",
"raw_text": "PHONE-001 wipe failed",
}
)
async def fake_llm(question, events):
return "The device had a failed wipe attempt."
monkeypatch.setattr("routes.ask.LLM_API_KEY", "fake-key")
monkeypatch.setattr("routes.ask._call_llm", fake_llm)
response = client.post("/api/ask", json={"question": "What happened to PHONE-001 in the last day?"})
assert response.status_code == 200
data = response.json()
assert data["llm_used"] is True
assert data["answer"] == "The device had a failed wipe attempt."
assert len(data["events"]) == 1
def test_ask_falls_back_when_llm_errors(self, client, mock_events_collection, monkeypatch):
now = datetime.now(UTC)
mock_events_collection.insert_one(
{
"id": "evt-fallback",
"timestamp": now.isoformat(),
"service": "Directory",
"operation": "Add user",
"result": "success",
"actor_display": "Alice",
"target_displays": ["USER-001"],
"display_summary": "summary",
"raw_text": "raw",
}
)
async def failing_llm(question, events):
raise RuntimeError("LLM service down")
monkeypatch.setattr("routes.ask.LLM_API_KEY", "fake-key")
monkeypatch.setattr("routes.ask._call_llm", failing_llm)
response = client.post("/api/ask", json={"question": "What happened to USER-001?"})
assert response.status_code == 200
data = response.json()
assert data["llm_used"] is False # Falls back
assert len(data["events"]) == 1
assert "Found 1 event" in data["answer"]