11 Commits

Author SHA1 Message Date
5bda1dd616 chore: bump version to 1.6.4
All checks were successful
CI / lint-and-test (push) Successful in 25s
Release / build-and-push (push) Successful in 1m29s
2026-04-22 12:16:32 +02:00
3e333291c6 fix: revert to single-click service filter, show all services by default, page size 24
- Revert +/- buttons on service pills back to single-click = filter only this service
- Remove default exclusion of Exchange/SharePoint/Teams (privacy controls handle this server-side)
- Change default page size from 25 to 24 (divisible by 3 for the 3-column grid)
- Update DEFAULT_PAGE_SIZE config default to 24
2026-04-22 12:16:20 +02:00
aa62528862 chore: bump version to 1.6.3
All checks were successful
CI / lint-and-test (push) Successful in 35s
Release / build-and-push (push) Successful in 1m47s
2026-04-22 12:02:28 +02:00
ac155d8843 feat: +/- buttons on service pills for additive/subtractive filtering
- Replace single-click service pill filter with explicit +/− buttons
- '+' adds the service to the current filter (keeps other selections)
- '−' removes the service from the current filter
- Result pills keep toggle click behavior
- Add .pill__action styles for small inline buttons
2026-04-22 12:02:11 +02:00
ed7465f5cd chore: bump version to 1.6.2
All checks were successful
Release / build-and-push (push) Successful in 1m33s
CI / lint-and-test (push) Successful in 33s
2026-04-22 11:53:21 +02:00
0eebcd0765 feat: clickable pills, configurable page size, CQRE.NET branding
- Service/category pills are now clickable: click to filter by that service
- Result pills (Success, Failure, etc.) are now clickable: click to filter by that result
- Click again to clear the filter (toggle behavior)
- Change default page size from 100 to 25
- Add DEFAULT_PAGE_SIZE config (env var, default 25), exposed via /api/config/features
- Change footer brand from CQRE to CQRE.NET
- Add pill--clickable hover styles
- Bump CSS cache-buster to v=10
2026-04-22 11:53:01 +02:00
67f3c28e82 chore: bump version to 1.6.1
All checks were successful
CI / lint-and-test (push) Successful in 32s
Release / build-and-push (push) Successful in 1m30s
2026-04-22 11:31:57 +02:00
04c41ee740 style: UI polish — topbar, footer, user info, product feel
- Add sticky top navigation bar with brand, repo/docs links, user chip
- Show logged-in user name + email from MSAL account
- Add footer with version, issue link, repo link, docs link
- Move action buttons (Fetch/Refresh/Login) to compact topbar
- Clean up hero section (removed buttons, just title + tagline)
- Bump CSS cache-buster to v=9
- Responsive stacking for mobile
2026-04-22 11:31:37 +02:00
cbd46adaa6 style: ruff format
All checks were successful
CI / lint-and-test (push) Successful in 25s
2026-04-22 10:08:32 +02:00
e4bafbc4b0 chore: fix ruff import order in test_ask.py
Some checks failed
CI / lint-and-test (push) Failing after 19s
2026-04-22 10:06:07 +02:00
f75f165911 feat: Redis caching + async queue for LLM scaling (v1.6.0)
Some checks failed
Release / build-and-push (push) Successful in 1m24s
CI / lint-and-test (push) Failing after 29s
- Add async Redis client singleton (redis_client.py) for caching and arq pool
- Add arq job functions (jobs.py) for background LLM processing
- Cache ask/explain LLM responses with TTL (1h ask, 24h explain)
- Add async mode to /api/ask: enqueue job, return job_id, poll /api/jobs/{id}
- Add GET /api/jobs/{job_id} endpoint for job status polling
- Add arq worker service to docker-compose (dev + prod)
- Switch from Redis to Valkey (BSD fork) in Docker Compose
- Add REDIS_URL config setting
- Add tests for cache hit, async mode, and job status
2026-04-22 09:55:05 +02:00
19 changed files with 809 additions and 32 deletions

View File

@@ -50,6 +50,14 @@ LLM_MAX_EVENTS=200
LLM_TIMEOUT_SECONDS=30
LLM_API_VERSION=
# Valkey (caching + async job queue for LLM calls)
# In Docker Compose, this is set automatically to redis://redis:6379/0
# For local dev, start Valkey with: docker run -d -p 6379:6379 valkey/valkey:8-alpine
REDIS_URL=redis://localhost:6379/0
# UI default page size (number of events shown per page)
DEFAULT_PAGE_SIZE=24
# Optional: privacy / access control
# Hide entire services from users without PRIVACY_SERVICE_ROLES
# PRIVACY_SERVICES=Exchange,Teams

View File

