- Add AI_FEATURES_ENABLED config flag to gate AI/natural-language features - Conditionally register /api/ask router based on AI_FEATURES_ENABLED - Add GET /api/config/features endpoint for frontend feature detection - Update frontend to hide Ask panel when AI features are disabled - Implement standalone MCP server (backend/mcp_server.py) with tools: * search_events, get_event, get_summary, ask - Add mcp dependency to requirements.txt - Update .env.example, AGENTS.md, and ROADMAP.md - Bump VERSION to 1.3.0
277 lines
9.2 KiB
Python
277 lines
9.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
AOC MCP Server
|
|
|
|
Standalone MCP server that exposes audit log search tools for Claude Desktop,
|
|
Cursor, and other MCP clients.
|
|
|
|
Usage:
|
|
python mcp_server.py
|
|
|
|
Claude Desktop config (~/.config/claude/claude_desktop_config.json):
|
|
{
|
|
"mcpServers": {
|
|
"aoc": {
|
|
"command": "python",
|
|
"args": ["/path/to/aoc/backend/mcp_server.py"],
|
|
"env": {"MONGO_URI": "mongodb://..."}
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import sys
|
|
from datetime import UTC, datetime, timedelta
|
|
|
|
# Ensure backend modules are importable
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from database import events_collection
|
|
from mcp.server import Server
|
|
from mcp.server.stdio import stdio_server
|
|
from mcp.types import TextContent, Tool
|
|
|
|
app = Server("aoc")
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool definitions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_SEARCH_EVENTS_SCHEMA = {
|
|
"type": "object",
|
|
"properties": {
|
|
"entity": {"type": "string", "description": "Device name, user UPN, or email to search for"},
|
|
"services": {
|
|
"type": "array",
|
|
"items": {"type": "string"},
|
|
"description": "Filter by service (e.g. Intune, Directory, Exchange)",
|
|
},
|
|
"operation": {"type": "string", "description": "Filter by operation name"},
|
|
"result": {"type": "string", "description": "Filter by result (success, failure)"},
|
|
"days": {"type": "integer", "description": "Number of days to look back (default 7)"},
|
|
"limit": {"type": "integer", "description": "Max events to return (default 20)"},
|
|
},
|
|
}
|
|
|
|
_GET_EVENT_SCHEMA = {
|
|
"type": "object",
|
|
"properties": {
|
|
"event_id": {"type": "string", "description": "The event ID to retrieve"},
|
|
},
|
|
"required": ["event_id"],
|
|
}
|
|
|
|
_GET_SUMMARY_SCHEMA = {
|
|
"type": "object",
|
|
"properties": {
|
|
"days": {"type": "integer", "description": "Number of days to summarise (default 7)"},
|
|
},
|
|
}
|
|
|
|
_ASK_SCHEMA = {
|
|
"type": "object",
|
|
"properties": {
|
|
"question": {"type": "string", "description": "Natural language question about audit logs"},
|
|
"days": {"type": "integer", "description": "Number of days to look back (default 7)"},
|
|
},
|
|
"required": ["question"],
|
|
}
|
|
|
|
|
|
@app.list_tools()
|
|
async def list_tools() -> list[Tool]:
|
|
return [
|
|
Tool(
|
|
name="search_events",
|
|
description="Search audit events by entity, service, operation, or result.",
|
|
inputSchema=_SEARCH_EVENTS_SCHEMA,
|
|
),
|
|
Tool(name="get_event", description="Retrieve a single audit event by its ID.", inputSchema=_GET_EVENT_SCHEMA),
|
|
Tool(
|
|
name="get_summary",
|
|
description="Get an aggregated summary of audit activity for the last N days.",
|
|
inputSchema=_GET_SUMMARY_SCHEMA,
|
|
),
|
|
Tool(
|
|
name="ask",
|
|
description="Ask a natural language question about audit logs. Returns a narrative answer.",
|
|
inputSchema=_ASK_SCHEMA,
|
|
),
|
|
]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool handlers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@app.call_tool()
|
|
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
|
|
if name == "search_events":
|
|
return await _handle_search_events(arguments)
|
|
if name == "get_event":
|
|
return await _handle_get_event(arguments)
|
|
if name == "get_summary":
|
|
return await _handle_get_summary(arguments)
|
|
if name == "ask":
|
|
return await _handle_ask(arguments)
|
|
raise ValueError(f"Unknown tool: {name}")
|
|
|
|
|
|
async def _handle_search_events(arguments: dict) -> list[TextContent]:
|
|
days = arguments.get("days", 7)
|
|
limit = min(arguments.get("limit", 20), 100)
|
|
since = (datetime.now(UTC) - timedelta(days=days)).isoformat().replace("+00:00", "Z")
|
|
|
|
filters = [{"timestamp": {"$gte": since}}]
|
|
|
|
services = arguments.get("services")
|
|
if services:
|
|
filters.append({"service": {"$in": services}})
|
|
|
|
operation = arguments.get("operation")
|
|
if operation:
|
|
filters.append({"operation": {"$regex": operation, "$options": "i"}})
|
|
|
|
result = arguments.get("result")
|
|
if result:
|
|
filters.append({"result": {"$regex": result, "$options": "i"}})
|
|
|
|
entity = arguments.get("entity")
|
|
if entity:
|
|
entity_safe = entity.replace(".", "\\.").replace("(", "\\(").replace(")", "\\)")
|
|
filters.append(
|
|
{
|
|
"$or": [
|
|
{"target_displays": {"$elemMatch": {"$regex": entity_safe, "$options": "i"}}},
|
|
{"actor_display": {"$regex": entity_safe, "$options": "i"}},
|
|
{"actor_upn": {"$regex": entity_safe, "$options": "i"}},
|
|
{"raw_text": {"$regex": entity_safe, "$options": "i"}},
|
|
]
|
|
}
|
|
)
|
|
|
|
query = {"$and": filters}
|
|
cursor = events_collection.find(query).sort("timestamp", -1).limit(limit)
|
|
events = list(cursor)
|
|
|
|
if not events:
|
|
return [TextContent(type="text", text="No matching events found.")]
|
|
|
|
lines = [f"Found {len(events)} event(s):\n"]
|
|
for e in events:
|
|
ts = e.get("timestamp", "?")[:16].replace("T", " ")
|
|
svc = e.get("service", "?")
|
|
op = e.get("operation", "?")
|
|
actor = e.get("actor_display", "?")
|
|
result_str = e.get("result", "?")
|
|
lines.append(f"{ts} | {svc} | {op} | {actor} | {result_str}")
|
|
|
|
return [TextContent(type="text", text="\n".join(lines))]
|
|
|
|
|
|
async def _handle_get_event(arguments: dict) -> list[TextContent]:
|
|
event_id = arguments["event_id"]
|
|
event = events_collection.find_one({"id": event_id})
|
|
if not event:
|
|
return [TextContent(type="text", text=f"Event {event_id} not found.")]
|
|
event.pop("_id", None)
|
|
return [TextContent(type="text", text=json.dumps(event, indent=2, default=str))]
|
|
|
|
|
|
async def _handle_get_summary(arguments: dict) -> list[TextContent]:
|
|
days = arguments.get("days", 7)
|
|
since = (datetime.now(UTC) - timedelta(days=days)).isoformat().replace("+00:00", "Z")
|
|
query = {"timestamp": {"$gte": since}}
|
|
|
|
total = events_collection.count_documents(query)
|
|
if total == 0:
|
|
return [TextContent(type="text", text="No events in the specified period.")]
|
|
|
|
# Aggregation pipelines
|
|
svc_pipeline = [
|
|
{"$match": query},
|
|
{"$group": {"_id": "$service", "count": {"$sum": 1}}},
|
|
{"$sort": {"count": -1}},
|
|
{"$limit": 10},
|
|
]
|
|
op_pipeline = [
|
|
{"$match": query},
|
|
{"$group": {"_id": "$operation", "count": {"$sum": 1}}},
|
|
{"$sort": {"count": -1}},
|
|
{"$limit": 10},
|
|
]
|
|
result_pipeline = [
|
|
{"$match": query},
|
|
{"$group": {"_id": "$result", "count": {"$sum": 1}}},
|
|
{"$sort": {"count": -1}},
|
|
]
|
|
actor_pipeline = [
|
|
{"$match": query},
|
|
{"$group": {"_id": "$actor_display", "count": {"$sum": 1}}},
|
|
{"$sort": {"count": -1}},
|
|
{"$limit": 10},
|
|
]
|
|
|
|
svc_counts = list(events_collection.aggregate(svc_pipeline))
|
|
op_counts = list(events_collection.aggregate(op_pipeline))
|
|
result_counts = list(events_collection.aggregate(result_pipeline))
|
|
actor_counts = list(events_collection.aggregate(actor_pipeline))
|
|
|
|
lines = [f"Summary for the last {days} days ({total} total events)\n"]
|
|
|
|
lines.append("By service:")
|
|
for row in svc_counts:
|
|
lines.append(f" {row['_id'] or 'Unknown'}: {row['count']}")
|
|
|
|
lines.append("\nBy action:")
|
|
for row in op_counts:
|
|
lines.append(f" {row['_id'] or 'Unknown'}: {row['count']}")
|
|
|
|
lines.append("\nBy result:")
|
|
for row in result_counts:
|
|
lines.append(f" {row['_id'] or 'Unknown'}: {row['count']}")
|
|
|
|
lines.append("\nTop actors:")
|
|
for row in actor_counts:
|
|
lines.append(f" {row['_id'] or 'Unknown'}: {row['count']}")
|
|
|
|
return [TextContent(type="text", text="\n".join(lines))]
|
|
|
|
|
|
async def _handle_ask(arguments: dict) -> list[TextContent]:
|
|
"""For now, the MCP 'ask' tool returns a helpful message directing the user to the web UI,
|
|
since the full NLQ pipeline requires LLM configuration that may not be available in the MCP context."""
|
|
question = arguments["question"]
|
|
days = arguments.get("days", 7)
|
|
|
|
# Perform a search to give the user something useful immediately
|
|
result = await _handle_search_events({"entity": "", "days": days, "limit": 50})
|
|
base_text = result[0].text if result else ""
|
|
|
|
text = (
|
|
f"You asked: '{question}'\n\n"
|
|
f"Here are the most recent {min(50, base_text.count(chr(10)) - 1)} events from the last {days} days:\n\n"
|
|
f"{base_text}\n\n"
|
|
f"Tip: Use the 'search_events' tool with specific filters (services, operation, result) "
|
|
f"to narrow down the dataset before asking follow-up questions."
|
|
)
|
|
return [TextContent(type="text", text=text)]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def main():
|
|
async with stdio_server() as (read_stream, write_stream):
|
|
await app.run(read_stream, write_stream, app.create_initialization_options())
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|