Files
astral/scripts/generate_assignment_report.py
Tomas Kracmar 17d745bdac Sync from dev @ 252c1cf
Source: main (252c1cf)
Excluded: live tenant exports, generated artifacts, and dev-only tooling.
2026-04-17 15:57:35 +02:00

420 lines
14 KiB
Python

#!/usr/bin/env python3
"""Generate a policy assignment inventory report from Intune backup JSON files."""
from __future__ import annotations
import argparse
import csv
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable
GROUP_TARGET_TYPES = {
"#microsoft.graph.groupAssignmentTarget",
"#microsoft.graph.exclusionGroupAssignmentTarget",
}
DEFAULT_POLICY_TYPES = {
"app configuration",
"app protection",
"applications",
"compliance policies",
"conditional access",
"device configurations",
"enrollment configurations",
"enrollment profiles",
"filters",
"scripts",
"settings catalog",
}
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--root", required=True, help="Path to the workload backup root (for example tenant-state/intune).")
parser.add_argument(
"--output-dir",
required=True,
help="Directory where report files will be written.",
)
parser.add_argument(
"--policy-type",
action="append",
default=[],
help=(
"Optional filter for policy type (top-level backup folder name). "
"Repeat the flag or pass a comma-separated list."
),
)
parser.add_argument(
"--graph-type",
action="append",
default=[],
help=(
"Optional filter for Graph @odata.type values. "
"Repeat the flag or pass a comma-separated list."
),
)
return parser.parse_args()
@dataclass
class AssignmentRow:
category: str
policy_type: str
object_name: str
object_type: str
assignment_state: str
assignment_count: int
intent: str
assignment_target: str
target_type: str
assignment_filter: str
filter_type: str
source_file: str
def safe_text(value: object) -> str:
if value is None:
return ""
return str(value).strip()
def normalize_intent(intent: str) -> str:
normalized = safe_text(intent).lower()
if normalized in {"apply", "include"}:
return "Include"
if normalized in {"exclude"}:
return "Exclude"
if not normalized:
return "Include"
return normalized.capitalize()
def infer_intent(assignment: dict, target_type: str) -> str:
target_type_lower = safe_text(target_type).lower()
if "exclusion" in target_type_lower:
return "Exclude"
explicit = safe_text(assignment.get("intent"))
if explicit:
return normalize_intent(explicit)
return "Include"
def resolve_assignment_target(target: dict) -> str:
target_type = safe_text(target.get("@odata.type"))
if target_type == "#microsoft.graph.allDevicesAssignmentTarget":
return "All devices"
if target_type == "#microsoft.graph.allLicensedUsersAssignmentTarget":
return "All users"
if target_type in GROUP_TARGET_TYPES:
return (
safe_text(target.get("groupDisplayName"))
or safe_text(target.get("groupName"))
or safe_text(target.get("groupId"))
or "Unresolved group"
)
return (
safe_text(target.get("groupDisplayName"))
or safe_text(target.get("groupName"))
or safe_text(target.get("displayName"))
or safe_text(target.get("id"))
or "Unknown target"
)
def escape_md_cell(value: str) -> str:
return value.replace("\\", "\\\\").replace("|", "\\|").replace("\n", " ").strip()
def parse_filter_values(raw_values: list[str]) -> set[str]:
values = set()
for raw in raw_values:
for item in safe_text(raw).split(","):
normalized = safe_text(item)
if normalized:
values.add(normalized.lower())
return values
def iter_assignment_rows(
root: Path,
policy_type_filter: set[str],
graph_type_filter: set[str],
) -> Iterable[AssignmentRow]:
excluded_categories = {
"App Registrations",
"Enterprise Applications",
}
for path in sorted(root.rglob("*.json")):
try:
rel_path = path.relative_to(root)
except ValueError:
continue
if rel_path.parts and rel_path.parts[0] in {"reports"}:
continue
if "__archive__" in rel_path.parts:
continue
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except Exception:
continue
if not isinstance(payload, dict):
continue
object_name = safe_text(payload.get("displayName")) or safe_text(payload.get("name"))
if not object_name:
object_name = path.stem.split("__")[0]
object_type = safe_text(payload.get("@odata.type"))
category = "/".join(rel_path.parent.parts)
policy_type = rel_path.parts[0] if rel_path.parts else ""
if any(
category == excluded or category.startswith(f"{excluded}/")
for excluded in excluded_categories
):
continue
if policy_type_filter and policy_type.lower() not in policy_type_filter:
continue
if graph_type_filter and object_type.lower() not in graph_type_filter:
continue
assignments = payload.get("assignments")
if not isinstance(assignments, list):
yield AssignmentRow(
category=category,
policy_type=policy_type,
object_name=object_name,
object_type=object_type,
assignment_state="NotExported",
assignment_count=0,
intent="None",
assignment_target="Not exported in backup",
target_type="",
assignment_filter="",
filter_type="",
source_file=rel_path.as_posix(),
)
continue
if not assignments:
yield AssignmentRow(
category=category,
policy_type=policy_type,
object_name=object_name,
object_type=object_type,
assignment_state="Unassigned",
assignment_count=0,
intent="None",
assignment_target="No assignments",
target_type="",
assignment_filter="",
filter_type="",
source_file=rel_path.as_posix(),
)
continue
assignment_count = len([item for item in assignments if isinstance(item, dict)])
if assignment_count == 0:
yield AssignmentRow(
category=category,
policy_type=policy_type,
object_name=object_name,
object_type=object_type,
assignment_state="Unassigned",
assignment_count=0,
intent="None",
assignment_target="No assignments",
target_type="",
assignment_filter="",
filter_type="",
source_file=rel_path.as_posix(),
)
continue
for assignment in assignments:
if not isinstance(assignment, dict):
continue
target = assignment.get("target") if isinstance(assignment.get("target"), dict) else {}
target_type = safe_text(target.get("@odata.type"))
intent = infer_intent(assignment, target_type)
assignment_target = resolve_assignment_target(target)
assignment_filter = safe_text(target.get("deviceAndAppManagementAssignmentFilterId"))
filter_type = safe_text(target.get("deviceAndAppManagementAssignmentFilterType"))
yield AssignmentRow(
category=category,
policy_type=policy_type,
object_name=object_name,
object_type=object_type,
assignment_state="Assigned",
assignment_count=assignment_count,
intent=intent,
assignment_target=assignment_target,
target_type=target_type,
assignment_filter=assignment_filter,
filter_type=filter_type,
source_file=rel_path.as_posix(),
)
def write_csv(rows: list[AssignmentRow], output_path: Path) -> None:
output_path.parent.mkdir(parents=True, exist_ok=True)
with output_path.open("w", encoding="utf-8", newline="") as handle:
writer = csv.writer(handle)
writer.writerow(
[
"Category",
"PolicyType",
"ObjectName",
"ObjectType",
"AssignmentState",
"AssignmentCount",
"Intent",
"AssignmentTarget",
"TargetType",
"AssignmentFilter",
"FilterType",
"SourceFile",
]
)
for row in rows:
writer.writerow(
[
row.category,
row.policy_type,
row.object_name,
row.object_type,
row.assignment_state,
row.assignment_count,
row.intent,
row.assignment_target,
row.target_type,
row.assignment_filter,
row.filter_type,
row.source_file,
]
)
def write_markdown(rows: list[AssignmentRow], output_path: Path) -> None:
output_path.parent.mkdir(parents=True, exist_ok=True)
generated = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
objects = {(row.category, row.object_name, row.source_file) for row in rows}
assigned_objects = {
(row.category, row.object_name, row.source_file)
for row in rows
if row.assignment_state == "Assigned"
}
unassigned_objects = {
(row.category, row.object_name, row.source_file)
for row in rows
if row.assignment_state == "Unassigned"
}
not_exported_objects = {
(row.category, row.object_name, row.source_file)
for row in rows
if row.assignment_state == "NotExported"
}
policy_type_counts = {}
for row in rows:
key = row.policy_type or "Unknown"
policy_type_counts[key] = policy_type_counts.get(key, 0) + 1
with output_path.open("w", encoding="utf-8") as handle:
handle.write("# Policy Assignment Inventory Report\n\n")
handle.write(f"Generated: `{generated}`\n\n")
handle.write(f"- Total objects in report: **{len(objects)}**\n")
handle.write(f"- Objects with assignments: **{len(assigned_objects)}**\n")
handle.write(f"- Objects without assignments: **{len(unassigned_objects)}**\n")
handle.write(f"- Objects with assignment field not exported: **{len(not_exported_objects)}**\n")
handle.write(f"- Total rows: **{len(rows)}**\n\n")
handle.write("## Rows by policy type\n\n")
handle.write("| Policy Type | Rows |\n")
handle.write("|---|---|\n")
for policy_type, count in sorted(policy_type_counts.items(), key=lambda item: item[0].lower()):
handle.write(f"| {escape_md_cell(policy_type)} | {count} |\n")
handle.write("\n")
handle.write(
"| Policy Type | Category | Object | Object Type | Assignment State | Assignment Count | Intent | Assignment Target | Target Type | Filter | Filter Type | Source |\n"
)
handle.write("|---|---|---|---|---|---|---|---|---|---|---|---|\n")
for row in rows:
handle.write(
"| "
+ " | ".join(
[
escape_md_cell(row.policy_type),
escape_md_cell(row.category),
escape_md_cell(row.object_name),
escape_md_cell(row.object_type),
escape_md_cell(row.assignment_state),
escape_md_cell(str(row.assignment_count)),
escape_md_cell(row.intent),
escape_md_cell(row.assignment_target),
escape_md_cell(row.target_type),
escape_md_cell(row.assignment_filter),
escape_md_cell(row.filter_type),
escape_md_cell(row.source_file),
]
)
+ " |\n"
)
def main() -> int:
args = parse_args()
root = Path(args.root).resolve()
output_dir = Path(args.output_dir).resolve()
policy_type_filter = parse_filter_values(args.policy_type)
graph_type_filter = parse_filter_values(args.graph_type)
using_default_policy_scope = False
if not policy_type_filter:
policy_type_filter = set(DEFAULT_POLICY_TYPES)
using_default_policy_scope = True
if not root.exists():
raise SystemExit(f"Backup path does not exist: {root}")
rows = sorted(
iter_assignment_rows(root, policy_type_filter, graph_type_filter),
key=lambda x: (
x.policy_type.lower(),
x.category.lower(),
x.object_name.lower(),
x.assignment_state,
x.intent.lower(),
x.assignment_target.lower(),
),
)
markdown_path = output_dir / "policy-assignments.md"
csv_path = output_dir / "policy-assignments.csv"
write_markdown(rows, markdown_path)
write_csv(rows, csv_path)
print(
f"Generated assignment report with {len(rows)} rows: "
f"{markdown_path} and {csv_path}"
)
if using_default_policy_scope:
print(
"Applied default policy scope: "
+ ", ".join(sorted(DEFAULT_POLICY_TYPES))
)
elif policy_type_filter:
print(f"Applied policy type filter: {', '.join(sorted(policy_type_filter))}")
if graph_type_filter:
print(f"Applied graph type filter: {', '.join(sorted(graph_type_filter))}")
return 0
if __name__ == "__main__":
raise SystemExit(main())