#!/usr/bin/env python3
"""Queue restore automatically after merged rolling PR that contains /reject decisions."""
from __future__ import annotations
import argparse
import base64
import datetime as dt
import json
import os
import re
import sys
import urllib.parse
from pathlib import Path
from typing import Any
# common.py lives in the same directory; ensure it can be imported when the
# script is executed directly.
_sys_path_inserted = False
if __file__:
_script_dir = str(Path(__file__).resolve().parent)
if _script_dir not in sys.path:
sys.path.insert(0, _script_dir)
_sys_path_inserted = True
import common
if _sys_path_inserted:
sys.path.pop(0)
_env_text = common.env_text
_env_bool = common.env_bool
_request_json = common.request_json
REJECT_CMD_RE = re.compile(r"(?im)^\s*(?:/|#)?reject\b")
DECISION_RE = re.compile(r"(?im)^\s*(?:/|#)?(?P<decision>reject|accept)\b")
AUTO_TICKET_THREAD_PREFIX = "AUTO-CHANGE-TICKET:"
MERGE_MARKER_PREFIX = "AUTO-RESTORE-AFTER-MERGE:"
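# Machine-readable marker lines embedded in PR thread comments:
#   "Automation marker: AUTO-CHANGE-TICKET:<base64url(ticket path)>"  -> see _ticket_path_from_content
#   "Automation marker: AUTO-RESTORE-AFTER-MERGE:<merge commit id>"   -> see _has_merge_marker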
def _normalize_branch(branch: str) -> str:
b = branch.strip()
if b.startswith("refs/heads/"):
return b[len("refs/heads/") :]
return b
def _ref_from_branch(branch: str) -> str:
return f"refs/heads/{_normalize_branch(branch)}"
def _parse_iso_utc(value: str) -> dt.datetime | None:
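    """Parse an ISO-8601 timestamp into an aware UTC datetime.

    A trailing "Z" is rewritten to "+00:00" (datetime.fromisoformat only
    accepts "Z" on Python 3.11+), naive values are assumed to be UTC, and
    empty or unparseable input yields None.
    """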
text = (value or "").strip()
if not text:
return None
if text.endswith("Z"):
text = text[:-1] + "+00:00"
try:
parsed = dt.datetime.fromisoformat(text)
except ValueError:
return None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=dt.timezone.utc)
return parsed.astimezone(dt.timezone.utc)
def _query_completed_prs(
repo_api: str,
headers: dict[str, str],
source_ref: str,
target_ref: str,
) -> list[dict[str, Any]]:
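    """Return completed PRs from *source_ref* into *target_ref*, newest first.

    Queries the Azure DevOps Git pull requests endpoint (api-version 7.1)
    and sorts client-side on closedDate so callers can inspect the most
    recently merged PR first.
    """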
query = urllib.parse.urlencode(
{
"searchCriteria.status": "completed",
"searchCriteria.sourceRefName": source_ref,
"searchCriteria.targetRefName": target_ref,
"api-version": "7.1",
},
quote_via=urllib.parse.quote,
safe="/",
)
payload = _request_json(f"{repo_api}/pullrequests?{query}", headers=headers)
items = payload.get("value", []) if isinstance(payload, dict) else []
return sorted(items, key=lambda x: x.get("closedDate", ""), reverse=True)
def _threads(repo_api: str, headers: dict[str, str], pr_id: int) -> list[dict[str, Any]]:
payload = _request_json(
f"{repo_api}/pullrequests/{pr_id}/threads?api-version=7.1",
headers=headers,
)
return payload.get("value", []) if isinstance(payload, dict) else []
def _thread_comment_contents(threads: list[dict[str, Any]]) -> list[str]:
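    """Flatten every comment body across *threads* into one list of strings."""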
out: list[str] = []
for thread in threads:
comments = thread.get("comments", []) if isinstance(thread.get("comments"), list) else []
for comment in comments:
out.append(str(comment.get("content", "") or ""))
return out
def _ticket_path_from_content(content: str) -> str | None:
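    """Decode the ticket path carried by an AUTO-CHANGE-TICKET marker line.

    The marker payload is the repo-relative ticket path as unpadded URL-safe
    base64; padding is restored before decoding. Illustrative example with a
    made-up path: "AUTO-CHANGE-TICKET:dGlja2V0cy9hYmM" decodes to
    "tickets/abc". Returns None when no marker is found or decoding fails.
    """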
marker_re = re.compile(
r"(?:^|\n)\s*(?:Automation marker:\s*)?"
+ re.escape(AUTO_TICKET_THREAD_PREFIX)
+ r"(?P<id>[A-Za-z0-9_-]+)\s*(?:$|\n)"
)
match = marker_re.search(content or "")
if not match:
return None
encoded = match.group("id")
padding = "=" * ((4 - len(encoded) % 4) % 4)
try:
return base64.urlsafe_b64decode((encoded + padding).encode("ascii")).decode("utf-8")
except Exception:
return None
def _latest_thread_decision(comments: list[dict[str, Any]]) -> str | None:
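    """Return the last /reject or /accept command in a thread, or None.

    Comments are ordered by (id, parentCommentId) as a proxy for posting
    order, so a later "/accept" overrides an earlier "/reject" and vice
    versa.
    """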
decision: str | None = None
def _comment_sort_key(comment: dict[str, Any]) -> tuple[int, int]:
try:
comment_id = int(comment.get("id", 0))
except Exception:
comment_id = 0
try:
parent_id = int(comment.get("parentCommentId", 0))
except Exception:
parent_id = 0
return (comment_id, parent_id)
for comment in sorted(comments, key=_comment_sort_key):
content = str(comment.get("content", "") or "")
match = DECISION_RE.search(content)
if match:
decision = match.group("decision").lower()
return decision
def _rejected_ticket_paths(threads: list[dict[str, Any]]) -> list[str]:
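    """Collect ticket paths from threads whose latest decision is "reject".

    Threads without an AUTO-CHANGE-TICKET marker are ignored, and a thread
    whose latest decision is "accept" (or that has no decision) contributes
    nothing. The result is sorted for deterministic output.
    """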
rejected: set[str] = set()
for thread in threads:
comments = thread.get("comments", []) if isinstance(thread.get("comments"), list) else []
marker_path: str | None = None
for comment in comments:
marker_path = _ticket_path_from_content(str(comment.get("content", "") or ""))
if marker_path:
break
if not marker_path:
continue
decision = _latest_thread_decision(comments)
if decision == "reject":
rejected.add(marker_path)
return sorted(rejected)
def _has_reject_signal(comments: list[str]) -> bool:
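    """True when any comment contains a /reject command or the automation's
    "Auto-action: /reject detected." acknowledgement text."""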
for content in comments:
if REJECT_CMD_RE.search(content):
return True
if "Auto-action: /reject detected." in content:
return True
return False
def _has_merge_marker(comments: list[str], merge_commit: str) -> bool:
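    """True when the AUTO-RESTORE-AFTER-MERGE marker for *merge_commit* exists.

    This is the idempotency guard: after queueing a restore, the script posts
    this marker so later runs skip the already-handled PR.
    """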
marker = f"Automation marker: {MERGE_MARKER_PREFIX}{merge_commit}"
return any(marker in content for content in comments)
def _is_permission_error(exc: Exception) -> bool:
msg = str(exc).lower()
return "http 403" in msg or "forbidden" in msg
def _normalize_exclude_csv(value: str) -> str:
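    """Treat placeholder values ("none", "null", "n/a", "-", "_none_") as an empty CSV."""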
normalized = str(value or "").strip()
if normalized.lower() in {"", "none", "null", "n/a", "-", "_none_"}:
return ""
return normalized
def _diagnose_queue_permission(
collection_uri: str,
project: str,
headers: dict[str, str],
definition_id: int,
) -> None:
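    """Narrow down a 403 from the queue call by probing definition readability.

    If the definition is readable, the missing right is most likely 'Queue
    builds' on the restore pipeline; if it is not, a wrong ID, wrong project,
    or missing 'View builds' permission is the more likely culprit.
    """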
definition_url = (
f"{collection_uri}/{project}/_apis/build/definitions/{definition_id}"
"?api-version=7.1"
)
try:
payload = _request_json(definition_url, headers=headers)
definition_name = str(payload.get("name", "") or "").strip()
print(
"Diagnostic: restore pipeline definition is readable "
f"(id={definition_id}, name='{definition_name or 'n/a'}')."
)
        print(
            "Diagnostic: the queue call was forbidden, so the missing permission is likely "
            "'Queue builds' on that restore pipeline (or this pipeline is not authorized to use it)."
        )
except Exception as diag_exc:
print(
"Diagnostic: unable to read restore pipeline definition "
f"id={definition_id}. Details: {diag_exc}"
)
print(
"Diagnostic: likely wrong definition ID, wrong project, or missing 'View builds' permission "
"for the calling pipeline identity."
)
def _queue_restore_pipeline(
collection_uri: str,
project: str,
headers: dict[str, str],
definition_id: int,
baseline_branch: str,
include_entra_update: bool,
dry_run: bool,
update_assignments: bool,
remove_unmanaged: bool,
max_workers: int,
exclude_csv: str,
restore_mode: str = "full",
restore_paths_csv: str = "",
) -> dict[str, Any]:
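    """Queue the restore pipeline through the Azure DevOps Builds API.

    All knobs are passed as templateParameters; restorePathsCsv is attached
    only for selective restores, and excludeCsv only when it normalizes to a
    non-empty value. Returns the queued build resource from the POST response.
    """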
build_api = f"{collection_uri}/{project}/_apis/build/builds?api-version=7.1"
template_parameters = {
"dryRun": dry_run,
"updateAssignments": update_assignments,
"removeObjectsNotInBaseline": remove_unmanaged,
"includeEntraUpdate": include_entra_update,
"baselineBranch": baseline_branch,
"maxWorkers": max_workers,
"restoreMode": restore_mode,
}
if restore_mode == "selective" and restore_paths_csv.strip():
template_parameters["restorePathsCsv"] = restore_paths_csv.strip()
exclude_csv = _normalize_exclude_csv(exclude_csv)
if exclude_csv:
template_parameters["excludeCsv"] = exclude_csv
body = {
"definition": {"id": definition_id},
"sourceBranch": _ref_from_branch(baseline_branch),
"templateParameters": template_parameters,
}
return _request_json(build_api, headers=headers, method="POST", body=body)
def _post_pr_thread(repo_api: str, headers: dict[str, str], pr_id: int, content: str) -> None:
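    """Open a new active thread (status=1) holding one text comment (commentType=1)."""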
_request_json(
f"{repo_api}/pullrequests/{pr_id}/threads?api-version=7.1",
headers=headers,
method="POST",
body={
"comments": [
{
"parentCommentId": 0,
"content": content,
"commentType": 1,
}
],
"status": 1,
},
)
def main() -> int:
parser = argparse.ArgumentParser(description="Queue restore after merged rolling PR with /reject decisions")
parser.add_argument("--workload", required=True, choices=["intune", "entra"])
parser.add_argument("--drift-branch", required=True)
parser.add_argument("--baseline-branch", required=True)
args = parser.parse_args()
if not _env_bool("AUTO_REMEDIATE_AFTER_MERGE", False):
print("Post-merge auto-remediation disabled (set AUTO_REMEDIATE_AFTER_MERGE=true).")
return 0
token = os.environ.get("SYSTEM_ACCESSTOKEN", "").strip()
if not token:
raise SystemExit("SYSTEM_ACCESSTOKEN is empty.")
definition_raw = _env_text("AUTO_REMEDIATE_RESTORE_PIPELINE_ID", "")
if not definition_raw:
print(
"Post-merge auto-remediation queue skipped: "
"AUTO_REMEDIATE_RESTORE_PIPELINE_ID is empty."
)
return 0
try:
definition_id = int(definition_raw)
except ValueError as exc:
raise SystemExit(f"Invalid AUTO_REMEDIATE_RESTORE_PIPELINE_ID: {definition_raw}") from exc
max_workers_raw = _env_text("AUTO_REMEDIATE_MAX_WORKERS", "10")
try:
max_workers = int(max_workers_raw)
except ValueError as exc:
raise SystemExit(f"Invalid AUTO_REMEDIATE_MAX_WORKERS: {max_workers_raw}") from exc
lookback_hours_raw = _env_text("AUTO_REMEDIATE_AFTER_MERGE_LOOKBACK_HOURS", "168")
try:
lookback_hours = int(lookback_hours_raw)
except ValueError as exc:
raise SystemExit(f"Invalid AUTO_REMEDIATE_AFTER_MERGE_LOOKBACK_HOURS: {lookback_hours_raw}") from exc
collection_uri = os.environ["SYSTEM_COLLECTIONURI"].rstrip("/")
project = os.environ["SYSTEM_TEAMPROJECT"]
repository_id = os.environ["BUILD_REPOSITORY_ID"]
include_entra_update = _env_bool("AUTO_REMEDIATE_INCLUDE_ENTRA_UPDATE", False)
dry_run = _env_bool("AUTO_REMEDIATE_DRY_RUN", False)
update_assignments = _env_bool("AUTO_REMEDIATE_UPDATE_ASSIGNMENTS", True)
remove_unmanaged = _env_bool("AUTO_REMEDIATE_REMOVE_OBJECTS", False)
exclude_csv = _normalize_exclude_csv(_env_text("AUTO_REMEDIATE_EXCLUDE_CSV", ""))
source_ref = _ref_from_branch(args.drift_branch)
target_ref = _ref_from_branch(args.baseline_branch)
repo_api = f"{collection_uri}/{project}/_apis/git/repositories/{repository_id}"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
cutoff = dt.datetime.now(dt.timezone.utc) - dt.timedelta(hours=lookback_hours)
completed = _query_completed_prs(repo_api, headers, source_ref, target_ref)
candidate: dict[str, Any] | None = None
candidate_threads: list[dict[str, Any]] = []
candidate_comments: list[str] = []
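    # Scan newest-first for a merged rolling PR that (a) closed within the
    # lookback window, (b) has a real merge commit, (c) carries a /reject
    # signal, and (d) lacks a merge marker for that commit (not yet handled).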
for pr in completed:
closed_at = _parse_iso_utc(str(pr.get("closedDate", "") or ""))
if closed_at and closed_at < cutoff:
continue
merge_commit = (((pr.get("lastMergeCommit") or {}).get("commitId")) or "").strip()
if not merge_commit:
continue
pr_id = int(pr.get("pullRequestId"))
threads = _threads(repo_api, headers, pr_id)
comments = _thread_comment_contents(threads)
if not _has_reject_signal(comments):
continue
if _has_merge_marker(comments, merge_commit):
continue
candidate = pr
candidate_threads = threads
candidate_comments = comments
break
if not candidate:
print("No merged rolling PR requiring post-merge remediation was found.")
return 0
pr_id = int(candidate.get("pullRequestId"))
merge_commit = (((candidate.get("lastMergeCommit") or {}).get("commitId")) or "").strip()
rejected_paths = _rejected_ticket_paths(candidate_threads)
restore_mode = "full"
restore_paths_csv = ""
if args.workload == "intune" and rejected_paths:
restore_mode = "selective"
restore_paths_csv = ",".join(rejected_paths)
print(f"Post-merge remediation scope: selective ({len(rejected_paths)} rejected path(s)).")
for path in rejected_paths:
print(f" - {path}")
else:
print("Post-merge remediation scope: full.")
try:
queued = _queue_restore_pipeline(
collection_uri=collection_uri,
project=project,
headers=headers,
definition_id=definition_id,
baseline_branch=args.baseline_branch,
include_entra_update=include_entra_update,
dry_run=dry_run,
update_assignments=update_assignments,
remove_unmanaged=remove_unmanaged,
max_workers=max_workers,
exclude_csv=exclude_csv,
restore_mode=restore_mode,
restore_paths_csv=restore_paths_csv,
)
except Exception as exc:
if _is_permission_error(exc):
print(
"WARNING: Post-merge remediation queue skipped due permissions. "
f"Definition={definition_id}. Details: {exc}"
)
_diagnose_queue_permission(collection_uri, project, headers, definition_id)
print(
"Grant 'Queue builds' permission for this pipeline identity on the restore pipeline "
"and ensure the pipeline has access to run it."
)
return 0
raise
build_id = queued.get("id")
build_url = ((queued.get("_links") or {}).get("web") or {}).get("href", "")
if not build_url and build_id:
build_url = f"{collection_uri}/{project}/_build/results?buildId={build_id}"
marker = f"Automation marker: {MERGE_MARKER_PREFIX}{merge_commit}"
comment = (
"Auto-remediation queued after merged rolling PR with reviewer /reject decision(s).\n\n"
f"Workload: {args.workload}\n"
f"Merged PR: #{pr_id}\n"
f"Merge commit: {merge_commit}\n"
f"Restore pipeline definition: {definition_id}\n"
f"Restore run: {build_url or '(queued)'}\n\n"
f"{marker}"
)
try:
_post_pr_thread(repo_api, headers, pr_id, comment)
except Exception as exc:
print(f"WARNING: Restore queued, but failed posting merge marker comment on PR #{pr_id}: {exc}")
print(
f"Queued post-merge remediation for PR #{pr_id} (merge_commit={merge_commit}, buildId={build_id})."
)
return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except Exception as exc:
print(f"WARNING: Failed post-merge remediation check: {exc}", file=sys.stderr)
raise