@@ -65,9 +65,10 @@ Goal: add AI-powered analysis and external tool integration.
- [x] AI feature flag (`AI_FEATURES_ENABLED`) to gate LLM-dependent features
- [x] Natural language query endpoint (`/api/ask`) with intent extraction and smart sampling
- [x] MCP (Model Context Protocol) server for Claude Desktop / Cursor integration
- [x] Valkey caching for LLM responses and frequent queries
- [x] Async queue (arq) for LLM requests to prevent timeout/cost explosions at scale
- [ ] Advanced analytics dashboard (trending operations, anomaly detection)
- [ ] Redis caching for LLM responses and frequent queries
- [ ] Async queue for LLM requests to prevent timeout/cost explosions at scale
## Completed in this PR
All Phase 5 items marked done were implemented in v1.3.0.
All Phase 5 items marked done were implemented in v1.3.0v1.5.0.
Redis caching + async queue implemented in v1.6.0, switched to Valkey.

View File

@@ -1 +1 @@
1.5.0
1.6.4

View File

@@ -57,6 +57,12 @@ class Settings(BaseSettings):
PRIVACY_SENSITIVE_OPERATIONS: str = "" # comma-separated, e.g. "MailItemsAccessed,Search-Mailbox,Send"
PRIVACY_SERVICE_ROLES: str = "" # comma-separated, e.g. "SecurityAdministrator,ComplianceAdministrator"
# Redis (caching + async job queue)
REDIS_URL: str = "redis://localhost:6379/0"
# UI defaults
DEFAULT_PAGE_SIZE: int = 24
_settings = Settings()
@@ -95,3 +101,6 @@ LLM_API_VERSION = _settings.LLM_API_VERSION
PRIVACY_SERVICES = {s.strip() for s in _settings.PRIVACY_SERVICES.split(",") if s.strip()}
PRIVACY_SENSITIVE_OPERATIONS = {o.strip() for o in _settings.PRIVACY_SENSITIVE_OPERATIONS.split(",") if o.strip()}
PRIVACY_SERVICE_ROLES = {r.strip() for r in _settings.PRIVACY_SERVICE_ROLES.split(",") if r.strip()}
REDIS_URL = _settings.REDIS_URL
DEFAULT_PAGE_SIZE = _settings.DEFAULT_PAGE_SIZE

View File

