feat: implement Phase 3 scaling

- Replace skip-based pagination with cursor-based pagination (timestamp|_id cursors)
- Add Prometheus /metrics endpoint with request latency, fetch volume, and error counters
- Implement incremental fetch watermarking per source (watermarks collection in MongoDB)
- Add Graph change notification webhook endpoint (/api/webhooks/graph)
- Add correlation ID middleware for distributed tracing (x-request-id header); see the sketch after this list
- Update frontend to use cursor-based pagination with Prev/Next navigation
- Update tests for cursor pagination, metrics, webhooks, and watermark mocking
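
The correlation ID middleware itself is not among the excerpted files. A minimal sketch of how it could look, assuming a Starlette BaseHTTPMiddleware plus structlog's contextvars bindings (the class and module layout here are hypothetical, not taken from the commit):

import uuid

import structlog
from starlette.middleware.base import BaseHTTPMiddleware


class CorrelationIdMiddleware(BaseHTTPMiddleware):
    # Hypothetical sketch; the middleware in this commit may differ.
    async def dispatch(self, request, call_next):
        # Reuse the caller's x-request-id if present, otherwise mint one.
        request_id = request.headers.get("x-request-id") or uuid.uuid4().hex
        # Bind the ID to structlog's context so every log line carries it.
        structlog.contextvars.bind_contextvars(request_id=request_id)
        try:
            response = await call_next(request)
            # Echo the ID back so clients and proxies can correlate.
            response.headers["x-request-id"] = request_id
            return response
        finally:
            structlog.contextvars.clear_contextvars()

It would be registered once at startup with app.add_middleware(CorrelationIdMiddleware).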
2026-04-14 14:58:50 +02:00
parent 9271b4e461
commit b0198012eb
17 changed files with 402 additions and 147 deletions

View File

@@ -1,3 +1,4 @@
+import base64
 import re
 from auth import require_auth
@@ -8,6 +9,20 @@ from models.api import FilterOptionsResponse, PaginatedEventResponse
 router = APIRouter(dependencies=[Depends(require_auth)])
+
+
+def _encode_cursor(timestamp: str, oid: str) -> str:
+    payload = f"{timestamp}|{oid}"
+    return base64.b64encode(payload.encode()).decode()
+
+
+def _decode_cursor(cursor: str) -> tuple[str, str]:
+    try:
+        payload = base64.b64decode(cursor.encode()).decode()
+        timestamp, oid = payload.split("|", 1)
+        return timestamp, oid
+    except Exception as exc:
+        raise HTTPException(status_code=400, detail="Invalid cursor") from exc
 
 
 @router.get("/events", response_model=PaginatedEventResponse)
 def list_events(
     service: str | None = None,
@@ -17,7 +32,7 @@ def list_events(
     start: str | None = None,
     end: str | None = None,
     search: str | None = None,
-    page: int = Query(default=1, ge=1),
+    cursor: str | None = None,
     page_size: int = Query(default=50, ge=1, le=500),
 ):
     filters = []
@@ -61,26 +76,47 @@ def list_events(
             }
         )
+    if cursor:
+        cursor_ts, cursor_oid = _decode_cursor(cursor)
+        filters.append(
+            {
+                "$or": [
+                    {"timestamp": {"$lt": cursor_ts}},
+                    {"timestamp": cursor_ts, "_id": {"$lt": cursor_oid}},
+                ]
+            }
+        )
     query = {"$and": filters} if filters else {}
     safe_page_size = max(1, min(page_size, 500))
-    safe_page = max(1, page)
-    skip = (safe_page - 1) * safe_page_size
     try:
-        total = events_collection.count_documents(query)
-        cursor = events_collection.find(query).sort("timestamp", -1).skip(skip).limit(safe_page_size)
-        events = list(cursor)
+        total = events_collection.count_documents(query) if not cursor else -1
+        cursor_query = (
+            events_collection.find(query)
+            .sort([("timestamp", -1), ("_id", -1)])
+            .limit(safe_page_size)
+        )
+        events = list(cursor_query)
     except Exception as exc:
         raise HTTPException(status_code=500, detail=f"Failed to query events: {exc}") from exc
+
+    next_cursor = None
+    if len(events) == safe_page_size:
+        last = events[-1]
+        next_cursor = _encode_cursor(last["timestamp"], str(last["_id"]))
+
     for e in events:
         e["_id"] = str(e["_id"])
+
     return {
         "items": events,
         "total": total,
-        "page": safe_page,
         "page_size": safe_page_size,
+        "next_cursor": next_cursor,
     }
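
For reference, a client pages through the new endpoint by following next_cursor until it comes back empty; total is only computed on the first, cursorless request. A hypothetical consumer (base URL and token are placeholders):

import requests

BASE = "https://aoc.example.com/api"  # placeholder base URL
HEADERS = {"Authorization": "Bearer <token>"}  # placeholder credentials

cursor = None
while True:
    params = {"page_size": 200}
    if cursor:
        params["cursor"] = cursor
    page = requests.get(f"{BASE}/events", params=params, headers=HEADERS, timeout=30).json()
    for event in page["items"]:
        ...  # process each event
    cursor = page.get("next_cursor")
    if not cursor:
        break  # a short page ends the scan

Two caveats worth noting: the keyset predicate only stays index-backed if a compound index matching the sort exists (e.g. a descending index on timestamp and _id, which this commit does not add), and the decoded oid is compared against _id as a string, so if events store ObjectId values the comparison would need bson.ObjectId(cursor_oid) to take effect.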

View File

@@ -1,31 +1,46 @@
+import time
+
 from auth import require_auth
 from database import events_collection
 from fastapi import APIRouter, Depends, HTTPException, Query
 from graph.audit_logs import fetch_audit_logs
+from metrics import track_fetch, track_fetch_duration, track_fetch_error
 from models.api import FetchAuditLogsResponse
 from models.event_model import normalize_event
 from pymongo import UpdateOne
 from sources.intune_audit import fetch_intune_audit
 from sources.unified_audit import fetch_unified_audit
+from watermark import get_watermark, set_watermark
 
 router = APIRouter(dependencies=[Depends(require_auth)])
 
 
 def run_fetch(hours: int = 168):
     from datetime import datetime
 
     window = max(1, min(hours, 720))  # cap to 30 days for sanity
+    now = datetime.utcnow().isoformat() + "Z"
     logs = []
     errors = []
 
-    def fetch_source(fn, label):
+    def fetch_source(fn, label, source_key):
+        start_time = time.time()
         try:
-            return fn(hours=window)
+            since = get_watermark(source_key)
+            result = fn(since=since) if since else fn(hours=window)
+            set_watermark(source_key, now)
+            track_fetch(source_key, len(result))
+            return result
         except Exception as exc:
             errors.append(f"{label}: {exc}")
+            track_fetch_error(source_key)
             return []
+        finally:
+            track_fetch_duration(source_key, time.time() - start_time)
 
-    logs.extend(fetch_source(fetch_audit_logs, "Directory audit"))
-    logs.extend(fetch_source(fetch_unified_audit, "Unified audit (Exchange/SharePoint/Teams)"))
-    logs.extend(fetch_source(fetch_intune_audit, "Intune audit"))
+    logs.extend(fetch_source(fetch_audit_logs, "Directory audit", "directory"))
+    logs.extend(fetch_source(fetch_unified_audit, "Unified audit", "unified"))
+    logs.extend(fetch_source(fetch_intune_audit, "Intune audit", "intune"))
 
     normalized = [normalize_event(e) for e in logs]
     if normalized:
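
The watermark and metrics helpers imported above are not part of the excerpt. Two minimal sketches consistent with those imports; the collection export and metric names are assumptions, not code from the commit:

# watermark.py -- one document per source in a MongoDB watermarks collection
from database import watermarks_collection  # hypothetical export


def get_watermark(source_key: str) -> str | None:
    doc = watermarks_collection.find_one({"_id": source_key})
    return doc["last_fetched"] if doc else None


def set_watermark(source_key: str, timestamp: str) -> None:
    watermarks_collection.update_one(
        {"_id": source_key}, {"$set": {"last_fetched": timestamp}}, upsert=True
    )

# metrics.py -- prometheus_client collectors behind the tracking helpers
from prometheus_client import Counter, Histogram

FETCHED = Counter("aoc_fetched_events_total", "Events fetched", ["source"])
ERRORS = Counter("aoc_fetch_errors_total", "Fetch failures", ["source"])
DURATION = Histogram("aoc_fetch_duration_seconds", "Fetch latency", ["source"])


def track_fetch(source: str, count: int) -> None:
    FETCHED.labels(source=source).inc(count)


def track_fetch_error(source: str) -> None:
    ERRORS.labels(source=source).inc()


def track_fetch_duration(source: str, seconds: float) -> None:
    DURATION.labels(source=source).observe(seconds)

The /metrics endpoint itself can then be exposed by mounting prometheus_client's make_asgi_app() on the FastAPI app. One design note: set_watermark runs before the fetched events are persisted, so a failure later in run_fetch still advances the watermark and those events are skipped on the next run.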

View File

@@ -0,0 +1,32 @@
+import structlog
+from fastapi import APIRouter, Request, Response
+
+router = APIRouter()
+logger = structlog.get_logger("aoc.webhooks")
+
+
+@router.post("/webhooks/graph")
+async def graph_webhook(request: Request):
+    """
+    Receive Microsoft Graph change notifications.
+
+    Handles the validation handshake by echoing validationToken.
+    """
+    validation_token = request.query_params.get("validationToken")
+    if validation_token:
+        return Response(content=validation_token, media_type="text/plain")
+
+    try:
+        body = await request.json()
+    except Exception as exc:
+        logger.warning("Invalid webhook payload", error=str(exc))
+        return Response(status_code=400)
+
+    for notification in body.get("value", []):
+        logger.info(
+            "Received Graph notification",
+            change_type=notification.get("changeType"),
+            resource=notification.get("resource"),
+            client_state=notification.get("clientState"),
+        )
+
+    return {"status": "accepted"}