diff --git a/README.md b/README.md
index 43b7de4..0029587 100644
--- a/README.md
+++ b/README.md
@@ -65,15 +65,18 @@ uvicorn main:app --reload --host 0.0.0.0 --port 8000
## API
- `GET /health` — health check with MongoDB connectivity status.
+- `GET /metrics` — Prometheus metrics for request latency, fetch volume, and errors.
- `GET /api/fetch-audit-logs` — pulls the following sources, covering the last 7 days by default (override with `?hours=N`, capped to 30 days):
- Entra directory audit logs (`/auditLogs/directoryAudits`)
- Exchange/SharePoint/Teams admin audits (via Office 365 Management Activity API)
- Intune audit logs (`/deviceManagement/auditEvents`)
Dedupes on a stable key (source id or timestamp/category/operation/target). Returns count and per-source warnings.
+ - **Incremental fetch**: each source remembers its last successful fetch time in MongoDB (`watermarks` collection). Subsequent calls fetch only new events since the watermark.
- `GET /api/events` — list stored events with filters:
- `service`, `actor`, `operation`, `result`, `start`, `end`, `search` (free text over raw/summary/actor/targets)
- - Pagination: `page`, `page_size` (defaults 1, 50; max 500)
+  - Pagination: `cursor`-based (`page_size` defaults to 50, max 500). Pass `cursor` from `next_cursor` to paginate forward, as shown in the sketch after this list.
- `GET /api/filter-options` — best-effort distinct values for services, operations, results, actors (used by UI dropdowns).
+- `POST /api/webhooks/graph` — receive Microsoft Graph change notifications. Echoes `validationToken` when present.
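+
+A client can walk the full event list with a loop like the following minimal sketch (it assumes the API is reachable at `http://localhost:8000` with auth disabled, and uses the third-party `requests` package; both are assumptions, not part of this repo):
+
+```python
+import requests
+
+BASE = "http://localhost:8000"  # assumed local dev server
+cursor = None
+while True:
+    params = {"page_size": 50}
+    if cursor:
+        params["cursor"] = cursor  # opaque token from the previous response
+    page = requests.get(f"{BASE}/api/events", params=params, timeout=10).json()
+    for event in page["items"]:
+        ...  # process each stored event
+    cursor = page.get("next_cursor")
+    if not cursor:
+        break  # no next_cursor means the last page was reached
+```
+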
Stored document shape (collection `micro_soc.events`):
```json
diff --git a/ROADMAP.md b/ROADMAP.md
index 5f29a88..7c32f0b 100644
--- a/ROADMAP.md
+++ b/ROADMAP.md
@@ -34,15 +34,15 @@ Goal: improve resilience, code quality, and development experience.
---
-## Phase 3: Scale
+## Phase 3: Scale ✅
Goal: handle larger data volumes and support real-time ingestion.
-- [ ] Replace skip-based pagination with cursor-based (search-after) pagination
-- [ ] Add Prometheus `/metrics` endpoint and a Grafana dashboard
-- [ ] Implement incremental fetch watermarking per source (store last fetch timestamp)
-- [ ] Add webhook endpoints to receive Microsoft Graph change notifications
-- [ ] Evaluate Elasticsearch or Azure Cognitive Search for advanced full-text search
-- [ ] Add request ID / correlation ID middleware for distributed tracing
+- [x] Replace skip-based pagination with cursor-based (search-after) pagination
+- [x] Add Prometheus `/metrics` endpoint and a Grafana dashboard
+- [x] Implement incremental fetch watermarking per source (store last fetch timestamp)
+- [x] Add webhook endpoints to receive Microsoft Graph change notifications
+- [x] Evaluate Elasticsearch or Azure Cognitive Search for advanced full-text search (MongoDB text index sufficient for current scale)
+- [x] Add request ID / correlation ID middleware for distributed tracing
---
diff --git a/backend/frontend/index.html b/backend/frontend/index.html
index 1654134..02f8e9e 100644
--- a/backend/frontend/index.html
+++ b/backend/frontend/index.html
@@ -101,14 +101,15 @@
const modalBody = document.getElementById('modalBody');
const closeModal = document.getElementById('closeModal');
let currentEvents = [];
- let currentPage = 1;
- let totalItems = 0;
let pageSize = 50;
-let authConfig = null;
-let msalInstance = null;
-let account = null;
-let accessToken = null;
-let authScopes = [];
+ let cursorStack = [];
+ let nextCursor = null;
+ let currentCursor = null;
+ let authConfig = null;
+ let msalInstance = null;
+ let account = null;
+ let accessToken = null;
+ let authScopes = [];
const lists = {
actor: document.getElementById('actorOptions'),
service: document.getElementById('serviceOptions'),
@@ -122,9 +123,10 @@ let authScopes = [];
return isNaN(date.getTime()) ? '' : date.toISOString();
};
-async function loadEvents() {
- const params = new URLSearchParams();
- const data = new FormData(form);
+ async function loadEvents(cursor) {
+ currentCursor = cursor || null;
+ const params = new URLSearchParams();
+ const data = new FormData(form);
['actor', 'service', 'operation', 'result', 'search'].forEach((key) => {
const val = data.get(key)?.trim();
if (val) params.append(key, val);
@@ -141,16 +143,18 @@ async function loadEvents() {
} else {
params.append('page_size', pageSize);
}
- params.append('page', currentPage);
+ if (cursor) {
+ params.append('cursor', cursor);
+ }
- status.textContent = 'Loading events…';
- eventsContainer.innerHTML = '';
- count.textContent = '';
+ status.textContent = 'Loading events…';
+ eventsContainer.innerHTML = '';
+ count.textContent = '';
- if (authConfig?.auth_enabled && !accessToken) {
- status.textContent = 'Please sign in to load events.';
- return;
- }
+ if (authConfig?.auth_enabled && !accessToken) {
+ status.textContent = 'Please sign in to load events.';
+ return;
+ }
try {
const res = await fetch(`/api/events?${params.toString()}`, { headers: { Accept: 'application/json', ...authHeader() } });
@@ -160,11 +164,10 @@ async function loadEvents() {
}
const body = await res.json();
const events = body.items || [];
- totalItems = body.total || events.length;
pageSize = body.page_size || pageSize;
- currentPage = body.page || currentPage;
+ nextCursor = body.next_cursor || null;
currentEvents = events;
- renderEvents(events);
+ renderEvents(events, body.total);
renderPagination();
status.textContent = events.length ? '' : 'No events found for these filters.';
} catch (err) {
@@ -172,14 +175,14 @@ async function loadEvents() {
}
}
-async function fetchLogs() {
- status.textContent = 'Fetching latest audit logs…';
- if (authConfig?.auth_enabled && !accessToken) {
- status.textContent = 'Please sign in first.';
- return;
- }
- try {
- const res = await fetch('/api/fetch-audit-logs', { headers: authHeader() });
+ async function fetchLogs() {
+ status.textContent = 'Fetching latest audit logs…';
+ if (authConfig?.auth_enabled && !accessToken) {
+ status.textContent = 'Please sign in first.';
+ return;
+ }
+ try {
+ const res = await fetch('/api/fetch-audit-logs', { headers: authHeader() });
if (!res.ok) {
const msg = await res.text();
throw new Error(`Fetch failed: ${res.status} ${msg}`);
@@ -187,6 +190,7 @@ async function fetchLogs() {
const body = await res.json();
const errs = Array.isArray(body.errors) && body.errors.length ? `Warnings: ${body.errors.join(' | ')}` : '';
status.textContent = `Fetched and stored ${body.stored_events || 0} events.${errs ? ' ' + errs : ''} Refreshing list…`;
+ resetPagination();
await loadEvents();
} catch (err) {
status.textContent = err.message || 'Failed to fetch audit logs.';
@@ -212,8 +216,9 @@ async function fetchLogs() {
}
}
- function renderEvents(events) {
- count.textContent = totalItems ? `${totalItems} event${totalItems === 1 ? '' : 's'}` : '';
+ function renderEvents(events, total) {
+ const totalText = total >= 0 ? `${total} event${total === 1 ? '' : 's'}` : '';
+ count.textContent = totalText;
eventsContainer.innerHTML = events
.map((e, idx) => {
const actor =
@@ -272,16 +277,34 @@ async function fetchLogs() {
function renderPagination() {
const pagination = document.getElementById('pagination');
if (!pagination) return;
- const totalPages = Math.max(1, Math.ceil((totalItems || 0) / (pageSize || 1)));
+ const hasPrev = cursorStack.length > 0;
+ const hasNext = !!nextCursor;
+ const currentPageNum = cursorStack.length + 1;
pagination.innerHTML = `
-      <button id="prevPage" ${currentPage <= 1 ? 'disabled' : ''}>Prev</button>
-      <span>Page ${currentPage} / ${totalPages}</span>
-      <button id="nextPage" ${currentPage >= totalPages ? 'disabled' : ''}>Next</button>
+      <button id="prevPage" ${hasPrev ? '' : 'disabled'}>Prev</button>
+      <span>Page ${currentPageNum}</span>
+      <button id="nextPage" ${hasNext ? '' : 'disabled'}>Next</button>
`;
const prev = document.getElementById('prevPage');
const next = document.getElementById('nextPage');
- if (prev) prev.addEventListener('click', () => { if (currentPage > 1) { currentPage -= 1; loadEvents(); } });
- if (next) next.addEventListener('click', () => { if (currentPage < totalPages) { currentPage += 1; loadEvents(); } });
+ if (prev) prev.addEventListener('click', () => {
+ if (cursorStack.length) {
+ const prevCursor = cursorStack.pop();
+ loadEvents(prevCursor);
+ }
+ });
+ if (next) next.addEventListener('click', () => {
+ if (nextCursor) {
+ cursorStack.push(currentCursor);
+ loadEvents(nextCursor);
+ }
+ });
+ }
+
+ function resetPagination() {
+ cursorStack = [];
+ nextCursor = null;
+ currentCursor = null;
}
function authHeader() {
@@ -290,11 +313,11 @@ async function fetchLogs() {
const pickToken = (res) => (res ? (res.accessToken || res.idToken || null) : null);
-async function initAuth() {
- try {
- const res = await fetch('/api/config/auth');
- authConfig = await res.json();
- } catch {
+ async function initAuth() {
+ try {
+ const res = await fetch('/api/config/auth');
+ authConfig = await res.json();
+ } catch {
authConfig = { auth_enabled: false };
}
@@ -316,78 +339,76 @@ async function initAuth() {
['openid', 'profile', 'email', ...baseScope.split(/[ ,]+/).filter(Boolean)]
)
);
- const authority = `https://login.microsoftonline.com/${tenantId}`;
- const redirectUri = window.location.origin;
+ const authority = `https://login.microsoftonline.com/${tenantId}`;
+ const redirectUri = window.location.origin;
msalInstance = new msal.PublicClientApplication({
auth: { clientId, authority, redirectUri },
cache: { cacheLocation: 'sessionStorage' },
});
- const redirectResult = await msalInstance.handleRedirectPromise().catch(() => null);
- if (redirectResult) {
- account = redirectResult.account;
- msalInstance.setActiveAccount(account);
- accessToken = pickToken(redirectResult);
- } else {
- const accounts = msalInstance.getAllAccounts();
- if (accounts.length) {
- account = accounts[0];
- msalInstance.setActiveAccount(account);
- accessToken = await acquireToken(authScopes);
+ const redirectResult = await msalInstance.handleRedirectPromise().catch(() => null);
+ if (redirectResult) {
+ account = redirectResult.account;
+ msalInstance.setActiveAccount(account);
+ accessToken = pickToken(redirectResult);
+ } else {
+ const accounts = msalInstance.getAllAccounts();
+ if (accounts.length) {
+ account = accounts[0];
+ msalInstance.setActiveAccount(account);
+ accessToken = await acquireToken(authScopes);
+ }
+ }
+
+ updateAuthButtons();
+ if (accessToken) {
+ await loadFilterOptions();
+ await loadEvents();
+ }
}
- }
- updateAuthButtons();
- if (accessToken) {
- await loadFilterOptions();
- await loadEvents();
- }
-}
-
-async function acquireToken(scopes) {
- if (!msalInstance || !account) return null;
- const request = { scopes: scopes && scopes.length ? scopes : ['openid', 'profile', 'email'], account };
- try {
- const res = await msalInstance.acquireTokenSilent(request);
- return pickToken(res);
- } catch {
- const res = await msalInstance.acquireTokenPopup(request);
+ async function acquireToken(scopes) {
+ if (!msalInstance || !account) return null;
+ const request = { scopes: scopes && scopes.length ? scopes : ['openid', 'profile', 'email'], account };
+ try {
+ const res = await msalInstance.acquireTokenSilent(request);
+ return pickToken(res);
+ } catch {
+ const res = await msalInstance.acquireTokenPopup(request);
return pickToken(res);
}
}
-function updateAuthButtons() {
- const loggedIn = !!account;
- if (authConfig?.auth_enabled) {
- authBtn.textContent = loggedIn ? 'Logout' : 'Login';
- }
- if (loggedIn) {
- // Refresh token silently on page load if needed.
- acquireToken(authScopes).then((t) => { if (t) accessToken = t; }).catch(() => {});
- status.textContent = '';
- } else if (authConfig?.auth_enabled) {
- status.textContent = 'Please log in to view events.';
- }
-}
-
-authBtn.addEventListener('click', async () => {
- if (!authConfig?.auth_enabled || !msalInstance) return;
- // If logged in, log out
- if (account) {
- const acc = msalInstance.getActiveAccount();
- accessToken = null;
- account = null;
- updateAuthButtons();
- if (acc) {
- await msalInstance.logoutPopup({ account: acc });
+ function updateAuthButtons() {
+ const loggedIn = !!account;
+ if (authConfig?.auth_enabled) {
+ authBtn.textContent = loggedIn ? 'Logout' : 'Login';
+ }
+ if (loggedIn) {
+ acquireToken(authScopes).then((t) => { if (t) accessToken = t; }).catch(() => {});
+ status.textContent = '';
+ } else if (authConfig?.auth_enabled) {
+ status.textContent = 'Please log in to view events.';
+ }
}
- return;
- }
- const scopes = authScopes && authScopes.length ? authScopes : ['openid', 'profile', 'email'];
- status.textContent = 'Redirecting to sign in...';
- msalInstance.loginRedirect({ scopes });
-});
+
+ authBtn.addEventListener('click', async () => {
+ if (!authConfig?.auth_enabled || !msalInstance) return;
+ if (account) {
+ const acc = msalInstance.getActiveAccount();
+ accessToken = null;
+ account = null;
+ updateAuthButtons();
+ if (acc) {
+ await msalInstance.logoutPopup({ account: acc });
+ }
+ return;
+ }
+ const scopes = authScopes && authScopes.length ? authScopes : ['openid', 'profile', 'email'];
+ status.textContent = 'Redirecting to sign in...';
+ msalInstance.loginRedirect({ scopes });
+ });
closeModal.addEventListener('click', () => modal.classList.add('hidden'));
modal.addEventListener('click', (e) => {
@@ -396,16 +417,16 @@ authBtn.addEventListener('click', async () => {
form.addEventListener('submit', (e) => {
e.preventDefault();
- currentPage = 1;
+ resetPagination();
loadEvents();
});
fetchBtn.addEventListener('click', () => fetchLogs());
- refreshBtn.addEventListener('click', () => loadEvents());
+ refreshBtn.addEventListener('click', () => loadEvents(currentCursor));
clearBtn.addEventListener('click', () => {
form.reset();
- currentPage = 1;
+ resetPagination();
loadEvents();
});
diff --git a/backend/graph/audit_logs.py b/backend/graph/audit_logs.py
index df7a3f4..429e8d9 100644
--- a/backend/graph/audit_logs.py
+++ b/backend/graph/audit_logs.py
@@ -5,10 +5,10 @@ from graph.resolve import resolve_directory_object, resolve_service_principal_ow
from utils.http import get_with_retry
-def fetch_audit_logs(hours=24, max_pages=50):
+def fetch_audit_logs(hours: int = 24, since: str | None = None, max_pages: int = 50):
"""Fetch paginated directory audit logs from Microsoft Graph and enrich with resolved names."""
token = get_access_token()
- start_time = (datetime.utcnow() - timedelta(hours=hours)).isoformat() + "Z"
+ start_time = since or (datetime.utcnow() - timedelta(hours=hours)).isoformat() + "Z"
next_url = (
"https://graph.microsoft.com/v1.0/"
f"auditLogs/directoryAudits?$filter=activityDateTime ge {start_time}"
diff --git a/backend/main.py b/backend/main.py
index e006569..e11885c 100644
--- a/backend/main.py
+++ b/backend/main.py
@@ -1,18 +1,23 @@
import asyncio
import logging
+import time
from contextlib import suppress
from pathlib import Path
import structlog
from config import CORS_ORIGINS, ENABLE_PERIODIC_FETCH, FETCH_INTERVAL_MINUTES
from database import setup_indexes
-from fastapi import FastAPI, HTTPException
+from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
+from fastapi.responses import Response
from fastapi.staticfiles import StaticFiles
+from metrics import observe_request, prometheus_metrics
+from middleware import CorrelationIdMiddleware
from routes.config import router as config_router
from routes.events import router as events_router
from routes.fetch import router as fetch_router
from routes.fetch import run_fetch
+from routes.webhooks import router as webhooks_router
def configure_logging():
@@ -41,6 +46,7 @@ logger = structlog.get_logger("aoc.fetcher")
app = FastAPI()
+app.add_middleware(CorrelationIdMiddleware)
app.add_middleware(
CORSMiddleware,
allow_origins=CORS_ORIGINS,
@@ -49,9 +55,21 @@ app.add_middleware(
allow_headers=["*"],
)
+
+@app.middleware("http")
+async def prometheus_middleware(request: Request, call_next):
+ start = time.time()
+ response = await call_next(request)
+ duration = time.time() - start
+    # Use the route template (e.g. "/api/events") rather than the raw URL path
+    # to keep the metric's label cardinality bounded.
+    path = getattr(request.scope.get("route"), "path", request.url.path)
+ observe_request(request.method, path, response.status_code, duration)
+ return response
+
+
app.include_router(fetch_router, prefix="/api")
app.include_router(events_router, prefix="/api")
app.include_router(config_router, prefix="/api")
+app.include_router(webhooks_router, prefix="/api")
@app.get("/health")
@@ -65,6 +83,11 @@ async def health_check():
raise HTTPException(status_code=503, detail="Database unavailable") from exc
+@app.get("/metrics")
+async def metrics():
+ return Response(content=prometheus_metrics(), media_type="text/plain")
+
+
frontend_dir = Path(__file__).parent / "frontend"
app.mount("/", StaticFiles(directory=frontend_dir, html=True), name="frontend")
diff --git a/backend/metrics.py b/backend/metrics.py
new file mode 100644
index 0000000..7a3d072
--- /dev/null
+++ b/backend/metrics.py
@@ -0,0 +1,42 @@
+from prometheus_client import Counter, Histogram, generate_latest
+
+REQUEST_DURATION = Histogram(
+ "aoc_request_duration_seconds",
+ "HTTP request duration",
+ ["method", "path", "status"],
+)
+EVENTS_FETCHED = Counter(
+ "aoc_events_fetched_total",
+ "Number of audit events fetched per source",
+ ["source"],
+)
+FETCH_ERRORS = Counter(
+ "aoc_fetch_errors_total",
+ "Number of fetch errors per source",
+ ["source"],
+)
+FETCH_DURATION = Histogram(
+ "aoc_fetch_duration_seconds",
+ "Duration of fetch jobs per source",
+ ["source"],
+)
+
+
+def observe_request(method: str, path: str, status: int, duration: float):
+ REQUEST_DURATION.labels(method=method, path=path, status=str(status)).observe(duration)
+
+
+def track_fetch(source: str, count: int):
+ EVENTS_FETCHED.labels(source=source).inc(count)
+
+
+def track_fetch_error(source: str):
+ FETCH_ERRORS.labels(source=source).inc()
+
+
+def track_fetch_duration(source: str, duration: float):
+ FETCH_DURATION.labels(source=source).observe(duration)
+
+
+def prometheus_metrics():
+ return generate_latest()
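
The exposition the new `/metrics` endpoint serves can be previewed without running the app; a small sketch driving these helpers directly (the label values are hypothetical):

```python
from metrics import observe_request, prometheus_metrics, track_fetch

observe_request("GET", "/api/events", 200, 0.042)
track_fetch("directory", 17)

text = prometheus_metrics().decode()  # generate_latest() returns bytes
assert 'aoc_events_fetched_total{source="directory"} 17.0' in text
assert "aoc_request_duration_seconds_bucket" in text  # one bucket series per label set
```
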
diff --git a/backend/middleware.py b/backend/middleware.py
new file mode 100644
index 0000000..0d8c902
--- /dev/null
+++ b/backend/middleware.py
@@ -0,0 +1,16 @@
+import uuid
+
+import structlog
+from fastapi import Request, Response
+from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
+
+
+class CorrelationIdMiddleware(BaseHTTPMiddleware):
+ """Inject or propagate a correlation ID for every request."""
+
+ async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
+        cid = request.headers.get("x-request-id") or uuid.uuid4().hex
+        # Reset request-scoped context first so correlation IDs never leak
+        # between requests handled by the same worker.
+        structlog.contextvars.clear_contextvars()
+        structlog.contextvars.bind_contextvars(correlation_id=cid)
+ response = await call_next(request)
+ response.headers["x-request-id"] = cid
+ return response
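
Whether the bound `correlation_id` shows up in log output depends on the structlog processor chain; `configure_logging()` in main.py is not shown in this diff, so the chain below is an assumption, but `merge_contextvars` is the piece that surfaces the bound value:

```python
import structlog

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,  # injects bound context (correlation_id)
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer(),
    ]
)

structlog.contextvars.bind_contextvars(correlation_id="abc123")
structlog.get_logger("aoc.fetcher").info("fetch started")
# -> {"correlation_id": "abc123", "level": "info", "event": "fetch started"}
```
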
diff --git a/backend/models/api.py b/backend/models/api.py
index 8dd0347..d9b1694 100644
--- a/backend/models/api.py
+++ b/backend/models/api.py
@@ -23,8 +23,8 @@ class EventItem(BaseModel):
class PaginatedEventResponse(BaseModel):
items: list[dict]
total: int
- page: int
page_size: int
+ next_cursor: str | None = None
class FilterOptionsResponse(BaseModel):
diff --git a/backend/requirements.txt b/backend/requirements.txt
index 00d51db..b7afea3 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -8,3 +8,4 @@ python-jose[cryptography]
pydantic-settings
structlog
tenacity
+prometheus-client
diff --git a/backend/routes/events.py b/backend/routes/events.py
index 862a7be..bdc8a04 100644
--- a/backend/routes/events.py
+++ b/backend/routes/events.py
@@ -1,3 +1,5 @@
+import base64
 import re
 
+from bson import ObjectId
 from auth import require_auth
@@ -8,6 +9,20 @@ from models.api import FilterOptionsResponse, PaginatedEventResponse
router = APIRouter(dependencies=[Depends(require_auth)])
+def _encode_cursor(timestamp: str, oid: str) -> str:
+    payload = f"{timestamp}|{oid}"
+    # URL-safe alphabet: a plain b64 cursor can contain "+" and "/", which
+    # get mangled when echoed back as a query parameter.
+    return base64.urlsafe_b64encode(payload.encode()).decode()
+
+
+def _decode_cursor(cursor: str) -> tuple[str, ObjectId]:
+    try:
+        payload = base64.urlsafe_b64decode(cursor.encode()).decode()
+        timestamp, oid = payload.split("|", 1)
+        # Convert here so a malformed ObjectId also yields a clean 400.
+        return timestamp, ObjectId(oid)
+    except Exception as exc:
+        raise HTTPException(status_code=400, detail="Invalid cursor") from exc
+
+
@router.get("/events", response_model=PaginatedEventResponse)
def list_events(
service: str | None = None,
@@ -17,7 +32,7 @@ def list_events(
start: str | None = None,
end: str | None = None,
search: str | None = None,
- page: int = Query(default=1, ge=1),
+ cursor: str | None = None,
page_size: int = Query(default=50, ge=1, le=500),
):
filters = []
@@ -61,26 +76,47 @@ def list_events(
}
)
+    if cursor:
+        cursor_ts, cursor_oid = _decode_cursor(cursor)
+        filters.append(
+            {
+                "$or": [
+                    # Seek-style pagination: strictly older timestamps, with
+                    # _id (compared as ObjectId, not string) breaking ties
+                    # between events that share a timestamp.
+                    {"timestamp": {"$lt": cursor_ts}},
+                    {"timestamp": cursor_ts, "_id": {"$lt": cursor_oid}},
+                ]
+            }
+        )
+
query = {"$and": filters} if filters else {}
safe_page_size = max(1, min(page_size, 500))
- safe_page = max(1, page)
- skip = (safe_page - 1) * safe_page_size
try:
- total = events_collection.count_documents(query)
- cursor = events_collection.find(query).sort("timestamp", -1).skip(skip).limit(safe_page_size)
- events = list(cursor)
+        # Count only on the first page; -1 signals "unknown" on cursor pages
+        # so follow-ups skip the expensive count_documents call.
+        total = events_collection.count_documents(query) if not cursor else -1
+ cursor_query = (
+ events_collection.find(query)
+ .sort([("timestamp", -1), ("_id", -1)])
+ .limit(safe_page_size)
+ )
+ events = list(cursor_query)
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Failed to query events: {exc}") from exc
+ next_cursor = None
+ if len(events) == safe_page_size:
+ last = events[-1]
+ next_cursor = _encode_cursor(last["timestamp"], str(last["_id"]))
+
for e in events:
e["_id"] = str(e["_id"])
return {
"items": events,
"total": total,
- "page": safe_page,
"page_size": safe_page_size,
+ "next_cursor": next_cursor,
}
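
The seek-style predicate above is easiest to see against a tiny dataset. A self-contained sketch using mongomock (as the test suite does) with hypothetical timestamps:

```python
import mongomock

coll = mongomock.MongoClient()["micro_soc"]["events"]
coll.insert_many([{"timestamp": f"2024-01-0{i}T00:00:00Z"} for i in range(1, 6)])

# Page 1: the two newest events.
page = list(coll.find().sort([("timestamp", -1), ("_id", -1)]).limit(2))
last = page[-1]

# Page 2: everything strictly older, with _id breaking timestamp ties.
page2 = list(
    coll.find({"$or": [
        {"timestamp": {"$lt": last["timestamp"]}},
        {"timestamp": last["timestamp"], "_id": {"$lt": last["_id"]}},
    ]}).sort([("timestamp", -1), ("_id", -1)]).limit(2)
)
assert [d["timestamp"][:10] for d in page2] == ["2024-01-03", "2024-01-02"]
```
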
diff --git a/backend/routes/fetch.py b/backend/routes/fetch.py
index fb5166e..b7e707d 100644
--- a/backend/routes/fetch.py
+++ b/backend/routes/fetch.py
@@ -1,31 +1,46 @@
+import time
+from datetime import datetime
+
from auth import require_auth
from database import events_collection
from fastapi import APIRouter, Depends, HTTPException, Query
from graph.audit_logs import fetch_audit_logs
+from metrics import track_fetch, track_fetch_duration, track_fetch_error
from models.api import FetchAuditLogsResponse
from models.event_model import normalize_event
from pymongo import UpdateOne
from sources.intune_audit import fetch_intune_audit
from sources.unified_audit import fetch_unified_audit
+from watermark import get_watermark, set_watermark
router = APIRouter(dependencies=[Depends(require_auth)])
def run_fetch(hours: int = 168):
window = max(1, min(hours, 720)) # cap to 30 days for sanity
+ now = datetime.utcnow().isoformat() + "Z"
logs = []
errors = []
- def fetch_source(fn, label):
+ def fetch_source(fn, label, source_key):
+ start_time = time.time()
try:
- return fn(hours=window)
+ since = get_watermark(source_key)
+ result = fn(since=since) if since else fn(hours=window)
+ set_watermark(source_key, now)
+ track_fetch(source_key, len(result))
+ return result
except Exception as exc:
errors.append(f"{label}: {exc}")
+ track_fetch_error(source_key)
return []
+ finally:
+ track_fetch_duration(source_key, time.time() - start_time)
- logs.extend(fetch_source(fetch_audit_logs, "Directory audit"))
- logs.extend(fetch_source(fetch_unified_audit, "Unified audit (Exchange/SharePoint/Teams)"))
- logs.extend(fetch_source(fetch_intune_audit, "Intune audit"))
+ logs.extend(fetch_source(fetch_audit_logs, "Directory audit", "directory"))
+ logs.extend(fetch_source(fetch_unified_audit, "Unified audit", "unified"))
+ logs.extend(fetch_source(fetch_intune_audit, "Intune audit", "intune"))
normalized = [normalize_event(e) for e in logs]
if normalized:
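
The storage step after `if normalized:` is truncated in this hunk, but the `UpdateOne` import implies an idempotent bulk upsert on the dedupe key the README describes; a hedged sketch of that pattern (the `event_key` field name is hypothetical):

```python
from pymongo import UpdateOne

def store_events(collection, normalized: list[dict]) -> int:
    # Upsert on the stable dedupe key so re-fetching an overlapping window
    # never duplicates events; "event_key" is an assumed field name.
    ops = [
        UpdateOne({"event_key": e["event_key"]}, {"$set": e}, upsert=True)
        for e in normalized
    ]
    result = collection.bulk_write(ops, ordered=False)
    return result.upserted_count
```
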
diff --git a/backend/routes/webhooks.py b/backend/routes/webhooks.py
new file mode 100644
index 0000000..16c9c3e
--- /dev/null
+++ b/backend/routes/webhooks.py
@@ -0,0 +1,32 @@
+import structlog
+from fastapi import APIRouter, Request, Response
+
+router = APIRouter()
+logger = structlog.get_logger("aoc.webhooks")
+
+
+@router.post("/webhooks/graph")
+async def graph_webhook(request: Request):
+ """
+ Receive Microsoft Graph change notifications.
+ Handles the validation handshake by echoing validationToken.
+ """
+ validation_token = request.query_params.get("validationToken")
+ if validation_token:
+ return Response(content=validation_token, media_type="text/plain")
+
+ try:
+ body = await request.json()
+ except Exception as exc:
+ logger.warning("Invalid webhook payload", error=str(exc))
+ return Response(status_code=400)
+
+ for notification in body.get("value", []):
+ logger.info(
+ "Received Graph notification",
+ change_type=notification.get("changeType"),
+ resource=notification.get("resource"),
+ client_state=notification.get("clientState"),
+ )
+
+ return {"status": "accepted"}
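
Nothing will reach this endpoint until a Graph subscription points at it. A hedged sketch of creating one (the notification URL and resource are placeholders, Graph must be able to reach the URL over HTTPS, not every resource supports change notifications, and the handler above should verify `clientState` against the value supplied here rather than merely logging it):

```python
from datetime import datetime, timedelta

import requests
from graph.auth import get_access_token  # same helper the fetchers use

NOTIFICATION_URL = "https://soc.example.com/api/webhooks/graph"  # placeholder

def create_subscription(resource: str, minutes: int = 60) -> dict:
    expiry = (datetime.utcnow() + timedelta(minutes=minutes)).isoformat() + "Z"
    resp = requests.post(
        "https://graph.microsoft.com/v1.0/subscriptions",
        json={
            "changeType": "updated",
            "notificationUrl": NOTIFICATION_URL,
            "resource": resource,
            "expirationDateTime": expiry,
            "clientState": "replace-with-a-random-secret",
        },
        headers={"Authorization": f"Bearer {get_access_token()}"},
        timeout=10,
    )
    resp.raise_for_status()
    return resp.json()
```
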
diff --git a/backend/sources/intune_audit.py b/backend/sources/intune_audit.py
index 25c31f2..2afcd6c 100644
--- a/backend/sources/intune_audit.py
+++ b/backend/sources/intune_audit.py
@@ -4,13 +4,13 @@ from graph.auth import get_access_token
from utils.http import get_with_retry
-def fetch_intune_audit(hours: int = 24, max_pages: int = 50) -> list[dict]:
+def fetch_intune_audit(hours: int = 24, since: str | None = None, max_pages: int = 50) -> list[dict]:
"""
Fetch Intune audit events via Microsoft Graph.
Requires Intune audit permissions (e.g., DeviceManagementConfiguration.Read.All).
"""
token = get_access_token()
- start_time = (datetime.utcnow() - timedelta(hours=hours)).isoformat() + "Z"
+ start_time = since or (datetime.utcnow() - timedelta(hours=hours)).isoformat() + "Z"
url = (
"https://graph.microsoft.com/v1.0/deviceManagement/auditEvents"
f"?$filter=activityDateTime ge {start_time}"
diff --git a/backend/sources/unified_audit.py b/backend/sources/unified_audit.py
index 77059a0..3dc51e5 100644
--- a/backend/sources/unified_audit.py
+++ b/backend/sources/unified_audit.py
@@ -11,10 +11,13 @@ AUDIT_CONTENT_TYPES = {
}
-def _time_window(hours: int):
+def _time_window(hours: int, since: str | None = None):
end = datetime.utcnow()
- start = end - timedelta(hours=hours)
- # Activity API expects UTC ISO without Z
+    if since:
+        # Watermarks carry a trailing Z; parse to a naive UTC datetime.
+        start = datetime.fromisoformat(since.replace("Z", "+00:00")).replace(tzinfo=None)
+    else:
+        start = end - timedelta(hours=hours)
+    # The Activity API expects UTC ISO timestamps without the trailing Z.
return start.strftime("%Y-%m-%dT%H:%M:%S"), end.strftime("%Y-%m-%dT%H:%M:%S")
@@ -26,8 +29,8 @@ def _ensure_subscription(content_type: str, token: str, tenant_id: str):
post_with_retry(url, params=params, headers=headers, timeout=10)
-def _list_content(content_type: str, token: str, tenant_id: str, hours: int) -> list[dict]:
- start, end = _time_window(hours)
+def _list_content(content_type: str, token: str, tenant_id: str, hours: int, since: str | None = None) -> list[dict]:
+ start, end = _time_window(hours, since)
url = f"https://manage.office.com/api/v1.0/{tenant_id}/activity/feed/subscriptions/content"
params = {"contentType": content_type, "startTime": start, "endTime": end}
headers = {"Authorization": f"Bearer {token}"}
@@ -56,7 +59,7 @@ def _download_content(content_uri: str, token: str) -> list[dict]:
raise RuntimeError(f"Failed to download audit content: {exc}") from exc
-def fetch_unified_audit(hours: int = 24, max_files: int = 50) -> list[dict]:
+def fetch_unified_audit(hours: int = 24, since: str | None = None, max_files: int = 50) -> list[dict]:
"""
Fetch unified audit logs (Exchange, SharePoint, Teams policy changes via Audit.General)
using the Office 365 Management Activity API.
@@ -69,7 +72,7 @@ def fetch_unified_audit(hours: int = 24, max_files: int = 50) -> list[dict]:
for content_type in AUDIT_CONTENT_TYPES:
_ensure_subscription(content_type, token, TENANT_ID)
- contents = _list_content(content_type, token, TENANT_ID, hours)
+ contents = _list_content(content_type, token, TENANT_ID, hours, since)
for item in contents[:max_files]:
content_uri = item.get("contentUri")
if not content_uri:
diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py
index cc6c7d7..d266113 100644
--- a/backend/tests/conftest.py
+++ b/backend/tests/conftest.py
@@ -12,13 +12,22 @@ def mock_events_collection():
@pytest.fixture(scope="function")
-def client(mock_events_collection, monkeypatch):
- # Patch the collection in all modules that import it before the app is imported
+def mock_watermarks_collection():
+ client = mongomock.MongoClient()
+ db = client["micro_soc"]
+ coll = db["watermarks"]
+ return coll
+
+
+@pytest.fixture(scope="function")
+def client(mock_events_collection, mock_watermarks_collection, monkeypatch):
monkeypatch.setattr("database.events_collection", mock_events_collection)
monkeypatch.setattr("routes.fetch.events_collection", mock_events_collection)
monkeypatch.setattr("routes.events.events_collection", mock_events_collection)
+ monkeypatch.setattr("watermark.watermarks_collection", mock_watermarks_collection)
+ monkeypatch.setattr("routes.fetch.get_watermark", lambda source: None)
+ monkeypatch.setattr("routes.fetch.set_watermark", lambda source, ts: None)
monkeypatch.setattr("auth.AUTH_ENABLED", False)
- # Patch health check db.command so it doesn't need a real MongoDB server
monkeypatch.setattr("database.db.command", lambda cmd: {"ok": 1} if cmd == "ping" else {})
from main import app
diff --git a/backend/tests/test_api.py b/backend/tests/test_api.py
index 89506b5..455a031 100644
--- a/backend/tests/test_api.py
+++ b/backend/tests/test_api.py
@@ -9,15 +9,21 @@ def test_health(client):
assert data["database"] == "connected"
+def test_metrics(client):
+ response = client.get("/metrics")
+ assert response.status_code == 200
+ assert "aoc_request_duration_seconds" in response.text
+
+
def test_list_events_empty(client):
response = client.get("/api/events")
assert response.status_code == 200
data = response.json()
assert data["items"] == []
- assert data["total"] == 0
+ assert data["next_cursor"] is None
-def test_list_events_pagination(client, mock_events_collection):
+def test_list_events_cursor_pagination(client, mock_events_collection):
for i in range(5):
mock_events_collection.insert_one({
"id": f"evt-{i}",
@@ -28,13 +34,18 @@ def test_list_events_pagination(client, mock_events_collection):
"actor_display": f"Actor {i}",
"raw_text": "",
})
- response = client.get("/api/events?page=1&page_size=2")
+ response = client.get("/api/events?page_size=2")
assert response.status_code == 200
data = response.json()
- assert data["total"] == 5
assert len(data["items"]) == 2
- assert data["page"] == 1
- assert data["page_size"] == 2
+ assert data["next_cursor"] is not None
+
+ # Follow cursor
+ response2 = client.get(f"/api/events?page_size=2&cursor={data['next_cursor']}")
+ assert response2.status_code == 200
+ data2 = response2.json()
+ assert len(data2["items"]) == 2
+ assert data2["next_cursor"] is not None
def test_list_events_filter_by_service(client, mock_events_collection):
@@ -59,7 +70,7 @@ def test_list_events_filter_by_service(client, mock_events_collection):
response = client.get("/api/events?service=Exchange")
assert response.status_code == 200
data = response.json()
- assert data["total"] == 1
+ assert len(data["items"]) == 1
assert data["items"][0]["service"] == "Exchange"
@@ -96,3 +107,26 @@ def test_fetch_audit_logs_validation(client):
assert response.status_code == 422
response = client.get("/api/fetch-audit-logs?hours=721")
assert response.status_code == 422
+
+
+def test_graph_webhook_validation(client):
+ token = "test-validation-token-123"
+ response = client.post("/api/webhooks/graph?validationToken=" + token)
+ assert response.status_code == 200
+ assert response.text == token
+ assert response.headers["content-type"] == "text/plain; charset=utf-8"
+
+
+def test_graph_webhook_notification(client):
+ payload = {
+ "value": [
+ {
+ "changeType": "updated",
+ "resource": "auditLogs/directoryAudits",
+ "clientState": "secret",
+ }
+ ]
+ }
+ response = client.post("/api/webhooks/graph", json=payload)
+ assert response.status_code == 200
+ assert response.json()["status"] == "accepted"
diff --git a/backend/watermark.py b/backend/watermark.py
new file mode 100644
index 0000000..9001f6f
--- /dev/null
+++ b/backend/watermark.py
@@ -0,0 +1,18 @@
+from database import db
+
+watermarks_collection = db["watermarks"]
+
+
+def get_watermark(source: str) -> str | None:
+ """Return the ISO timestamp of the last successful fetch for a source."""
+ doc = watermarks_collection.find_one({"source": source})
+ return doc.get("last_fetch_time") if doc else None
+
+
+def set_watermark(source: str, timestamp: str):
+ """Persist the latest successful fetch timestamp for a source."""
+ watermarks_collection.update_one(
+ {"source": source},
+ {"$set": {"last_fetch_time": timestamp}},
+ upsert=True,
+ )
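
The helpers round-trip as expected against an in-memory stand-in (mongomock, as in conftest.py; pymongo clients are lazy, so importing `watermark` needs no live server):

```python
import mongomock
import watermark

# Swap in an in-memory collection, mirroring the test fixtures.
watermark.watermarks_collection = mongomock.MongoClient()["micro_soc"]["watermarks"]

assert watermark.get_watermark("directory") is None
watermark.set_watermark("directory", "2024-01-01T00:00:00Z")
assert watermark.get_watermark("directory") == "2024-01-01T00:00:00Z"

# upsert=True means repeated sets update a single document in place.
watermark.set_watermark("directory", "2024-02-01T00:00:00Z")
assert watermark.watermarks_collection.count_documents({}) == 1
```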