@@ -4,23 +4,49 @@
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Admin Operations Center</title>
<link rel="stylesheet" href="/style.css?v=8" />
<link rel="stylesheet" href="/style.css?v=12" />
<script defer src="https://cdn.jsdelivr.net/npm/alpinejs@3.x.x/dist/cdn.min.js"></script>
<script src="https://alcdn.msauth.net/browser/2.37.0/js/msal-browser.min.js" crossorigin="anonymous"></script>
</head>
<body>
<div class="page" x-data="aocApp()" x-init="initApp()">
<nav class="topbar">
<div class="topbar__brand">
<span class="topbar__logo">🔍</span>
<span class="topbar__name">AOC</span>
<span class="version-badge" x-text="appVersion"></span>
</div>
<div class="topbar__links">
<a :href="repoUrl" target="_blank" rel="noopener">Repository</a>
<a :href="docsUrl" target="_blank" rel="noopener">Docs</a>
</div>
<div class="topbar__meta">
<template x-if="account">
<div class="user-chip">
<div class="user-avatar" x-text="(account.name || account.username || '?').charAt(0).toUpperCase()"></div>
<div class="user-details">
<span class="user-name" x-text="account.name || account.username || ''"></span>
<span class="user-email" x-text="account.username || ''"></span>
</div>
</div>
</template>
<template x-if="!account && authConfig?.auth_enabled">
<span class="login-hint">Not signed in</span>
</template>
</div>
<div class="topbar__actions">
<button id="fetchBtn" class="ghost btn--compact" aria-label="Fetch latest audit logs" @click="fetchLogs()">Fetch</button>
<button id="refreshBtn" class="ghost btn--compact" aria-label="Refresh events" @click="loadEvents(currentCursor)">Refresh</button>
<button id="authBtn" class="ghost btn--compact" aria-label="Login" x-text="authBtnText" @click="toggleAuth()"></button>
</div>
</nav>
<header class="hero">
<div>
<p class="eyebrow">Admin Operations Center <span class="version-badge" x-text="appVersion"></span></p>
<p class="eyebrow">Admin Operations Center</p>
<h1>Audit Log Explorer</h1>
<p class="lede">Search and review Microsoft audit events from Entra, Intune, Exchange, SharePoint, and Teams.</p>
</div>
<div class="cta">
<button id="authBtn" class="ghost" aria-label="Login" x-text="authBtnText" @click="toggleAuth()"></button>
<button id="fetchBtn" aria-label="Fetch latest audit logs" @click="fetchLogs()">Fetch new</button>
<button id="refreshBtn" aria-label="Refresh events" @click="loadEvents(currentCursor)">Refresh</button>
</div>
</header>
<section class="panel">
@@ -158,8 +184,8 @@
<template x-for="(evt, idx) in askEvents" :key="evt.id || idx">
<article class="event event--compact">
<div class="event__meta">
<span class="pill" x-text="evt.display_category || evt.service || '—'"></span>
<span class="pill" :class="['success','succeeded','ok','passed','true'].includes((evt.result || '').toLowerCase()) ? 'pill--ok' : 'pill--warn'" x-text="evt.result || '—'"></span>
<span class="pill pill--clickable" x-text="evt.display_category || evt.service || '—'" @click="filterByService(evt.service || evt.display_category)" title="Filter by this service"></span>
<span class="pill pill--clickable" :class="['success','succeeded','ok','passed','true'].includes((evt.result || '').toLowerCase()) ? 'pill--ok' : 'pill--warn'" x-text="evt.result || '—'" @click="filterByResult(evt.result)" title="Filter by this result"></span>
</div>
<h3 x-text="evt.operation || '—'"></h3>
<p class="event__detail" x-show="evt.display_summary"><strong>Summary:</strong> <span x-text="evt.display_summary"></span></p>
@@ -185,8 +211,8 @@
<template x-for="(evt, idx) in events" :key="evt._id || evt.id || idx">
<article class="event">
<div class="event__meta">
<span class="pill" x-text="evt.display_category || evt.service || '—'"></span>
<span class="pill" :class="['success','succeeded','ok','passed','true'].includes((evt.result || '').toLowerCase()) ? 'pill--ok' : 'pill--warn'" x-text="evt.result || '—'"></span>
<span class="pill pill--clickable" x-text="evt.display_category || evt.service || '—'" @click="filterByService(evt.service || evt.display_category)" title="Filter by this service"></span>
<span class="pill pill--clickable" :class="['success','succeeded','ok','passed','true'].includes((evt.result || '').toLowerCase()) ? 'pill--ok' : 'pill--warn'" x-text="evt.result || '—'" @click="filterByResult(evt.result)" title="Filter by this result"></span>
</div>
<h3 x-text="evt.operation || '—'"></h3>
<p class="event__detail" x-show="evt.display_summary"><strong>Summary:</strong> <span x-text="evt.display_summary"></span></p>
@@ -239,6 +265,21 @@
<pre id="modalBody" x-text="modalBody"></pre>
</div>
</div>
<footer class="footer">
<div class="footer__left">
<span class="footer__brand">Admin Operations Center</span>
<span class="footer__version" x-text="'v' + appVersion"></span>
</div>
<div class="footer__center">
<a :href="repoUrl + '/issues/new'" target="_blank" rel="noopener">🐛 Report an issue</a>
<a :href="repoUrl" target="_blank" rel="noopener">💻 Source code</a>
<a :href="docsUrl" target="_blank" rel="noopener">📖 Documentation</a>
</div>
<div class="footer__right">
<span>Built with ❤️ by CQRE.NET</span>
</div>
</footer>
</div>
<script>
@@ -264,11 +305,13 @@
accessToken: null,
authScopes: [],
filters: {
actor: '', selectedServices: [], search: '', operation: '', result: '', start: '', end: '', limit: 100, includeTags: '', excludeTags: '',
actor: '', selectedServices: [], search: '', operation: '', result: '', start: '', end: '', limit: 24, includeTags: '', excludeTags: '',
},
options: { actors: [], services: [], operations: [], results: [] },
savedSearches: [],
appVersion: '',
repoUrl: 'https://git.cqre.net/cqrenet/aoc',
docsUrl: 'https://git.cqre.net/cqrenet/aoc/src/branch/main/README.md',
aiFeaturesEnabled: true,
askQuestionText: '',
askLoading: false,
@@ -353,6 +396,11 @@
if (featRes.ok) {
const featBody = await featRes.json();
this.aiFeaturesEnabled = featBody.ai_features_enabled !== false;
if (featBody.default_page_size) {
this.filters.limit = featBody.default_page_size;
} else {
this.filters.limit = 24;
}
} else {
this.aiFeaturesEnabled = true;
}
@@ -521,9 +569,8 @@
const saved = localStorage.getItem('aoc_filters');
if (!saved && this.options.services.length) {
// Default: exclude noisy high-volume services
const noisy = ['Exchange', 'SharePoint', 'Teams'];
this.filters.selectedServices = this.options.services.filter((s) => !noisy.includes(s));
// Default: show all services (privacy controls handle exclusions server-side)
this.filters.selectedServices = [...this.options.services];
} else if (saved) {
try {
const parsed = JSON.parse(saved);
@@ -617,8 +664,23 @@
},
clearFilters() {
const noisy = ['Exchange', 'SharePoint', 'Teams'];
this.filters = { actor: '', selectedServices: this.options.services.filter((s) => !noisy.includes(s)), search: '', operation: '', result: '', start: '', end: '', limit: 100, includeTags: '', excludeTags: '' };
this.filters = { actor: '', selectedServices: [...this.options.services], search: '', operation: '', result: '', start: '', end: '', limit: 24, includeTags: '', excludeTags: '' };
this.saveFilters();
this.resetPagination();
this.loadEvents();
},
filterByService(service) {
if (!service) return;
this.filters.selectedServices = [service];
this.saveFilters();
this.resetPagination();
this.loadEvents();
},
filterByResult(result) {
if (!result) return;
this.filters.result = this.filters.result === result ? '' : result;
this.saveFilters();
this.resetPagination();
this.loadEvents();

View File

@@ -28,7 +28,115 @@ body {
.page {
max-width: 1100px;
margin: 0 auto;
padding: 32px 20px 60px;
padding: 0 20px 40px;
display: flex;
flex-direction: column;
min-height: 100vh;
}
.topbar {
display: flex;
align-items: center;
gap: 16px;
padding: 12px 0;
margin-bottom: 8px;
border-bottom: 1px solid var(--border);
flex-wrap: wrap;
}
.topbar__brand {
display: flex;
align-items: center;
gap: 8px;
font-weight: 700;
font-size: 16px;
}
.topbar__logo {
font-size: 20px;
}
.topbar__links {
display: flex;
gap: 16px;
margin-right: auto;
}
.topbar__links a {
color: var(--muted);
font-size: 13px;
text-decoration: none;
font-weight: 500;
transition: color 0.15s ease;
}
.topbar__links a:hover {
color: var(--accent-strong);
}
.topbar__meta {
display: flex;
align-items: center;
gap: 10px;
}
.user-chip {
display: flex;
align-items: center;
gap: 8px;
background: rgba(255, 255, 255, 0.04);
border: 1px solid var(--border);
border-radius: 999px;
padding: 4px 12px 4px 4px;
}
.user-avatar {
width: 26px;
height: 26px;
border-radius: 50%;
background: linear-gradient(135deg, var(--accent), var(--accent-strong));
color: #0b1220;
font-size: 12px;
font-weight: 700;
display: flex;
align-items: center;
justify-content: center;
flex-shrink: 0;
}
.user-details {
display: flex;
flex-direction: column;
line-height: 1.2;
}
.user-name {
font-size: 12px;
font-weight: 600;
color: var(--text);
}
.user-email {
font-size: 11px;
color: var(--muted);
}
.login-hint {
font-size: 12px;
color: var(--muted);
font-style: italic;
}
.topbar__actions {
display: flex;
gap: 8px;
align-items: center;
}
.btn--compact {
padding: 8px 14px;
font-size: 13px;
border-radius: 8px;
}
.hero {
@@ -37,6 +145,7 @@ body {
justify-content: space-between;
gap: 16px;
margin-bottom: 20px;
padding-top: 16px;
}
.eyebrow {
@@ -246,6 +355,27 @@ input {
border-color: rgba(239, 68, 68, 0.5);
}
.pill--clickable {
cursor: pointer;
transition: transform 0.1s ease, box-shadow 0.15s ease, background 0.15s ease;
}
.pill--clickable:hover {
transform: translateY(-1px);
box-shadow: 0 2px 8px rgba(125, 211, 252, 0.2);
background: rgba(125, 211, 252, 0.2);
}
.pill--clickable.pill--ok:hover {
box-shadow: 0 2px 8px rgba(34, 197, 94, 0.2);
background: rgba(34, 197, 94, 0.25);
}
.pill--clickable.pill--warn:hover {
box-shadow: 0 2px 8px rgba(249, 115, 22, 0.2);
background: rgba(249, 115, 22, 0.25);
}
.event h3 {
margin: 0 0 6px;
font-size: 17px;
@@ -508,7 +638,70 @@ input {
gap: 4px;
}
.footer {
margin-top: auto;
padding: 20px 0;
border-top: 1px solid var(--border);
display: flex;
align-items: center;
justify-content: space-between;
gap: 16px;
flex-wrap: wrap;
font-size: 13px;
color: var(--muted);
}
.footer__left {
display: flex;
align-items: center;
gap: 10px;
}
.footer__brand {
font-weight: 600;
color: var(--text);
}
.footer__version {
font-size: 11px;
padding: 2px 8px;
border-radius: 999px;
background: rgba(125, 211, 252, 0.1);
border: 1px solid rgba(125, 211, 252, 0.2);
color: var(--accent-strong);
}
.footer__center {
display: flex;
gap: 16px;
align-items: center;
}
.footer__center a {
color: var(--muted);
text-decoration: none;
transition: color 0.15s ease;
}
.footer__center a:hover {
color: var(--accent-strong);
}
.footer__right {
font-size: 12px;
}
@media (max-width: 640px) {
.topbar {
flex-direction: column;
align-items: flex-start;
gap: 10px;
}
.topbar__links {
margin-right: 0;
}
.hero {
flex-direction: column;
}
@@ -522,4 +715,10 @@ input {
flex-direction: column;
align-items: stretch;
}
.footer {
flex-direction: column;
text-align: center;
gap: 10px;
}
}

117
backend/jobs.py Normal file
View File

@@ -0,0 +1,117 @@
"""arq job functions for async LLM processing."""
import hashlib
import json
import structlog
from arq.connections import RedisSettings
from config import REDIS_URL
logger = structlog.get_logger("aoc.jobs")
# ---------------------------------------------------------------------------
# Cache helpers
# ---------------------------------------------------------------------------
CACHE_TTL_ASK = 3600 # 1 hour
CACHE_TTL_EXPLAIN = 86400 # 24 hours
def _ask_cache_key(question: str, filters: dict, events: list) -> str:
payload = json.dumps({"q": question, "f": filters, "e": [e.get("id") for e in events]}, sort_keys=True)
return f"aoc:cache:ask:{hashlib.md5(payload.encode()).hexdigest()}"
def _explain_cache_key(event_id: str) -> str:
return f"aoc:cache:explain:{event_id}"
async def get_cached_ask(redis, question: str, filters: dict, events: list) -> dict | None:
key = _ask_cache_key(question, filters, events)
raw = await redis.get(key)
if raw:
return json.loads(raw)
return None
async def set_cached_ask(redis, question: str, filters: dict, events: list, result: dict):
key = _ask_cache_key(question, filters, events)
await redis.setex(key, CACHE_TTL_ASK, json.dumps(result, default=str))
async def get_cached_explain(redis, event_id: str) -> dict | None:
key = _explain_cache_key(event_id)
raw = await redis.get(key)
if raw:
return json.loads(raw)
return None
async def set_cached_explain(redis, event_id: str, result: dict):
key = _explain_cache_key(event_id)
await redis.setex(key, CACHE_TTL_EXPLAIN, json.dumps(result, default=str))
# ---------------------------------------------------------------------------
# arq job functions
# ---------------------------------------------------------------------------
async def process_ask_question(
ctx, question: str, filters: dict, events: list, total: int, excluded_services: list | None
):
"""Background job: call LLM for /api/ask and cache result."""
from routes.ask import _call_llm
redis = ctx["redis"]
try:
answer = await _call_llm(question, events, total=total, excluded_services=excluded_services)
result = {"status": "completed", "answer": answer, "llm_used": True, "llm_error": None}
except Exception as exc:
logger.warning("Async ask LLM failed", error=str(exc))
result = {"status": "failed", "answer": "", "llm_used": False, "llm_error": str(exc)}
await set_cached_ask(redis, question, filters, events, result)
return result
async def process_explain_event(ctx, event_id: str, event: dict, related: list):
"""Background job: call LLM for /api/events/{id}/explain and cache result."""
from routes.ask import _explain_event
redis = ctx["redis"]
try:
explanation = await _explain_event(event, related)
result = {"status": "completed", "explanation": explanation, "llm_used": True, "llm_error": None}
except Exception as exc:
logger.warning("Async explain LLM failed", error=str(exc))
result = {"status": "failed", "explanation": "", "llm_used": False, "llm_error": str(exc)}
await set_cached_explain(redis, event_id, result)
return result
# ---------------------------------------------------------------------------
# arq worker configuration
# ---------------------------------------------------------------------------
async def startup(ctx):
from redis.asyncio import Redis
ctx["redis"] = Redis.from_url(REDIS_URL, decode_responses=True)
async def shutdown(ctx):
await ctx["redis"].close()
class WorkerSettings:
functions = [process_ask_question, process_explain_event]
redis_settings = RedisSettings.from_dsn(REDIS_URL)
on_startup = startup
on_shutdown = shutdown
max_jobs = 10
job_timeout = 120
keep_result = 3600
keep_result_forever = False

View File

@@ -19,6 +19,7 @@ from routes.events import router as events_router
from routes.fetch import router as fetch_router
from routes.fetch import run_fetch
from routes.health import router as health_router
from routes.jobs import router as jobs_router
from routes.rules import router as rules_router
from routes.saved_searches import router as saved_searches_router
from routes.webhooks import router as webhooks_router
@@ -122,6 +123,7 @@ if AI_FEATURES_ENABLED:
app.mount("/mcp", mcp_asgi)
app.include_router(saved_searches_router, prefix="/api")
app.include_router(rules_router, prefix="/api")
app.include_router(jobs_router, prefix="/api")
@app.get("/health")
@@ -176,3 +178,6 @@ async def stop_periodic_fetch():
task.cancel()
with suppress(Exception):
await task
from redis_client import close_redis_connections
await close_redis_connections()

View File

@@ -82,6 +82,7 @@ class AskRequest(BaseModel):
end: str | None = None
include_tags: list[str] | None = None
exclude_tags: list[str] | None = None
async_mode: bool = False # enqueue async job instead of waiting
class AskEventRef(BaseModel):
@@ -101,3 +102,4 @@ class AskResponse(BaseModel):
query_info: dict
llm_used: bool
llm_error: str | None = None
job_id: str | None = None

36
backend/redis_client.py Normal file
View File

@@ -0,0 +1,36 @@
"""Async Redis client singleton for caching and job queue."""
import redis.asyncio as aioredis
from arq import create_pool
from arq.connections import ArqRedis, RedisSettings
from config import REDIS_URL
_arq_pool: ArqRedis | None = None
_plain_redis: aioredis.Redis | None = None
async def get_arq_pool() -> ArqRedis:
"""Return a shared arq pool (ArqRedis extends redis.asyncio.Redis)."""
global _arq_pool
if _arq_pool is None:
_arq_pool = await create_pool(RedisSettings.from_dsn(REDIS_URL))
return _arq_pool
async def get_redis() -> aioredis.Redis:
"""Return a shared plain async Redis client."""
global _plain_redis
if _plain_redis is None:
_plain_redis = aioredis.from_url(REDIS_URL, decode_responses=True)
return _plain_redis
async def close_redis_connections():
"""Close all Redis connections (call on shutdown)."""
global _arq_pool, _plain_redis
if _arq_pool:
await _arq_pool.close()
_arq_pool = None
if _plain_redis:
await _plain_redis.close()
_plain_redis = None

View File

@@ -14,3 +14,5 @@ prometheus-client
httpx
gunicorn
mcp
redis
arq

View File

@@ -18,7 +18,9 @@ from config import (
)
from database import events_collection
from fastapi import APIRouter, Depends, HTTPException
from jobs import get_cached_ask, get_cached_explain, set_cached_ask, set_cached_explain
from models.api import AskRequest, AskResponse
from redis_client import get_arq_pool
router = APIRouter(dependencies=[Depends(require_auth)])
logger = structlog.get_logger("aoc.ask")
@@ -640,14 +642,23 @@ async def explain_event(event_id: str, user: dict = Depends(require_auth)):
"llm_error": "LLM_API_KEY not configured",
}
# Check cache first
redis = await get_arq_pool()
cached = await get_cached_explain(redis, event_id)
if cached:
cached["related_count"] = len(related)
return cached
try:
explanation = await _explain_event(event, related)
return {
result = {
"explanation": explanation,
"llm_used": True,
"llm_error": None,
"related_count": len(related),
}
await set_cached_explain(redis, event_id, result)
return result
except Exception as exc:
logger.warning("Event explanation failed", error=str(exc))
return {
@@ -746,19 +757,78 @@ async def ask_question(body: AskRequest, user: dict = Depends(require_auth)):
llm_error="LLM not used — no events found." if not LLM_API_KEY else None,
)
# Try LLM summarisation
# Try LLM summarisation (with caching + optional async)
answer = ""
llm_used = False
llm_error = None
if not LLM_API_KEY:
llm_error = "LLM_API_KEY is not configured. Set it in your .env to enable AI narrative summarisation."
job_id = None
filters_snapshot = {
"services": body.services,
"actor": body.actor,
"operation": body.operation,
"result": body.result,
"start": body.start,
"end": body.end,
"include_tags": body.include_tags,
"exclude_tags": body.exclude_tags,
}
if LLM_API_KEY:
redis = await get_arq_pool()
cached = await get_cached_ask(redis, question, filters_snapshot, events)
if cached:
answer = cached.get("answer", "")
llm_used = cached.get("llm_used", False)
llm_error = cached.get("llm_error")
elif body.async_mode:
pool = await get_arq_pool()
job = await pool.enqueue_job(
"process_ask_question",
question,
filters_snapshot,
events,
total,
excluded_services,
)
job_id = job.job_id if job else None
return AskResponse(
answer="Your question is being processed. Poll /api/jobs/{job_id} for the result.",
events=[_to_event_ref(e) for e in events],
query_info={
"entity": entity,
"start": start,
"end": end,
"event_count": len(events),
"total_matched": total,
"services_queried": query_services,
"excluded_services": excluded_services,
"mongo_query": json.dumps(query, default=str),
},
llm_used=False,
llm_error=None,
job_id=job_id,
)
else:
try:
answer = await _call_llm(question, events, total=total, excluded_services=excluded_services)
llm_used = True
await set_cached_ask(
redis,
question,
filters_snapshot,
events,
{
"answer": answer,
"llm_used": True,
"llm_error": None,
},
)
except Exception as exc:
llm_error = f"LLM call failed: {exc}"
logger.warning("LLM call failed, falling back to structured summary", error=str(exc))
else:
try:
answer = await _call_llm(question, events, total=total, excluded_services=excluded_services)
llm_used = True
except Exception as exc:
llm_error = f"LLM call failed: {exc}"
logger.warning("LLM call failed, falling back to structured summary", error=str(exc))
llm_error = "LLM_API_KEY is not configured. Set it in your .env to enable AI narrative summarisation."
# Fallback: structured summary if LLM unavailable or failed
if not answer:
@@ -797,4 +867,5 @@ async def ask_question(body: AskRequest, user: dict = Depends(require_auth)):
},
llm_used=llm_used,
llm_error=llm_error,
job_id=job_id,
)

View File

@@ -4,6 +4,7 @@ from config import (
AUTH_ENABLED,
AUTH_SCOPE,
AUTH_TENANT_ID,
DEFAULT_PAGE_SIZE,
)
from fastapi import APIRouter
@@ -25,4 +26,5 @@ def auth_config():
def features_config():
return {
"ai_features_enabled": AI_FEATURES_ENABLED,
"default_page_size": DEFAULT_PAGE_SIZE,
}

43
backend/routes/jobs.py Normal file
View File

@@ -0,0 +1,43 @@
"""Job status endpoints for async LLM operations."""
from arq.jobs import Job, JobStatus
from auth import require_auth
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from redis_client import get_redis
router = APIRouter(dependencies=[Depends(require_auth)])
class JobStatusResponse(BaseModel):
job_id: str
status: str # queued, in_progress, complete, not_found, deferred
result: dict | None = None
error: str | None = None
@router.get("/jobs/{job_id}", response_model=JobStatusResponse)
async def get_job_status(job_id: str, user: dict = Depends(require_auth)):
"""Poll for the result of an async LLM job."""
redis = await get_redis()
job = Job(job_id, redis)
status = await job.status()
if status == JobStatus.not_found:
raise HTTPException(status_code=404, detail="Job not found")
result = None
error = None
if status == JobStatus.complete:
try:
result_data = await job.result(timeout=0)
result = result_data if isinstance(result_data, dict) else {"data": str(result_data)}
except Exception as exc:
error = str(exc)
return JobStatusResponse(
job_id=job_id,
status=status.value,
result=result,
error=error,
)

View File

@@ -49,6 +49,21 @@ def client(mock_events_collection, mock_watermarks_collection, monkeypatch):
monkeypatch.setattr("rules.rules_collection", audit_db["alert_rules"])
monkeypatch.setattr("routes.rules.rules_collection", audit_db["alert_rules"])
# Mock Redis so tests don't require a running Redis server
class FakeRedis:
async def get(self, key):
return None
async def setex(self, key, ttl, value):
pass
async def fake_get_arq_pool():
return FakeRedis()
monkeypatch.setattr("redis_client.get_arq_pool", fake_get_arq_pool)
monkeypatch.setattr("routes.ask.get_arq_pool", fake_get_arq_pool)
monkeypatch.setattr("routes.jobs.get_redis", fake_get_arq_pool)
from main import app
return TestClient(app)

View File

@@ -89,6 +89,18 @@ def test_explain_event_with_llm_mock(client, mock_events_collection, monkeypatch
monkeypatch.setattr("routes.ask._explain_event", fake_explain)
class FakeRedis:
async def get(self, key):
return None
async def setex(self, key, ttl, value):
pass
async def fake_get_arq_pool():
return FakeRedis()
monkeypatch.setattr("routes.ask.get_arq_pool", fake_get_arq_pool)
mock_events_collection.insert_one(
{
"id": "evt-explain2",

View File

@@ -1,5 +1,7 @@
import asyncio
from datetime import UTC, datetime, timedelta
from jobs import set_cached_ask
from routes.ask import _build_event_query, _extract_entity, _extract_time_range
# ---------------------------------------------------------------------------
@@ -350,3 +352,131 @@ class TestAskEndpoint:
data = response.json()
assert data["query_info"]["event_count"] == 1
assert data["events"][0]["id"] == "evt-bob"
class TestAskCaching:
def test_ask_cache_hit_returns_cached_answer(self, client, mock_events_collection, monkeypatch):
"""If the answer is cached, the LLM should not be called."""
now = datetime.now(UTC)
mock_events_collection.insert_one(
{
"id": "evt-cache",
"timestamp": now.isoformat(),
"service": "Directory",
"operation": "Add user",
"result": "success",
"actor_display": "Alice",
"target_displays": ["USER-001"],
"display_summary": "summary",
"raw_text": "raw",
}
)
llm_called = False
async def fake_llm(question, events, total=None, excluded_services=None):
nonlocal llm_called
llm_called = True
return "This should NOT appear."
monkeypatch.setattr("routes.ask.LLM_API_KEY", "fake-key")
monkeypatch.setattr("routes.ask._call_llm", fake_llm)
# Pre-populate cache with a specific answer
class CachingFakeRedis:
def __init__(self):
self.store = {}
async def get(self, key):
return self.store.get(key)
async def setex(self, key, ttl, value):
self.store[key] = value
redis = CachingFakeRedis()
# Seed cache with the exact filters the endpoint will generate
filters_snapshot = {
"services": None,
"actor": None,
"operation": None,
"result": None,
"start": None,
"end": None,
"include_tags": None,
"exclude_tags": None,
}
asyncio.run(
set_cached_ask(
redis,
"What happened to USER-001?",
filters_snapshot,
[{"id": "evt-cache"}],
{"answer": "Cached answer!", "llm_used": True, "llm_error": None},
)
)
async def fake_get_arq_pool():
return redis
monkeypatch.setattr("routes.ask.get_arq_pool", fake_get_arq_pool)
response = client.post("/api/ask", json={"question": "What happened to USER-001?"})
assert response.status_code == 200
data = response.json()
assert data["answer"] == "Cached answer!"
assert data["llm_used"] is True
assert llm_called is False
def test_ask_async_mode_returns_job_id(self, client, mock_events_collection, monkeypatch):
"""Async mode should return immediately with a job_id."""
now = datetime.now(UTC)
mock_events_collection.insert_one(
{
"id": "evt-async",
"timestamp": now.isoformat(),
"service": "Directory",
"operation": "Add user",
"result": "success",
"actor_display": "Alice",
"target_displays": ["USER-001"],
"display_summary": "summary",
"raw_text": "raw",
}
)
monkeypatch.setattr("routes.ask.LLM_API_KEY", "fake-key")
# Mock arq pool to capture enqueue_job call
class FakeArqPool:
def __init__(self):
self.enqueued = []
async def get(self, key):
return None
async def setex(self, key, ttl, value):
pass
async def enqueue_job(self, func, *args, **kwargs):
from unittest.mock import MagicMock
job = MagicMock()
job.job_id = "job-12345"
self.enqueued.append((func, args, kwargs))
return job
pool = FakeArqPool()
async def fake_get_arq_pool():
return pool
monkeypatch.setattr("routes.ask.get_arq_pool", fake_get_arq_pool)
response = client.post("/api/ask", json={"question": "What happened to USER-001?", "async_mode": True})
assert response.status_code == 200
data = response.json()
assert data["job_id"] == "job-12345"
assert data["llm_used"] is False
assert "being processed" in data["answer"]
assert len(pool.enqueued) == 1
assert pool.enqueued[0][0] == "process_ask_question"

View File

@@ -1,4 +1,19 @@
services:
redis:
image: valkey/valkey:8-alpine
container_name: aoc-redis
restart: always
volumes:
- redis_data:/data
networks:
- aoc-internal
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 3s
retries: 5
start_period: 5s
mongo:
image: mongo:7
container_name: aoc-mongo
@@ -27,9 +42,12 @@ services:
- .env
environment:
MONGO_URI: mongodb://${MONGO_ROOT_USERNAME}:${MONGO_ROOT_PASSWORD}@mongo:27017/
REDIS_URL: redis://redis:6379/0
depends_on:
mongo:
condition: service_healthy
redis:
condition: service_healthy
networks:
- aoc-internal
healthcheck:
@@ -39,6 +57,24 @@ services:
retries: 3
start_period: 10s
worker:
image: git.cqre.net/cqrenet/aoc-backend:${AOC_VERSION:-latest}
container_name: aoc-worker
restart: always
env_file:
- .env
environment:
MONGO_URI: mongodb://${MONGO_ROOT_USERNAME}:${MONGO_ROOT_PASSWORD}@mongo:27017/
REDIS_URL: redis://redis:6379/0
command: ["arq", "jobs.WorkerSettings"]
depends_on:
redis:
condition: service_healthy
mongo:
condition: service_healthy
networks:
- aoc-internal
nginx:
image: nginx:alpine
container_name: aoc-nginx
@@ -58,6 +94,7 @@ services:
volumes:
mongo_data:
redis_data:
networks:
aoc-internal:

View File

@@ -1,4 +1,13 @@
services:
redis:
image: valkey/valkey:8-alpine
container_name: aoc-redis
restart: always
ports:
- "6379:6379"
volumes:
- redis_data:/data
mongo:
image: mongo:7
container_name: aoc-mongo
@@ -21,10 +30,27 @@ services:
- .env
environment:
MONGO_URI: mongodb://${MONGO_ROOT_USERNAME}:${MONGO_ROOT_PASSWORD}@mongo:${MONGO_PORT}/
REDIS_URL: redis://redis:6379/0
depends_on:
- mongo
- redis
ports:
- "8000:8000"
worker:
build: ./backend
container_name: aoc-worker
restart: always
env_file:
- .env
environment:
MONGO_URI: mongodb://${MONGO_ROOT_USERNAME}:${MONGO_ROOT_PASSWORD}@mongo:${MONGO_PORT}/
REDIS_URL: redis://redis:6379/0
command: ["arq", "jobs.WorkerSettings"]
depends_on:
- redis
- mongo
volumes:
mongo_data:
redis_data: