feat(#58): L3 dormant trigger guard -- DORMANT-TRIGGERS.yaml + checker + orchestrator hook
P5-1 docs/architecture/DORMANT-TRIGGERS.yaml -- 5 entries (IMP-16/17/18/19 active + IMP-20 followup-linked #55). P5-2 scripts/check_dormant_triggers.py -- standalone, reads registry, scans tree + diff, writes .orchestrator/dormant_alerts.json, exit 0 always. P5-3 orchestrator.py -- _check_dormant_triggers() helper + Stage 4->5 informational alert branch (skips audit-only, never blocks). P5-4 tests/orchestrator_unit/test_dormant_triggers.py -- 30 cases (yaml schema, registry contents, checker matching, false-positive guards, manual-evidence skip, orchestrator branch, audit bypass, governance ref). P5-5 PROJECT-INTENT-AND-GOVERNANCE.md -- single anti-patterns row referencing the L3 registry as binding contract surface. Tests: pytest -q tests = 337 passed (baseline 307 + 30 new). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
191
scripts/check_dormant_triggers.py
Normal file
191
scripts/check_dormant_triggers.py
Normal file
@@ -0,0 +1,191 @@
|
||||
"""Dormant trigger guard — L3 machine-readable check (issue #58, P5-2).
|
||||
|
||||
Reads docs/architecture/DORMANT-TRIGGERS.yaml, scans the changed-file surface
|
||||
(working tree via `git status --porcelain` + recent commit via
|
||||
`git diff HEAD~1..HEAD --name-only`), and writes any matching activation
|
||||
candidates to .orchestrator/dormant_alerts.json.
|
||||
|
||||
Guardrails (per Stage 1 scope-lock) :
|
||||
- Informational only. Exit code is ALWAYS 0 — orchestrator never blocks on alerts.
|
||||
- manual_evidence_required entries are skipped (require human gate).
|
||||
- followup_issue entries are skipped (already tracked by the open follow-up).
|
||||
- No LLM call. Deterministic file-pattern + content-pattern matching only.
|
||||
- No hardcoding : the registry yaml is the single source of truth.
|
||||
|
||||
Run :
|
||||
python scripts/check_dormant_triggers.py
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
import yaml
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent
|
||||
REGISTRY_PATH = REPO_ROOT / "docs" / "architecture" / "DORMANT-TRIGGERS.yaml"
|
||||
ALERT_OUT_PATH = REPO_ROOT / ".orchestrator" / "dormant_alerts.json"
|
||||
|
||||
|
||||
def load_registry(path: Path = REGISTRY_PATH) -> list[dict]:
|
||||
if not path.exists():
|
||||
return []
|
||||
with path.open("r", encoding="utf-8") as f:
|
||||
data = yaml.safe_load(f) or []
|
||||
if not isinstance(data, list):
|
||||
raise ValueError(f"{path} must be a YAML list of entries.")
|
||||
return data
|
||||
|
||||
|
||||
def _git_lines(args: list[str]) -> list[str]:
|
||||
try:
|
||||
out = subprocess.run(
|
||||
["git"] + args,
|
||||
cwd=str(REPO_ROOT),
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=20,
|
||||
check=False,
|
||||
)
|
||||
except (OSError, subprocess.TimeoutExpired):
|
||||
return []
|
||||
if out.returncode != 0:
|
||||
return []
|
||||
return [ln for ln in out.stdout.splitlines() if ln.strip()]
|
||||
|
||||
|
||||
def collect_changed_files() -> list[str]:
|
||||
files: set[str] = set()
|
||||
for ln in _git_lines(["status", "--porcelain"]):
|
||||
path = ln[3:].strip() if len(ln) >= 4 else ln.strip()
|
||||
if "->" in path:
|
||||
path = path.split("->", 1)[1].strip()
|
||||
path = path.strip('"')
|
||||
if path:
|
||||
files.add(path.replace("\\", "/"))
|
||||
for ln in _git_lines(["diff", "HEAD~1..HEAD", "--name-only"]):
|
||||
if ln.strip():
|
||||
files.add(ln.strip().replace("\\", "/"))
|
||||
return sorted(files)
|
||||
|
||||
|
||||
def _glob_to_regex(pat: str) -> str:
|
||||
"""Translate a posix-style glob with ``**`` to an anchored regex.
|
||||
|
||||
``**/`` matches zero or more directory levels (so ``src/**/*.py`` matches
|
||||
both ``src/adapter.py`` and ``src/foo/adapter.py``). ``*`` and ``?`` do
|
||||
NOT cross directory separators. Mirrors common ``.gitignore``-style
|
||||
semantics; ``fnmatch.fnmatch`` alone cannot express this.
|
||||
"""
|
||||
out: list[str] = []
|
||||
i = 0
|
||||
n = len(pat)
|
||||
while i < n:
|
||||
if pat[i : i + 3] == "**/":
|
||||
out.append("(?:.*/)?")
|
||||
i += 3
|
||||
elif pat[i : i + 2] == "**":
|
||||
out.append(".*")
|
||||
i += 2
|
||||
elif pat[i] == "*":
|
||||
out.append("[^/]*")
|
||||
i += 1
|
||||
elif pat[i] == "?":
|
||||
out.append("[^/]")
|
||||
i += 1
|
||||
else:
|
||||
out.append(re.escape(pat[i]))
|
||||
i += 1
|
||||
return "^" + "".join(out) + "$"
|
||||
|
||||
|
||||
def _glob_match(path: str, patterns: list[str]) -> bool:
|
||||
for pat in patterns:
|
||||
if re.match(_glob_to_regex(pat), path):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _content_match(file_path: Path, patterns: list[str]) -> list[str]:
|
||||
if not patterns or not file_path.exists() or not file_path.is_file():
|
||||
return []
|
||||
try:
|
||||
text = file_path.read_text(encoding="utf-8", errors="replace")
|
||||
except OSError:
|
||||
return []
|
||||
hits = []
|
||||
for pat in patterns:
|
||||
try:
|
||||
if re.search(pat, text):
|
||||
hits.append(pat)
|
||||
except re.error:
|
||||
if pat in text:
|
||||
hits.append(pat)
|
||||
return hits
|
||||
|
||||
|
||||
def check_entry(entry: dict, changed: list[str]) -> dict | None:
|
||||
trig = entry.get("trigger") or {}
|
||||
if trig.get("manual_evidence_required"):
|
||||
return None
|
||||
if entry.get("followup_issue"):
|
||||
return None
|
||||
file_patterns = trig.get("file_patterns") or []
|
||||
content_patterns = trig.get("content_patterns") or []
|
||||
if not file_patterns:
|
||||
return None
|
||||
matched_files = [p for p in changed if _glob_match(p, file_patterns)]
|
||||
if not matched_files:
|
||||
return None
|
||||
if content_patterns:
|
||||
hits: list[dict] = []
|
||||
for mf in matched_files:
|
||||
hit_patterns = _content_match(REPO_ROOT / mf, content_patterns)
|
||||
if hit_patterns:
|
||||
hits.append({"file": mf, "patterns": hit_patterns})
|
||||
if not hits:
|
||||
return None
|
||||
match_info = {"files": [h["file"] for h in hits], "content_hits": hits}
|
||||
else:
|
||||
match_info = {"files": matched_files, "content_hits": []}
|
||||
return {
|
||||
"issue": entry.get("issue"),
|
||||
"title": entry.get("title"),
|
||||
"doc": entry.get("doc"),
|
||||
"status": entry.get("status"),
|
||||
"on_trigger": entry.get("on_trigger"),
|
||||
"match": match_info,
|
||||
}
|
||||
|
||||
|
||||
def write_alerts(alerts: list[dict], path: Path = ALERT_OUT_PATH) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
payload = {
|
||||
"generated_at": datetime.now(timezone.utc).isoformat(),
|
||||
"registry": str(REGISTRY_PATH.relative_to(REPO_ROOT)).replace("\\", "/"),
|
||||
"alerts": alerts,
|
||||
}
|
||||
path.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8")
|
||||
|
||||
|
||||
def main() -> int:
|
||||
entries = load_registry()
|
||||
changed = collect_changed_files()
|
||||
alerts = [a for a in (check_entry(e, changed) for e in entries) if a]
|
||||
write_alerts(alerts)
|
||||
if alerts:
|
||||
print(f"[dormant-trigger-guard] {len(alerts)} alert(s) written -> "
|
||||
f"{ALERT_OUT_PATH.relative_to(REPO_ROOT)}")
|
||||
for a in alerts:
|
||||
print(f" - #{a['issue']} {a['title']} (files: {len(a['match']['files'])})")
|
||||
else:
|
||||
print("[dormant-trigger-guard] no dormant trigger alerts on current change surface.")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user