"""Fetch Office 365 unified audit events and map them to the app's schema.

Uses the Office 365 Management Activity API (manage.office.com) to list and
download audit content blobs for each content type in AUDIT_CONTENT_TYPES.
"""

import logging
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

import requests

from graph.auth import get_access_token

logger = logging.getLogger(__name__)

AUDIT_CONTENT_TYPES = {
    "Audit.Exchange": "Exchange admin audit",
    "Audit.SharePoint": "SharePoint admin audit",
    "Audit.General": "General (Teams/others)",
}

# Activity API expects UTC ISO without Z
_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"

# The content-listing endpoint rejects startTime/endTime pairs that are more
# than 24 hours apart, so longer lookbacks are split into windows of this size.
_MAX_WINDOW = timedelta(hours=24)


def _time_window(hours: int) -> Tuple[str, str]:
    """Return ``(start, end)`` timestamps covering the last *hours* hours.

    Both values are UTC, formatted as ``YYYY-MM-DDTHH:MM:SS`` with no
    timezone suffix. ``end`` is the current time.
    """
    end = datetime.now(timezone.utc)
    start = end - timedelta(hours=hours)
    return start.strftime(_TIME_FORMAT), end.strftime(_TIME_FORMAT)


def _time_windows(hours: int) -> List[Tuple[str, str]]:
    """Split the last *hours* hours into consecutive windows of at most 24h.

    Returns a list of ``(start, end)`` timestamp pairs in chronological
    order, formatted like :func:`_time_window`. When the span is 24 hours or
    less, the list holds exactly one pair. Adjacent windows share their
    boundary timestamp.
    """
    end = datetime.now(timezone.utc)
    start = end - timedelta(hours=hours)
    if end - start <= _MAX_WINDOW:
        return [(start.strftime(_TIME_FORMAT), end.strftime(_TIME_FORMAT))]

    windows = []
    cursor = start
    while cursor < end:
        upper = min(cursor + _MAX_WINDOW, end)
        windows.append(
            (cursor.strftime(_TIME_FORMAT), upper.strftime(_TIME_FORMAT))
        )
        cursor = upper
    return windows


def _ensure_subscription(content_type: str, token: str, tenant_id: str) -> None:
    """Ask the API to start the subscription for *content_type*.

    Best-effort: network failures and non-2xx responses are logged and
    otherwise ignored, so this function never raises for request errors.
    """
    url = f"https://manage.office.com/api/v1.0/{tenant_id}/activity/feed/subscriptions/start"
    params = {"contentType": content_type}
    headers = {"Authorization": f"Bearer {token}"}
    try:
        res = requests.post(url, params=params, headers=headers, timeout=10)
    except requests.RequestException as exc:
        # Deliberately not re-raised: a failure here will show up again, with
        # details, when the content listing is attempted.
        logger.warning("Could not start %s subscription: %s", content_type, exc)
        return
    if not res.ok:
        logger.debug(
            "Starting %s subscription returned HTTP %s",
            content_type,
            res.status_code,
        )


def _list_content_window(
    content_type: str, token: str, tenant_id: str, start: str, end: str
) -> List[dict]:
    """List available content blobs for one ``start``/``end`` window.

    Raises:
        RuntimeError: on a 400/401/403/404 response (message includes the
            response text) or on any other request failure.
    """
    url = f"https://manage.office.com/api/v1.0/{tenant_id}/activity/feed/subscriptions/content"
    params = {"contentType": content_type, "startTime": start, "endTime": end}
    headers = {"Authorization": f"Bearer {token}"}
    try:
        res = requests.get(url, params=params, headers=headers, timeout=20)
        if res.status_code in (400, 401, 403, 404):
            # Likely not enabled or insufficient perms; surface the text to the caller.
            raise RuntimeError(
                f"{content_type} content listing failed ({res.status_code}): {res.text}"
            )
        res.raise_for_status()
        # NOTE(review): the API may paginate this listing via a NextPageUri
        # response header; only the first page is read here - confirm whether
        # follow-up pages are needed.
        return res.json() or []
    except requests.RequestException as exc:
        raise RuntimeError(f"Failed to list {content_type} content: {exc}") from exc


def _list_content(content_type: str, token: str, tenant_id: str, hours: int) -> List[dict]:
    """List content blobs for *content_type* from the last *hours* hours.

    The lookback is queried in windows of at most 24 hours and the results
    are concatenated in chronological window order.

    Raises:
        RuntimeError: if any window's listing request fails.
    """
    listing: List[dict] = []
    for start, end in _time_windows(hours):
        listing.extend(
            _list_content_window(content_type, token, tenant_id, start, end)
        )
    return listing


def _download_content(content_uri: str, token: str) -> List[dict]:
    """Download one content blob and return its parsed JSON body.

    An empty or null body yields an empty list.

    Raises:
        RuntimeError: if the request fails or returns an error status.
    """
    headers = {"Authorization": f"Bearer {token}"}
    try:
        res = requests.get(content_uri, headers=headers, timeout=30)
        res.raise_for_status()
        return res.json() or []
    except requests.RequestException as exc:
        raise RuntimeError(f"Failed to download audit content: {exc}") from exc


def fetch_unified_audit(hours: int = 24, max_files: int = 50) -> List[dict]:
    """
    Fetch unified audit logs (Exchange, SharePoint, Teams policy changes via Audit.General)
    using the Office 365 Management Activity API.

    Args:
        hours: How far back from now to look for available content.
        max_files: Maximum number of content blobs downloaded per content type.

    Returns:
        One normalized event dict per downloaded audit record; see
        ``_normalize_unified`` for the shape.

    Raises:
        RuntimeError: if listing or downloading content fails.
    """
    # Need token for manage.office.com
    token = get_access_token("https://manage.office.com/.default")
    from config import TENANT_ID  # local import to avoid cycles

    events = []
    for content_type in AUDIT_CONTENT_TYPES:
        _ensure_subscription(content_type, token, TENANT_ID)
        contents = _list_content(content_type, token, TENANT_ID, hours)
        for item in contents[:max_files]:
            content_uri = item.get("contentUri")
            if not content_uri:
                # Nothing to download for a listing entry without a URI.
                continue
            events.extend(_download_content(content_uri, token))
    return [_normalize_unified(e) for e in events]


def _normalize_unified(e: dict) -> dict:
    """
    Map unified audit log shape to the normalized schema used by the app.

    Missing source fields become ``None``. The original record is kept
    unchanged under the ``"raw"`` key.
    """
    # The record's UserId is the only actor identifier read here, so it fills
    # the id, userPrincipalName and displayName slots alike.
    actor_user = {
        "id": e.get("UserId"),
        "userPrincipalName": e.get("UserId"),
        "ipAddress": e.get("ClientIP"),
        "displayName": e.get("UserId"),
    }
    target = {
        # Fall back to the organization when the record names no object.
        "id": e.get("ObjectId") or e.get("OrganizationId"),
        "displayName": e.get("ObjectId"),
        "type": e.get("Workload"),
    }
    return {
        "id": e.get("Id") or e.get("RecordType"),
        "activityDateTime": e.get("CreationTime"),
        "category": e.get("Workload"),
        "activityDisplayName": e.get("Operation"),
        "result": e.get("ResultStatus"),
        "initiatedBy": {"user": actor_user},
        "targetResources": [target],
        "raw": e,
    }