- Add WEBHOOK_CLIENT_SECRET validation for Graph webhooks - Add Redis-backed rate limiting (fetch/ask/write/default tiers) - Validate LLM_BASE_URL to prevent SSRF (HTTPS only, block private IPs) - Enforce non-wildcard CORS when AUTH_ENABLED=true - Add Content-Security-Policy headers - Fix audit middleware to use verified JWT claims via contextvars - Cap bulk_tags updates to 10,000 documents - Return generic error messages to clients (no internal detail leakage) - Strict AlertCondition Pydantic model for alert rules - Security warning on MCP stdio server startup - Remove MongoDB/Redis host ports from docker-compose - Remove mongo_query from /ask API response
98 lines
2.9 KiB
Python
98 lines
2.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
AOC MCP Server — stdio transport
|
|
|
|
Standalone MCP server for local use (Claude Desktop, Cursor, etc.).
|
|
For the HTTP/SSE version (production, behind auth), see routes/mcp.py.
|
|
|
|
Usage:
|
|
python mcp_server.py
|
|
|
|
Claude Desktop config (~/.config/claude/claude_desktop_config.json):
|
|
{
|
|
"mcpServers": {
|
|
"aoc": {
|
|
"command": "python",
|
|
"args": ["/path/to/aoc/backend/mcp_server.py"],
|
|
"env": {"MONGO_URI": "mongodb://..."}
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
|
|
# Ensure backend modules are importable when run standalone
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from mcp.server import Server
|
|
from mcp.server.stdio import stdio_server
|
|
from mcp.types import TextContent, Tool
|
|
from mcp_common import (
|
|
ASK_SCHEMA,
|
|
GET_EVENT_SCHEMA,
|
|
GET_SUMMARY_SCHEMA,
|
|
SEARCH_EVENTS_SCHEMA,
|
|
handle_ask,
|
|
handle_get_event,
|
|
handle_get_summary,
|
|
handle_search_events,
|
|
)
|
|
|
|
# Security warning: this standalone stdio server has no authentication.
|
|
# Only run it in trusted environments (e.g. local Claude Desktop) and
|
|
# ensure the MongoDB connection uses authenticated credentials.
|
|
print("=" * 60, file=sys.stderr)
|
|
print("AOC MCP Server (stdio transport)", file=sys.stderr)
|
|
print("WARNING: No authentication layer. Only run in trusted", file=sys.stderr)
|
|
print("environments or behind a VPN. See AGENTS.md for details.", file=sys.stderr)
|
|
print("=" * 60, file=sys.stderr)
|
|
|
|
app = Server("aoc")
|
|
|
|
|
|
@app.list_tools()
|
|
async def list_tools() -> list[Tool]:
|
|
return [
|
|
Tool(
|
|
name="search_events",
|
|
description="Search audit events by entity, service, operation, or result.",
|
|
inputSchema=SEARCH_EVENTS_SCHEMA,
|
|
),
|
|
Tool(name="get_event", description="Retrieve a single audit event by its ID.", inputSchema=GET_EVENT_SCHEMA),
|
|
Tool(
|
|
name="get_summary",
|
|
description="Get an aggregated summary of audit activity for the last N days.",
|
|
inputSchema=GET_SUMMARY_SCHEMA,
|
|
),
|
|
Tool(
|
|
name="ask",
|
|
description="Ask a natural language question about audit logs. Returns a narrative answer.",
|
|
inputSchema=ASK_SCHEMA,
|
|
),
|
|
]
|
|
|
|
|
|
@app.call_tool()
|
|
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
|
|
if name == "search_events":
|
|
return await handle_search_events(arguments)
|
|
if name == "get_event":
|
|
return await handle_get_event(arguments)
|
|
if name == "get_summary":
|
|
return await handle_get_summary(arguments)
|
|
if name == "ask":
|
|
return await handle_ask(arguments)
|
|
raise ValueError(f"Unknown tool: {name}")
|
|
|
|
|
|
async def main():
|
|
async with stdio_server() as (read_stream, write_stream):
|
|
await app.run(read_stream, write_stream, app.create_initialization_options())
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|