Sync from dev @ 252c1cf

Source: main (252c1cf)
Excluded: live tenant exports, generated artifacts, and dev-only tooling.
This commit is contained in:
2026-04-17 15:57:35 +02:00
commit 17d745bdac
52 changed files with 15601 additions and 0 deletions

View File

@@ -0,0 +1,342 @@
from __future__ import annotations
import importlib.util
import os
import subprocess
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
MODULE_PATH = Path(__file__).resolve().parents[1] / "scripts" / "ensure_rolling_pr.py"
def load_module():
# Preload common helper so the script can import it.
common_path = MODULE_PATH.parent / "common.py"
common_spec = importlib.util.spec_from_file_location("common", common_path)
if common_spec is not None and common_spec.loader is not None:
common_mod = importlib.util.module_from_spec(common_spec)
sys.modules["common"] = common_mod
common_spec.loader.exec_module(common_mod)
module_name = "ensure_rolling_pr"
spec = importlib.util.spec_from_file_location(module_name, MODULE_PATH)
if spec is None or spec.loader is None:
raise RuntimeError(f"Unable to load module from {MODULE_PATH}")
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
return module
def _run(cmd: list[str], cwd: Path) -> None:
subprocess.run(cmd, cwd=cwd, check=True, capture_output=True, text=True)
class EnsureRollingPrTests(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.module = load_module()
def test_is_workload_config_path_filters_docs_and_reports(self) -> None:
is_path = self.module._is_workload_config_path
self.assertTrue(
is_path(
"tenant-state/entra/Conditional Access/policy.json",
workload_dir="entra",
backup_folder="tenant-state",
reports_subdir="reports",
)
)
self.assertFalse(
is_path(
"tenant-state/entra/Conditional Access/policy.md",
workload_dir="entra",
backup_folder="tenant-state",
reports_subdir="reports",
)
)
self.assertFalse(
is_path(
"tenant-state/reports/entra/assignment_report.md",
workload_dir="entra",
backup_folder="tenant-state",
reports_subdir="reports",
)
)
def test_config_fingerprint_ignores_docs_and_reports(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
repo = Path(tmp)
_run(["git", "init"], repo)
_run(["git", "config", "user.name", "Test"], repo)
_run(["git", "config", "user.email", "test@example.com"], repo)
config_file = repo / "tenant-state" / "entra" / "Conditional Access" / "policy.json"
report_file = repo / "tenant-state" / "reports" / "entra" / "summary.md"
doc_file = repo / "tenant-state" / "entra" / "README.md"
config_file.parent.mkdir(parents=True, exist_ok=True)
report_file.parent.mkdir(parents=True, exist_ok=True)
doc_file.parent.mkdir(parents=True, exist_ok=True)
config_file.write_text('{"state":"enabled"}\n', encoding="utf-8")
report_file.write_text("report v1\n", encoding="utf-8")
doc_file.write_text("doc v1\n", encoding="utf-8")
_run(["git", "add", "."], repo)
_run(["git", "commit", "-m", "initial"], repo)
fp1 = self.module._config_fingerprint_from_local_tree(
repo_root=str(repo),
commitish="HEAD",
workload_dir="entra",
backup_folder="tenant-state",
reports_subdir="reports",
)
report_file.write_text("report v2\n", encoding="utf-8")
doc_file.write_text("doc v2\n", encoding="utf-8")
_run(["git", "add", "."], repo)
_run(["git", "commit", "-m", "doc/report only"], repo)
fp2 = self.module._config_fingerprint_from_local_tree(
repo_root=str(repo),
commitish="HEAD",
workload_dir="entra",
backup_folder="tenant-state",
reports_subdir="reports",
)
config_file.write_text('{"state":"disabled"}\n', encoding="utf-8")
_run(["git", "add", "."], repo)
_run(["git", "commit", "-m", "config change"], repo)
fp3 = self.module._config_fingerprint_from_local_tree(
repo_root=str(repo),
commitish="HEAD",
workload_dir="entra",
backup_folder="tenant-state",
reports_subdir="reports",
)
self.assertTrue(fp1)
self.assertEqual(fp1, fp2)
self.assertNotEqual(fp2, fp3)
def test_ref_has_commit_for_local_and_missing_ref(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
repo = Path(tmp)
_run(["git", "init"], repo)
_run(["git", "config", "user.name", "Test"], repo)
_run(["git", "config", "user.email", "test@example.com"], repo)
(repo / "README.md").write_text("x\n", encoding="utf-8")
_run(["git", "add", "."], repo)
_run(["git", "commit", "-m", "init"], repo)
self.assertTrue(self.module._ref_has_commit(str(repo), "HEAD"))
self.assertFalse(self.module._ref_has_commit(str(repo), "origin/does-not-exist"))
def test_workload_config_diff_exists_ignores_docs_and_reports(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
repo = Path(tmp)
_run(["git", "init"], repo)
_run(["git", "config", "user.name", "Test"], repo)
_run(["git", "config", "user.email", "test@example.com"], repo)
config_file = repo / "tenant-state" / "intune" / "Device Configurations" / "policy.json"
report_file = repo / "tenant-state" / "reports" / "intune" / "summary.md"
doc_file = repo / "tenant-state" / "intune" / "README.md"
config_file.parent.mkdir(parents=True, exist_ok=True)
report_file.parent.mkdir(parents=True, exist_ok=True)
doc_file.parent.mkdir(parents=True, exist_ok=True)
config_file.write_text('{"setting":"enabled"}\n', encoding="utf-8")
report_file.write_text("report v1\n", encoding="utf-8")
doc_file.write_text("doc v1\n", encoding="utf-8")
_run(["git", "add", "."], repo)
_run(["git", "commit", "-m", "baseline"], repo)
baseline_commit = subprocess.run(
["git", "rev-parse", "HEAD"],
cwd=repo,
check=True,
capture_output=True,
text=True,
).stdout.strip()
report_file.write_text("report v2\n", encoding="utf-8")
doc_file.write_text("doc v2\n", encoding="utf-8")
_run(["git", "add", "."], repo)
_run(["git", "commit", "-m", "doc only"], repo)
doc_only_commit = subprocess.run(
["git", "rev-parse", "HEAD"],
cwd=repo,
check=True,
capture_output=True,
text=True,
).stdout.strip()
config_file.write_text('{"setting":"disabled"}\n', encoding="utf-8")
_run(["git", "add", "."], repo)
_run(["git", "commit", "-m", "config change"], repo)
config_change_commit = subprocess.run(
["git", "rev-parse", "HEAD"],
cwd=repo,
check=True,
capture_output=True,
text=True,
).stdout.strip()
self.assertFalse(
self.module._workload_config_diff_exists(
repo_root=str(repo),
baseline_commitish=baseline_commit,
drift_commitish=doc_only_commit,
workload_dir="intune",
backup_folder="tenant-state",
reports_subdir="reports",
)
)
self.assertTrue(
self.module._workload_config_diff_exists(
repo_root=str(repo),
baseline_commitish=baseline_commit,
drift_commitish=config_change_commit,
workload_dir="intune",
backup_folder="tenant-state",
reports_subdir="reports",
)
)
def test_main_suppresses_pr_creation_when_drift_matches_baseline_config(self) -> None:
env = {
"SYSTEM_ACCESSTOKEN": "token",
"SYSTEM_COLLECTIONURI": "https://dev.azure.com/example",
"SYSTEM_TEAMPROJECT": "Project",
"BUILD_REPOSITORY_ID": "repo-id",
}
with patch.dict(os.environ, env, clear=False):
with patch.object(
sys,
"argv",
[
"ensure_rolling_pr.py",
"--repo-root",
"/tmp/repo",
"--workload",
"intune",
"--drift-branch",
"drift/intune",
"--baseline-branch",
"main",
"--pr-title",
"Intune drift review (rolling)",
],
):
with patch.object(self.module, "_query_prs", return_value=[]):
with patch.object(self.module, "_run_git"):
with patch.object(self.module, "_ref_has_commit", return_value=True):
with patch.object(self.module, "_workload_config_diff_exists", return_value=False):
with patch.object(self.module, "_request_json") as request_json:
result = self.module.main()
self.assertEqual(result, 0)
request_json.assert_not_called()
def test_main_creates_pr_as_draft_when_notification_delay_enabled(self) -> None:
env = {
"SYSTEM_ACCESSTOKEN": "token",
"SYSTEM_COLLECTIONURI": "https://dev.azure.com/example",
"SYSTEM_TEAMPROJECT": "Project",
"BUILD_REPOSITORY_ID": "repo-id",
"BUILD_BUILDNUMBER": "42",
"BUILD_BUILDID": "1001",
"ROLLING_PR_DELAY_REVIEWER_NOTIFICATIONS": "true",
}
created_bodies: list[dict[str, object]] = []
def request_json(url: str, headers: dict[str, str], method: str = "GET", body: dict[str, object] | None = None):
if method == "POST" and url.endswith("/pullrequests?api-version=7.1"):
created_bodies.append(body or {})
return {"pullRequestId": 123}
raise AssertionError(f"Unexpected request: {method} {url}")
with patch.dict(os.environ, env, clear=False):
with patch.object(
sys,
"argv",
[
"ensure_rolling_pr.py",
"--repo-root",
"/tmp/repo",
"--workload",
"intune",
"--drift-branch",
"drift/intune",
"--baseline-branch",
"main",
"--pr-title",
"Intune drift review (rolling)",
],
):
with patch.object(self.module, "_query_prs", side_effect=[[], []]):
with patch.object(self.module, "_run_git"):
with patch.object(self.module, "_ref_has_commit", return_value=True):
with patch.object(self.module, "_workload_config_diff_exists", return_value=True):
with patch.object(self.module, "_tree_id_for_commitish", return_value="tree123"):
with patch.object(self.module, "_find_matching_abandoned_pr", return_value=(None, "")):
with patch.object(self.module, "_request_json", side_effect=request_json):
result = self.module.main()
self.assertEqual(result, 0)
self.assertEqual(len(created_bodies), 1)
self.assertTrue(created_bodies[0]["isDraft"])
def test_main_skips_active_pr_patch_when_already_up_to_date(self) -> None:
env = {
"SYSTEM_ACCESSTOKEN": "token",
"SYSTEM_COLLECTIONURI": "https://dev.azure.com/example",
"SYSTEM_TEAMPROJECT": "Project",
"BUILD_REPOSITORY_ID": "repo-id",
}
with patch.dict(os.environ, env, clear=False):
with patch.object(
sys,
"argv",
[
"ensure_rolling_pr.py",
"--repo-root",
"/tmp/repo",
"--workload",
"intune",
"--drift-branch",
"drift/intune",
"--baseline-branch",
"main",
"--pr-title",
"Intune drift review (rolling)",
],
):
with patch.object(
self.module,
"_query_prs",
return_value=[
{
"pullRequestId": 123,
"title": "Intune drift review (rolling)",
"description": "Existing description with summary",
"completionOptions": {"mergeStrategy": "rebase"},
"url": "https://dev.azure.com/example/_apis/git/repositories/repo/pullRequests/123",
}
],
):
with patch.object(self.module, "_request_json") as request_json:
result = self.module.main()
self.assertEqual(result, 0)
request_json.assert_not_called()
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,252 @@
from __future__ import annotations
import importlib.util
import json
import tempfile
import unittest
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
MODULE_PATH = Path(__file__).resolve().parents[1] / "scripts" / "export_entra_baseline.py"
def load_module():
spec = importlib.util.spec_from_file_location("export_entra_baseline", MODULE_PATH)
if spec is None or spec.loader is None:
raise RuntimeError(f"Unable to load module from {MODULE_PATH}")
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
class ExportEntraBaselineTests(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.module = load_module()
def _namespace(self, root: Path, fail_on_export_error: str) -> SimpleNamespace:
return SimpleNamespace(
root=str(root),
token="token-value",
include_named_locations="true",
include_authentication_strengths="false",
include_conditional_access="false",
include_enterprise_applications="false",
include_app_registrations="false",
enterprise_app_workers=1,
fail_on_export_error=fail_on_export_error,
previous_snapshot_ref="",
)
def test_requested_export_error_is_fatal_by_default(self) -> None:
with tempfile.TemporaryDirectory() as td:
root = Path(td) / "entra"
root.mkdir(parents=True, exist_ok=True)
args = self._namespace(root=root, fail_on_export_error="true")
with (
patch.object(self.module, "parse_args", return_value=args),
patch.object(self.module, "GraphClient") as graph_client_cls,
):
graph_client = MagicMock()
graph_client.get_object.return_value = ({"value": []}, None)
graph_client.get_collection.return_value = ([], "HTTP 500")
graph_client_cls.return_value = graph_client
result = self.module.main()
self.assertEqual(result, 2)
def test_requested_export_error_can_be_non_fatal_when_disabled(self) -> None:
with tempfile.TemporaryDirectory() as td:
root = Path(td) / "entra"
root.mkdir(parents=True, exist_ok=True)
args = self._namespace(root=root, fail_on_export_error="false")
with (
patch.object(self.module, "parse_args", return_value=args),
patch.object(self.module, "GraphClient") as graph_client_cls,
):
graph_client = MagicMock()
graph_client.get_object.return_value = ({"value": []}, None)
graph_client.get_collection.return_value = ([], "HTTP 500")
graph_client_cls.return_value = graph_client
result = self.module.main()
self.assertEqual(result, 0)
def test_normalize_resolution_error_suppresses_transient_dns_variants(self) -> None:
transient_samples = [
"<urlopen error [Errno -3] Temporary failure in name resolution>",
"Temporary failure resolving 'graph.microsoft.com'",
"Failed to resolve host graph.microsoft.com",
"getaddrinfo failed",
]
for sample in transient_samples:
with self.subTest(sample=sample):
self.assertEqual(self.module.normalize_resolution_error(sample), "")
def test_normalize_resolution_error_keeps_non_transient_http_error(self) -> None:
self.assertEqual(self.module.normalize_resolution_error("HTTP 403"), "HTTP 403")
def test_normalize_branch_name_ignores_unresolved_macro(self) -> None:
self.assertEqual(self.module._normalize_branch_name("$(DRIFT_BRANCH_ENTRA)"), "")
def test_required_resource_resolution_backfills_unresolved_from_previous(self) -> None:
current = [
{
"resourceAppId": "00000003-0000-0000-c000-000000000000",
"resourceDisplayName": "Unresolved",
"permissions": [
{
"id": "perm-id-1",
"type": "Scope",
"value": "",
"displayName": "",
"description": "",
}
],
}
]
previous = [
{
"resourceAppId": "00000003-0000-0000-c000-000000000000",
"resourceDisplayName": "Microsoft Graph",
"permissions": [
{
"id": "perm-id-1",
"type": "Scope",
"value": "User.Read.All",
"displayName": "Read all users' full profiles",
"description": "Allows the app to read full profiles.",
}
],
}
]
merged = self.module._merge_required_resource_access_resolution(current, previous)
self.assertEqual(merged[0]["resourceDisplayName"], "Microsoft Graph")
self.assertEqual(merged[0]["permissions"][0]["value"], "User.Read.All")
self.assertEqual(merged[0]["permissions"][0]["displayName"], "Read all users' full profiles")
unresolved_resources, unresolved_permissions = self.module._count_unresolved_required_permissions(merged)
self.assertEqual(unresolved_resources, 0)
self.assertEqual(unresolved_permissions, 0)
def test_app_role_resolution_backfills_unresolved_from_previous(self) -> None:
current = [
{
"resourceId": "resource-1",
"resourceDisplayName": "Unresolved",
"appRoleId": "role-1",
"appRoleValue": "",
"appRoleDisplayName": "",
"principalType": "ServicePrincipal",
}
]
previous = [
{
"resourceId": "resource-1",
"resourceDisplayName": "Office 365 Exchange Online",
"appRoleId": "role-1",
"appRoleValue": "Exchange.ManageAsApp",
"appRoleDisplayName": "Manage Exchange as application",
"principalType": "ServicePrincipal",
}
]
merged = self.module._merge_app_role_assignments_resolution(current, previous)
self.assertEqual(merged[0]["resourceDisplayName"], "Office 365 Exchange Online")
self.assertEqual(merged[0]["appRoleValue"], "Exchange.ManageAsApp")
self.assertEqual(merged[0]["appRoleDisplayName"], "Manage Exchange as application")
unresolved_resources, unresolved_roles = self.module._count_unresolved_app_role_assignments(merged)
self.assertEqual(unresolved_resources, 0)
self.assertEqual(unresolved_roles, 0)
def test_required_resource_access_uses_direct_appid_fallback_when_filter_returns_empty(self) -> None:
app = {
"requiredResourceAccess": [
{
"resourceAppId": "00000003-0000-0000-c000-000000000000",
"resourceAccess": [
{
"id": "e1fe6dd8-ba31-4d61-89e7-88639da4683d",
"type": "Scope",
}
],
}
]
}
client = MagicMock()
client.get_object.side_effect = [
({"value": []}, None),
(
{
"id": "sp-graph",
"appId": "00000003-0000-0000-c000-000000000000",
"displayName": "Microsoft Graph",
"appRoles": [],
"oauth2PermissionScopes": [
{
"id": "e1fe6dd8-ba31-4d61-89e7-88639da4683d",
"value": "User.Read",
"adminConsentDisplayName": "Sign in and read user profile",
"adminConsentDescription": "Allows sign-in and profile read.",
}
],
},
None,
),
]
resolved, unresolved_resources, unresolved_permissions, lookup_errors = self.module.resolve_required_resource_access(
app=app,
client=client,
resource_sp_by_appid={},
)
self.assertEqual(unresolved_resources, 0)
self.assertEqual(unresolved_permissions, 0)
self.assertEqual(lookup_errors, [])
self.assertEqual(resolved[0]["resourceDisplayName"], "Microsoft Graph")
self.assertEqual(resolved[0]["permissions"][0]["value"], "User.Read")
def test_load_resource_sp_cache_from_export_reads_enterprise_apps(self) -> None:
with tempfile.TemporaryDirectory() as td:
root = Path(td) / "entra"
export_dir = root / "Enterprise Applications"
export_dir.mkdir(parents=True, exist_ok=True)
payload = {
"id": "sp-graph",
"appId": "00000003-0000-0000-c000-000000000000",
"displayName": "Microsoft Graph",
"appRoles": [{"id": "role-1", "value": "Directory.Read.All"}],
"oauth2PermissionScopes": [{"id": "scope-1", "value": "User.Read"}],
}
(export_dir / "Microsoft Graph__sp-graph.json").write_text(json.dumps(payload), encoding="utf-8")
cache = self.module._load_resource_sp_cache_from_export(root)
self.assertIn("00000003-0000-0000-c000-000000000000", cache)
graph = cache["00000003-0000-0000-c000-000000000000"]
self.assertEqual(graph["displayName"], "Microsoft Graph")
self.assertEqual(graph["appRoles"][0]["value"], "Directory.Read.All")
self.assertEqual(graph["oauth2PermissionScopes"][0]["value"], "User.Read")
def test_load_resource_sp_cache_from_export_ignores_invalid_files(self) -> None:
with tempfile.TemporaryDirectory() as td:
root = Path(td) / "entra"
export_dir = root / "Enterprise Applications"
export_dir.mkdir(parents=True, exist_ok=True)
(export_dir / "invalid.json").write_text("{", encoding="utf-8")
(export_dir / "missing-appid.json").write_text(json.dumps({"id": "sp-only"}), encoding="utf-8")
cache = self.module._load_resource_sp_cache_from_export(root)
self.assertEqual(cache, {})
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,164 @@
from __future__ import annotations
import importlib.util
import json
import subprocess
import tempfile
import unittest
from pathlib import Path
MODULE_PATH = Path(__file__).resolve().parents[1] / "scripts" / "filter_entra_enrichment_noise.py"
def load_module():
spec = importlib.util.spec_from_file_location("filter_entra_enrichment_noise", MODULE_PATH)
if spec is None or spec.loader is None:
raise RuntimeError(f"Unable to load module from {MODULE_PATH}")
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def _git(repo: Path, *args: str) -> None:
subprocess.run(
["git", *args],
cwd=str(repo),
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
class FilterEntraEnrichmentNoiseTests(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.module = load_module()
def test_is_enrichment_only_change_true(self) -> None:
old_text = json.dumps(
{
"displayName": "App",
"requiredResourceAccess": [{"resourceAppId": "00000003-0000-0000-c000-000000000000"}],
"requiredResourceAccessResolved": [{"resourceDisplayName": "Microsoft Graph"}],
"resolutionStatus": {"requiredResourceAccess": {"unresolvedPermissionCount": 0}},
}
)
new_text = json.dumps(
{
"displayName": "App",
"requiredResourceAccess": [{"resourceAppId": "00000003-0000-0000-c000-000000000000"}],
"requiredResourceAccessResolved": [{"resourceDisplayName": "Unresolved"}],
"resolutionStatus": {"requiredResourceAccess": {"unresolvedPermissionCount": 6}},
}
)
self.assertTrue(self.module._is_enrichment_only_change(old_text, new_text))
def test_is_enrichment_only_change_false_when_config_changes(self) -> None:
old_text = json.dumps(
{
"displayName": "App",
"requiredResourceAccess": [{"resourceAppId": "00000003-0000-0000-c000-000000000000"}],
}
)
new_text = json.dumps(
{
"displayName": "App",
"requiredResourceAccess": [{"resourceAppId": "11111111-0000-0000-c000-000000000000"}],
}
)
self.assertFalse(self.module._is_enrichment_only_change(old_text, new_text))
def test_filter_reverts_only_enrichment_changes(self) -> None:
with tempfile.TemporaryDirectory() as td:
repo = Path(td)
_git(repo, "init")
_git(repo, "config", "user.email", "tester@example.com")
_git(repo, "config", "user.name", "Tester")
workload_dir = repo / "tenant-state" / "entra" / "App Registrations"
workload_dir.mkdir(parents=True, exist_ok=True)
file_path = workload_dir / "Test App__id.json"
baseline = {
"displayName": "App",
"requiredResourceAccess": [{"resourceAppId": "00000003-0000-0000-c000-000000000000"}],
"requiredResourceAccessResolved": [{"resourceDisplayName": "Microsoft Graph"}],
"resolutionStatus": {"requiredResourceAccess": {"unresolvedPermissionCount": 0}},
}
file_path.write_text(json.dumps(baseline, indent=2) + "\n", encoding="utf-8")
_git(repo, "add", ".")
_git(repo, "commit", "-m", "baseline")
enrichment_only = {
"displayName": "App",
"requiredResourceAccess": [{"resourceAppId": "00000003-0000-0000-c000-000000000000"}],
"requiredResourceAccessResolved": [{"resourceDisplayName": "Unresolved"}],
"resolutionStatus": {"requiredResourceAccess": {"unresolvedPermissionCount": 6}},
}
file_path.write_text(json.dumps(enrichment_only, indent=2) + "\n", encoding="utf-8")
residual_before = self.module.find_enrichment_only_modified_files(
repo_root=repo,
workload_root="tenant-state/entra",
)
self.assertEqual(residual_before, ["tenant-state/entra/App Registrations/Test App__id.json"])
reverted = self.module.filter_enrichment_only_files(repo_root=repo, workload_root="tenant-state/entra")
self.assertEqual(reverted, ["tenant-state/entra/App Registrations/Test App__id.json"])
residual_after = self.module.find_enrichment_only_modified_files(
repo_root=repo,
workload_root="tenant-state/entra",
)
self.assertEqual(residual_after, [])
status = subprocess.run(
["git", "status", "--short"],
cwd=str(repo),
check=True,
capture_output=True,
text=True,
).stdout.strip()
self.assertEqual(status, "")
def test_filter_keeps_real_config_changes(self) -> None:
with tempfile.TemporaryDirectory() as td:
repo = Path(td)
_git(repo, "init")
_git(repo, "config", "user.email", "tester@example.com")
_git(repo, "config", "user.name", "Tester")
workload_dir = repo / "tenant-state" / "entra" / "App Registrations"
workload_dir.mkdir(parents=True, exist_ok=True)
file_path = workload_dir / "Test App__id.json"
baseline = {
"displayName": "App",
"requiredResourceAccess": [{"resourceAppId": "00000003-0000-0000-c000-000000000000"}],
"requiredResourceAccessResolved": [{"resourceDisplayName": "Microsoft Graph"}],
}
file_path.write_text(json.dumps(baseline, indent=2) + "\n", encoding="utf-8")
_git(repo, "add", ".")
_git(repo, "commit", "-m", "baseline")
config_changed = {
"displayName": "App",
"requiredResourceAccess": [{"resourceAppId": "11111111-0000-0000-c000-000000000000"}],
"requiredResourceAccessResolved": [{"resourceDisplayName": "Unresolved"}],
}
file_path.write_text(json.dumps(config_changed, indent=2) + "\n", encoding="utf-8")
reverted = self.module.filter_enrichment_only_files(repo_root=repo, workload_root="tenant-state/entra")
self.assertEqual(reverted, [])
status = subprocess.run(
["git", "status", "--short"],
cwd=str(repo),
check=True,
capture_output=True,
text=True,
).stdout
self.assertIn("Test App__id.json", status)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,109 @@
from __future__ import annotations
import importlib.util
import json
import subprocess
import tempfile
import unittest
from pathlib import Path
MODULE_PATH = Path(__file__).resolve().parents[1] / "scripts" / "filter_intune_partial_settings_noise.py"
def load_module():
spec = importlib.util.spec_from_file_location("filter_intune_partial_settings_noise", MODULE_PATH)
if spec is None or spec.loader is None:
raise RuntimeError(f"Unable to load module from {MODULE_PATH}")
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def _git(repo: Path, *args: str) -> None:
subprocess.run(
["git", *args],
cwd=str(repo),
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
class FilterIntunePartialSettingsNoiseTests(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.module = load_module()
def test_partial_payload_detection(self) -> None:
self.assertTrue(self.module._is_partial_settings_payload({"settingCount": 1}))
self.assertTrue(self.module._is_partial_settings_payload({"settingCount": 2, "settings": []}))
self.assertFalse(self.module._is_partial_settings_payload({"settingCount": 0, "settings": []}))
self.assertFalse(self.module._is_partial_settings_payload({"settingCount": 2, "settings": [{"id": "0"}]}))
def test_restore_partial_settings_from_baseline(self) -> None:
with tempfile.TemporaryDirectory() as td:
repo = Path(td)
_git(repo, "init")
_git(repo, "config", "user.email", "tester@example.com")
_git(repo, "config", "user.name", "Tester")
workload_dir = repo / "tenant-state" / "intune" / "Settings Catalog"
workload_dir.mkdir(parents=True, exist_ok=True)
file_path = workload_dir / "Policy__abc.json"
baseline = {
"name": "Policy",
"settingCount": 2,
"settings": [{"id": "0"}, {"id": "1"}],
}
file_path.write_text(json.dumps(baseline, indent=2) + "\n", encoding="utf-8")
_git(repo, "add", ".")
_git(repo, "commit", "-m", "baseline")
partial = {
"name": "Policy",
"settingCount": 2,
}
file_path.write_text(json.dumps(partial, indent=2) + "\n", encoding="utf-8")
restored, unresolved = self.module.restore_partial_settings_from_baseline(
repo_root=repo,
backup_root=repo / "tenant-state" / "intune",
baseline_ref="HEAD",
)
self.assertEqual(restored, ["tenant-state/intune/Settings Catalog/Policy__abc.json"])
self.assertEqual(unresolved, [])
payload = json.loads(file_path.read_text(encoding="utf-8"))
self.assertEqual(payload["settings"], [{"id": "0"}, {"id": "1"}])
def test_partial_settings_unresolved_without_baseline(self) -> None:
with tempfile.TemporaryDirectory() as td:
repo = Path(td)
_git(repo, "init")
_git(repo, "config", "user.email", "tester@example.com")
_git(repo, "config", "user.name", "Tester")
(repo / "README.md").write_text("test\n", encoding="utf-8")
_git(repo, "add", ".")
_git(repo, "commit", "-m", "init")
workload_dir = repo / "tenant-state" / "intune" / "Settings Catalog"
workload_dir.mkdir(parents=True, exist_ok=True)
file_path = workload_dir / "Policy__missing.json"
file_path.write_text(json.dumps({"settingCount": 4}, indent=2) + "\n", encoding="utf-8")
restored, unresolved = self.module.restore_partial_settings_from_baseline(
repo_root=repo,
backup_root=repo / "tenant-state" / "intune",
baseline_ref="HEAD",
)
self.assertEqual(restored, [])
self.assertEqual(unresolved, ["tenant-state/intune/Settings Catalog/Policy__missing.json"])
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,103 @@
from __future__ import annotations
import base64
import importlib.util
import sys
import unittest
from pathlib import Path
from unittest.mock import patch
MODULE_PATH = Path(__file__).resolve().parents[1] / "scripts" / "queue_post_merge_restore.py"
def load_module():
# Preload common helper so the script can import it.
common_path = MODULE_PATH.parent / "common.py"
common_spec = importlib.util.spec_from_file_location("common", common_path)
if common_spec is not None and common_spec.loader is not None:
common_mod = importlib.util.module_from_spec(common_spec)
sys.modules["common"] = common_mod
common_spec.loader.exec_module(common_mod)
module_name = "queue_post_merge_restore"
spec = importlib.util.spec_from_file_location(module_name, MODULE_PATH)
if spec is None or spec.loader is None:
raise RuntimeError(f"Unable to load module from {MODULE_PATH}")
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
return module
def _marker(path: str) -> str:
encoded = base64.urlsafe_b64encode(path.encode("utf-8")).decode("ascii").rstrip("=")
return f"Automation marker: AUTO-CHANGE-TICKET:{encoded}"
class QueuePostMergeRestoreTests(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.module = load_module()
def test_ticket_path_from_content_decodes_marker(self) -> None:
path = "tenant-state/intune/Device Configurations/macOS - WiFi TEST_macOSWiFiConfiguration__id.json"
content = f"Header\n{_marker(path)}\nBody"
self.assertEqual(self.module._ticket_path_from_content(content), path)
def test_rejected_ticket_paths_uses_latest_decision(self) -> None:
accepted_path = "tenant-state/intune/Settings Catalog/A.json"
rejected_path = "tenant-state/intune/Settings Catalog/B.json"
threads = [
{
"comments": [
{"id": 1, "parentCommentId": 0, "content": _marker(accepted_path)},
{"id": 2, "parentCommentId": 0, "content": "/reject"},
{"id": 3, "parentCommentId": 0, "content": "/accept"},
]
},
{
"comments": [
{"id": 1, "parentCommentId": 0, "content": _marker(rejected_path)},
{"id": 2, "parentCommentId": 0, "content": "/accept"},
{"id": 3, "parentCommentId": 0, "content": "/reject"},
]
},
]
self.assertEqual(self.module._rejected_ticket_paths(threads), [rejected_path])
def test_queue_restore_pipeline_includes_selective_params(self) -> None:
captured: dict[str, object] = {}
def _fake_request(url: str, headers: dict[str, str], method: str = "GET", body: dict | None = None):
captured["url"] = url
captured["method"] = method
captured["body"] = body or {}
return {"id": 123}
with patch.object(self.module, "_request_json", side_effect=_fake_request):
self.module._queue_restore_pipeline(
collection_uri="https://dev.azure.com/org",
project="proj",
headers={"Authorization": "Bearer x"},
definition_id=42,
baseline_branch="main",
include_entra_update=False,
dry_run=False,
update_assignments=True,
remove_unmanaged=False,
max_workers=10,
exclude_csv="",
restore_mode="selective",
restore_paths_csv="tenant-state/intune/Device Configurations/macOS - WiFi TEST.json",
)
body = captured["body"]
self.assertIsInstance(body, dict)
template = body["templateParameters"]
self.assertEqual(template["restoreMode"], "selective")
self.assertIn("restorePathsCsv", template)
if __name__ == "__main__":
unittest.main()

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,157 @@
from __future__ import annotations
import json
import subprocess
import sys
import tempfile
import unittest
from pathlib import Path
SCRIPT_PATH = Path(__file__).resolve().parents[1] / "scripts" / "validate_backup_outputs.py"
def run_validator(*args: str) -> subprocess.CompletedProcess[str]:
cmd = [sys.executable, str(SCRIPT_PATH), *args]
return subprocess.run(cmd, check=False, text=True, capture_output=True)
class ValidateBackupOutputsTests(unittest.TestCase):
def test_intune_validation_passes_with_required_outputs(self) -> None:
with tempfile.TemporaryDirectory() as td:
base = Path(td)
root = base / "tenant-state" / "intune"
reports = base / "tenant-state" / "reports" / "intune"
(root / "Device Configurations").mkdir(parents=True, exist_ok=True)
reports.mkdir(parents=True, exist_ok=True)
(root / "Device Configurations" / "policy__id.json").write_text(
json.dumps({"id": "id-1", "displayName": "Policy"}) + "\n",
encoding="utf-8",
)
(reports / "policy-assignments.md").write_text("# report\n", encoding="utf-8")
(reports / "policy-assignments.csv").write_text("a,b\n", encoding="utf-8")
(reports / "object-inventory-all.csv").write_text("a,b\n", encoding="utf-8")
result = run_validator(
"--workload",
"intune",
"--mode",
"light",
"--root",
str(root),
"--reports-root",
str(reports),
)
self.assertEqual(result.returncode, 0, msg=result.stdout + result.stderr)
def test_intune_validation_fails_when_assignment_csv_missing(self) -> None:
with tempfile.TemporaryDirectory() as td:
base = Path(td)
root = base / "tenant-state" / "intune"
reports = base / "tenant-state" / "reports" / "intune"
(root / "Device Configurations").mkdir(parents=True, exist_ok=True)
reports.mkdir(parents=True, exist_ok=True)
(root / "Device Configurations" / "policy__id.json").write_text("{}", encoding="utf-8")
(reports / "policy-assignments.md").write_text("# report\n", encoding="utf-8")
(reports / "object-inventory-all.csv").write_text("a,b\n", encoding="utf-8")
result = run_validator(
"--workload",
"intune",
"--mode",
"full",
"--root",
str(root),
"--reports-root",
str(reports),
)
self.assertNotEqual(result.returncode, 0)
self.assertIn("Missing Intune assignment CSV report", result.stdout)
def test_entra_light_validation_allows_non_effective_enterprise_apps(self) -> None:
with tempfile.TemporaryDirectory() as td:
base = Path(td)
root = base / "tenant-state" / "entra"
reports = base / "tenant-state" / "reports" / "entra"
(root / "Named Locations").mkdir(parents=True, exist_ok=True)
reports.mkdir(parents=True, exist_ok=True)
(root / "Named Locations" / "Named Locations.md").write_text("# named\n", encoding="utf-8")
(reports / "object-inventory-all.csv").write_text("a,b\n", encoding="utf-8")
result = run_validator(
"--workload",
"entra",
"--mode",
"light",
"--root",
str(root),
"--reports-root",
str(reports),
"--include-named-locations",
"true",
"--include-enterprise-applications",
"true",
"--include-enterprise-applications-effective",
"false",
)
self.assertEqual(result.returncode, 0, msg=result.stdout + result.stderr)
def test_entra_light_validation_allows_non_effective_app_registrations(self) -> None:
with tempfile.TemporaryDirectory() as td:
base = Path(td)
root = base / "tenant-state" / "entra"
reports = base / "tenant-state" / "reports" / "entra"
(root / "Named Locations").mkdir(parents=True, exist_ok=True)
reports.mkdir(parents=True, exist_ok=True)
(root / "Named Locations" / "Named Locations.md").write_text("# named\n", encoding="utf-8")
(reports / "object-inventory-all.csv").write_text("a,b\n", encoding="utf-8")
result = run_validator(
"--workload",
"entra",
"--mode",
"light",
"--root",
str(root),
"--reports-root",
str(reports),
"--include-named-locations",
"true",
"--include-app-registrations",
"true",
"--include-app-registrations-effective",
"false",
)
self.assertEqual(result.returncode, 0, msg=result.stdout + result.stderr)
def test_entra_validation_fails_when_required_index_missing(self) -> None:
with tempfile.TemporaryDirectory() as td:
base = Path(td)
root = base / "tenant-state" / "entra"
reports = base / "tenant-state" / "reports" / "entra"
root.mkdir(parents=True, exist_ok=True)
reports.mkdir(parents=True, exist_ok=True)
(reports / "object-inventory-all.csv").write_text("a,b\n", encoding="utf-8")
result = run_validator(
"--workload",
"entra",
"--mode",
"full",
"--root",
str(root),
"--reports-root",
str(reports),
"--include-named-locations",
"true",
)
self.assertNotEqual(result.returncode, 0)
self.assertIn("Missing Entra export index for 'Named Locations'", result.stdout)
if __name__ == "__main__":
unittest.main()