"""Dormant trigger guard — L3 machine-readable check (issue #58, P5-2). Reads docs/architecture/DORMANT-TRIGGERS.yaml, scans the changed-file surface (working tree via `git status --porcelain` + recent commit via `git diff HEAD~1..HEAD --name-only`), and writes any matching activation candidates to .orchestrator/dormant_alerts.json. Guardrails (per Stage 1 scope-lock) : - Informational only. Exit code is ALWAYS 0 — orchestrator never blocks on alerts. - manual_evidence_required entries are skipped (require human gate). - followup_issue entries are skipped (already tracked by the open follow-up). - No LLM call. Deterministic file-pattern + content-pattern matching only. - No hardcoding : the registry yaml is the single source of truth. Run : python scripts/check_dormant_triggers.py """ from __future__ import annotations import json import re import subprocess import sys from datetime import datetime, timezone from pathlib import Path import yaml REPO_ROOT = Path(__file__).resolve().parent.parent REGISTRY_PATH = REPO_ROOT / "docs" / "architecture" / "DORMANT-TRIGGERS.yaml" ALERT_OUT_PATH = REPO_ROOT / ".orchestrator" / "dormant_alerts.json" def load_registry(path: Path = REGISTRY_PATH) -> list[dict]: if not path.exists(): return [] with path.open("r", encoding="utf-8") as f: data = yaml.safe_load(f) or [] if not isinstance(data, list): raise ValueError(f"{path} must be a YAML list of entries.") return data def _git_lines(args: list[str]) -> list[str]: try: out = subprocess.run( ["git"] + args, cwd=str(REPO_ROOT), capture_output=True, text=True, timeout=20, check=False, ) except (OSError, subprocess.TimeoutExpired): return [] if out.returncode != 0: return [] return [ln for ln in out.stdout.splitlines() if ln.strip()] def collect_changed_files() -> list[str]: files: set[str] = set() for ln in _git_lines(["status", "--porcelain"]): path = ln[3:].strip() if len(ln) >= 4 else ln.strip() if "->" in path: path = path.split("->", 1)[1].strip() path = path.strip('"') if path: files.add(path.replace("\\", "/")) for ln in _git_lines(["diff", "HEAD~1..HEAD", "--name-only"]): if ln.strip(): files.add(ln.strip().replace("\\", "/")) return sorted(files) def _glob_to_regex(pat: str) -> str: """Translate a posix-style glob with ``**`` to an anchored regex. ``**/`` matches zero or more directory levels (so ``src/**/*.py`` matches both ``src/adapter.py`` and ``src/foo/adapter.py``). ``*`` and ``?`` do NOT cross directory separators. Mirrors common ``.gitignore``-style semantics; ``fnmatch.fnmatch`` alone cannot express this. """ out: list[str] = [] i = 0 n = len(pat) while i < n: if pat[i : i + 3] == "**/": out.append("(?:.*/)?") i += 3 elif pat[i : i + 2] == "**": out.append(".*") i += 2 elif pat[i] == "*": out.append("[^/]*") i += 1 elif pat[i] == "?": out.append("[^/]") i += 1 else: out.append(re.escape(pat[i])) i += 1 return "^" + "".join(out) + "$" def _glob_match(path: str, patterns: list[str]) -> bool: for pat in patterns: if re.match(_glob_to_regex(pat), path): return True return False def _content_match(file_path: Path, patterns: list[str]) -> list[str]: if not patterns or not file_path.exists() or not file_path.is_file(): return [] try: text = file_path.read_text(encoding="utf-8", errors="replace") except OSError: return [] hits = [] for pat in patterns: try: if re.search(pat, text): hits.append(pat) except re.error: if pat in text: hits.append(pat) return hits def check_entry(entry: dict, changed: list[str]) -> dict | None: trig = entry.get("trigger") or {} if trig.get("manual_evidence_required"): return None if entry.get("followup_issue"): return None file_patterns = trig.get("file_patterns") or [] content_patterns = trig.get("content_patterns") or [] if not file_patterns: return None matched_files = [p for p in changed if _glob_match(p, file_patterns)] if not matched_files: return None if content_patterns: hits: list[dict] = [] for mf in matched_files: hit_patterns = _content_match(REPO_ROOT / mf, content_patterns) if hit_patterns: hits.append({"file": mf, "patterns": hit_patterns}) if not hits: return None match_info = {"files": [h["file"] for h in hits], "content_hits": hits} else: match_info = {"files": matched_files, "content_hits": []} return { "issue": entry.get("issue"), "title": entry.get("title"), "doc": entry.get("doc"), "status": entry.get("status"), "on_trigger": entry.get("on_trigger"), "match": match_info, } def write_alerts(alerts: list[dict], path: Path = ALERT_OUT_PATH) -> None: path.parent.mkdir(parents=True, exist_ok=True) payload = { "generated_at": datetime.now(timezone.utc).isoformat(), "registry": str(REGISTRY_PATH.relative_to(REPO_ROOT)).replace("\\", "/"), "alerts": alerts, } path.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8") def main() -> int: entries = load_registry() changed = collect_changed_files() alerts = [a for a in (check_entry(e, changed) for e in entries) if a] write_alerts(alerts) if alerts: print(f"[dormant-trigger-guard] {len(alerts)} alert(s) written -> " f"{ALERT_OUT_PATH.relative_to(REPO_ROOT)}") for a in alerts: print(f" - #{a['issue']} {a['title']} (files: {len(a['match']['files'])})") else: print("[dormant-trigger-guard] no dormant trigger alerts on current change surface.") return 0 if __name__ == "__main__": sys.exit(main())