Files
aoc/backend/mcp_server.py
Tomas Kracmar 60b6ad15c4
All checks were successful
CI / lint-and-test (push) Successful in 45s
Release / build-and-push (push) Successful in 1m34s
Release v1.3.0: AI feature flag and MCP server
- Add AI_FEATURES_ENABLED config flag to gate AI/natural-language features
- Conditionally register /api/ask router based on AI_FEATURES_ENABLED
- Add GET /api/config/features endpoint for frontend feature detection
- Update frontend to hide Ask panel when AI features are disabled
- Implement standalone MCP server (backend/mcp_server.py) with tools:
  * search_events, get_event, get_summary, ask
- Add mcp dependency to requirements.txt
- Update .env.example, AGENTS.md, and ROADMAP.md
- Bump VERSION to 1.3.0
2026-04-20 18:11:26 +02:00

277 lines
9.2 KiB
Python

#!/usr/bin/env python3
"""
AOC MCP Server
Standalone MCP server that exposes audit log search tools for Claude Desktop,
Cursor, and other MCP clients.

Usage:
    python mcp_server.py

Claude Desktop config (~/.config/claude/claude_desktop_config.json):
    {
      "mcpServers": {
        "aoc": {
          "command": "python",
          "args": ["/path/to/aoc/backend/mcp_server.py"],
          "env": {"MONGO_URI": "mongodb://..."}
        }
      }
    }
"""
import asyncio
import json
import os
import re
import sys
from datetime import UTC, datetime, timedelta
# Ensure backend modules are importable
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from database import events_collection
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool
# MCP server instance named "aoc"; the list_tools/call_tool handlers defined
# further down register themselves on it through its decorators.
app = Server("aoc")
# ---------------------------------------------------------------------------
# Tool definitions
# ---------------------------------------------------------------------------
def _string_prop(description: str) -> dict:
    """Return a JSON-Schema ``string`` property carrying *description*."""
    return {"type": "string", "description": description}


def _integer_prop(description: str) -> dict:
    """Return a JSON-Schema ``integer`` property carrying *description*."""
    return {"type": "integer", "description": description}


# Input schema for ``search_events``: every filter is optional.
_SEARCH_EVENTS_SCHEMA = {
    "type": "object",
    "properties": {
        "entity": _string_prop("Device name, user UPN, or email to search for"),
        "services": {
            "type": "array",
            "items": {"type": "string"},
            "description": "Filter by service (e.g. Intune, Directory, Exchange)",
        },
        "operation": _string_prop("Filter by operation name"),
        "result": _string_prop("Filter by result (success, failure)"),
        "days": _integer_prop("Number of days to look back (default 7)"),
        "limit": _integer_prop("Max events to return (default 20)"),
    },
}

# Input schema for ``get_event``: the event ID is mandatory.
_GET_EVENT_SCHEMA = {
    "type": "object",
    "properties": {
        "event_id": _string_prop("The event ID to retrieve"),
    },
    "required": ["event_id"],
}

# Input schema for ``get_summary``: only an optional look-back window.
_GET_SUMMARY_SCHEMA = {
    "type": "object",
    "properties": {
        "days": _integer_prop("Number of days to summarise (default 7)"),
    },
}

# Input schema for ``ask``: a mandatory question plus an optional window.
_ASK_SCHEMA = {
    "type": "object",
    "properties": {
        "question": _string_prop("Natural language question about audit logs"),
        "days": _integer_prop("Number of days to look back (default 7)"),
    },
    "required": ["question"],
}
@app.list_tools()
async def list_tools() -> list[Tool]:
    """Advertise the tools this server exposes.

    Returns:
        One ``Tool`` per supported operation, in the order ``search_events``,
        ``get_event``, ``get_summary``, ``ask``, each paired with its input
        schema.
    """
    # (name, description, input schema) for every tool, in listing order.
    catalogue = (
        (
            "search_events",
            "Search audit events by entity, service, operation, or result.",
            _SEARCH_EVENTS_SCHEMA,
        ),
        (
            "get_event",
            "Retrieve a single audit event by its ID.",
            _GET_EVENT_SCHEMA,
        ),
        (
            "get_summary",
            "Get an aggregated summary of audit activity for the last N days.",
            _GET_SUMMARY_SCHEMA,
        ),
        (
            "ask",
            "Ask a natural language question about audit logs. Returns a narrative answer.",
            _ASK_SCHEMA,
        ),
    )
    return [
        Tool(name=tool_name, description=summary, inputSchema=schema)
        for tool_name, summary, schema in catalogue
    ]
