Some checks failed
CI / lint-and-test (push) Has been cancelled
- Cache Graph API tokens with expiry-aware reuse in graph/auth.py - Add tenacity-based retry/backoff wrapper (utils/http.py) and apply to all Graph/source API calls - Add Pydantic request/response models (models/api.py) and FastAPI query constraints - Add unit tests for event_model, auth and integration tests for API endpoints - Configure ruff linter/formatter in pyproject.toml - Add GitHub Actions CI pipeline (.github/workflows/ci.yml) - Add requirements-dev.txt with pytest, mongomock, httpx, ruff - Clean up typing imports and fix ruff linting across codebase
73 lines
2.4 KiB
Python
73 lines
2.4 KiB
Python
from datetime import datetime, timedelta
|
|
|
|
from graph.auth import get_access_token
|
|
from utils.http import get_with_retry
|
|
|
|
|
|
def fetch_intune_audit(hours: int = 24, max_pages: int = 50) -> list[dict]:
|
|
"""
|
|
Fetch Intune audit events via Microsoft Graph.
|
|
Requires Intune audit permissions (e.g., DeviceManagementConfiguration.Read.All).
|
|
"""
|
|
token = get_access_token()
|
|
start_time = (datetime.utcnow() - timedelta(hours=hours)).isoformat() + "Z"
|
|
url = (
|
|
"https://graph.microsoft.com/v1.0/deviceManagement/auditEvents"
|
|
f"?$filter=activityDateTime ge {start_time}"
|
|
)
|
|
headers = {"Authorization": f"Bearer {token}"}
|
|
|
|
events = []
|
|
pages = 0
|
|
while url:
|
|
if pages >= max_pages:
|
|
raise RuntimeError(f"Aborting Intune pagination after {max_pages} pages.")
|
|
try:
|
|
res = get_with_retry(url, headers=headers, timeout=20)
|
|
res.raise_for_status()
|
|
body = res.json()
|
|
except RuntimeError:
|
|
raise
|
|
except Exception as exc:
|
|
raise RuntimeError(f"Failed to fetch Intune audit logs: {exc}") from exc
|
|
|
|
events.extend(body.get("value", []))
|
|
url = body.get("@odata.nextLink")
|
|
pages += 1
|
|
|
|
return [_normalize_intune(e) for e in events]
|
|
|
|
|
|
def _normalize_intune(e: dict) -> dict:
|
|
"""
|
|
Map Intune audit event to normalized schema.
|
|
"""
|
|
actor = e.get("actor", {}) or {}
|
|
target = e.get("resources", [{}])[0] if e.get("resources") else {}
|
|
|
|
return {
|
|
"id": e.get("id"),
|
|
"activityDateTime": e.get("activityDateTime"),
|
|
"category": e.get("category") or "Intune",
|
|
"activityDisplayName": e.get("activity") or e.get("activityType"),
|
|
"result": e.get("activityResult") or e.get("result"),
|
|
"initiatedBy": {
|
|
"user": {
|
|
"id": actor.get("userId"),
|
|
"userPrincipalName": actor.get("userPrincipalName"),
|
|
"displayName": actor.get("userName"),
|
|
"ipAddress": actor.get("ipAddress"),
|
|
}
|
|
},
|
|
"targetResources": [
|
|
{
|
|
"id": target.get("id"),
|
|
"displayName": target.get("displayName") or target.get("modifiedProperties", [{}])[0].get("displayName"),
|
|
"type": target.get("type"),
|
|
}
|
|
]
|
|
if target
|
|
else [],
|
|
"raw": e,
|
|
}
|