# ---------------------------------------------------------------------------
# Tool handlers
# ---------------------------------------------------------------------------
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Route a tool invocation to the handler that implements *name*.

    Args:
        name: Tool name as advertised by ``list_tools``.
        arguments: Tool arguments, forwarded untouched to the handler.

    Returns:
        Whatever the selected handler returns.

    Raises:
        ValueError: If *name* does not match any known tool.
    """
    dispatch = {
        "search_events": _handle_search_events,
        "get_event": _handle_get_event,
        "get_summary": _handle_get_summary,
        "ask": _handle_ask,
    }
    handler = dispatch.get(name)
    if handler is None:
        raise ValueError(f"Unknown tool: {name}")
    return await handler(arguments)
async def _handle_search_events(arguments: dict) -> list[TextContent]:
    """Search recent audit events and render them as a plain-text listing.

    Args:
        arguments: Tool arguments. All keys are optional:
            ``entity`` – text matched (case-insensitive substring) against
            ``target_displays``, ``actor_display``, ``actor_upn`` and
            ``raw_text``;
            ``services`` – list of exact service names;
            ``operation`` / ``result`` – case-insensitive substring filters;
            ``days`` – look-back window, default 7;
            ``limit`` – max rows, default 20, clamped to the range 1-100.

    Returns:
        A single ``TextContent``: either ``"No matching events found."`` or a
        ``"Found N event(s):"`` header followed by one
        ``timestamp | service | operation | actor | result`` row per event,
        newest first.
    """
    days = arguments.get("days")
    if days is None:
        days = 7
    limit = arguments.get("limit")
    if limit is None:
        limit = 20
    # Clamp on both sides: assuming events_collection is a PyMongo collection,
    # limit(0) means "no limit", which would bypass the cap of 100.
    limit = max(1, min(limit, 100))

    since = (datetime.now(UTC) - timedelta(days=days)).isoformat().replace("+00:00", "Z")
    filters = [{"timestamp": {"$gte": since}}]

    def _contains(text) -> dict:
        # User text is matched literally; escaping every metacharacter keeps
        # input such as "a+b", "[x]" or "(" from being read as a pattern.
        return {"$regex": re.escape(str(text)), "$options": "i"}

    services = arguments.get("services")
    if services:
        filters.append({"service": {"$in": services}})
    operation = arguments.get("operation")
    if operation:
        filters.append({"operation": _contains(operation)})
    result = arguments.get("result")
    if result:
        filters.append({"result": _contains(result)})
    entity = arguments.get("entity")
    if entity:
        entity_match = _contains(entity)
        filters.append(
            {
                "$or": [
                    {"target_displays": {"$elemMatch": entity_match}},
                    {"actor_display": entity_match},
                    {"actor_upn": entity_match},
                    {"raw_text": entity_match},
                ]
            }
        )

    query = {"$and": filters}
    events = list(events_collection.find(query).sort("timestamp", -1).limit(limit))
    if not events:
        return [TextContent(type="text", text="No matching events found.")]

    lines = [f"Found {len(events)} event(s):\n"]
    for event in events:
        # str() guards against a missing/None or non-string timestamp value.
        ts = str(event.get("timestamp") or "?")[:16].replace("T", " ")
        svc = event.get("service", "?")
        op = event.get("operation", "?")
        actor = event.get("actor_display", "?")
        result_str = event.get("result", "?")
        lines.append(f"{ts} | {svc} | {op} | {actor} | {result_str}")
    return [TextContent(type="text", text="\n".join(lines))]
async def _handle_get_event(arguments: dict) -> list[TextContent]:
    """Look up one audit event by its ``id`` field and return it as JSON text.

    Args:
        arguments: Tool arguments; must contain ``event_id``.

    Returns:
        A single ``TextContent`` holding either the event serialised as
        indented JSON (without its ``_id`` key) or a "not found" message.

    Raises:
        KeyError: If ``event_id`` is absent from *arguments*.
    """
    wanted_id = arguments["event_id"]
    document = events_collection.find_one({"id": wanted_id})
    if document:
        # The storage key is dropped before serialising; any remaining value
        # json cannot encode natively is stringified via default=str.
        document.pop("_id", None)
        body = json.dumps(document, indent=2, default=str)
    else:
        body = f"Event {wanted_id} not found."
    return [TextContent(type="text", text=body)]
async def _handle_get_summary(arguments: dict) -> list[TextContent]:
    """Summarise audit activity over the last N days as plain text.

    Args:
        arguments: Tool arguments; optional ``days`` (default 7) sets the
            look-back window.

    Returns:
        A single ``TextContent`` with the total event count followed by
        per-service, per-operation, per-result and per-actor counts, or a
        short message when the window contains no events.
    """
    days = arguments.get("days", 7)
    cutoff = datetime.now(UTC) - timedelta(days=days)
    window = {"timestamp": {"$gte": cutoff.isoformat().replace("+00:00", "Z")}}

    total = events_collection.count_documents(window)
    if total == 0:
        return [TextContent(type="text", text="No events in the specified period.")]

    def _count_by(field: str, top_n: int | None) -> list:
        # Group the window by one field, most frequent first; top_n=None
        # leaves the result uncapped.
        stages = [
            {"$match": window},
            {"$group": {"_id": f"${field}", "count": {"$sum": 1}}},
            {"$sort": {"count": -1}},
        ]
        if top_n is not None:
            stages.append({"$limit": top_n})
        return list(events_collection.aggregate(stages))

    # All four aggregations run here, in this order, before any text is built.
    breakdowns = (
        ("By service:", _count_by("service", 10)),
        ("\nBy action:", _count_by("operation", 10)),
        ("\nBy result:", _count_by("result", None)),
        ("\nTop actors:", _count_by("actor_display", 10)),
    )

    report = [f"Summary for the last {days} days ({total} total events)\n"]
    for heading, rows in breakdowns:
        report.append(heading)
        report.extend(f" {row['_id'] or 'Unknown'}: {row['count']}" for row in rows)
    return [TextContent(type="text", text="\n".join(report))]
async def _handle_ask(arguments: dict) -> list[TextContent]:
    """Reply to a natural-language question with a listing of recent events.

    The question is not interpreted here: it is echoed back, followed by the
    result of an unfiltered ``search_events`` call (up to 50 events) over the
    look-back window and a tip pointing at the ``search_events`` filters.

    Args:
        arguments: Tool arguments; ``question`` is required, ``days``
            (default 7) is optional.

    Returns:
        A single ``TextContent`` with the echoed question, the event listing
        (or a "no events" notice) and the tip.

    Raises:
        KeyError: If ``question`` is absent from *arguments*.
    """
    question = arguments["question"]
    days = arguments.get("days")
    if days is None:
        days = 7

    # Perform a search to give the user something useful immediately
    result = await _handle_search_events({"entity": "", "days": days, "limit": 50})
    base_text = result[0].text if result else ""

    # A populated listing is a "Found N event(s):\n" header joined to N rows,
    # i.e. N + 1 newlines; the no-match message contains none. Flooring at 0
    # avoids reporting "-1 events" when nothing matched.
    event_count = max(0, min(50, base_text.count("\n") - 1))
    if event_count:
        summary = (
            f"Here are the most recent {event_count} events from the last {days} days:\n\n"
            f"{base_text}"
        )
    else:
        summary = f"No events were found in the last {days} days."

    text = (
        f"You asked: '{question}'\n\n"
        f"{summary}\n\n"
        f"Tip: Use the 'search_events' tool with specific filters (services, operation, result) "
        f"to narrow down the dataset before asking follow-up questions."
    )
    return [TextContent(type="text", text=text)]
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
async def main():
    """Run the MCP app over the stdio transport."""
    async with stdio_server() as streams:
        reader, writer = streams
        init_options = app.create_initialization_options()
        await app.run(reader, writer, init_options)


if __name__ == "__main__":
    asyncio.run(main())