P5-1 docs/architecture/DORMANT-TRIGGERS.yaml -- 5 entries (IMP-16/17/18/19 active + IMP-20 followup-linked #55). P5-2 scripts/check_dormant_triggers.py -- standalone, reads registry, scans tree + diff, writes .orchestrator/dormant_alerts.json, exit 0 always. P5-3 orchestrator.py -- _check_dormant_triggers() helper + Stage 4->5 informational alert branch (skips audit-only, never blocks). P5-4 tests/orchestrator_unit/test_dormant_triggers.py -- 30 cases (yaml schema, registry contents, checker matching, false-positive guards, manual-evidence skip, orchestrator branch, audit bypass, governance ref). P5-5 PROJECT-INTENT-AND-GOVERNANCE.md -- single anti-patterns row referencing the L3 registry as binding contract surface. Tests: pytest -q tests = 337 passed (baseline 307 + 30 new). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
192 lines
6.1 KiB
Python
192 lines
6.1 KiB
Python
"""Dormant trigger guard — L3 machine-readable check (issue #58, P5-2).
|
|
|
|
Reads docs/architecture/DORMANT-TRIGGERS.yaml, scans the changed-file surface
|
|
(working tree via `git status --porcelain` + recent commit via
|
|
`git diff HEAD~1..HEAD --name-only`), and writes any matching activation
|
|
candidates to .orchestrator/dormant_alerts.json.
|
|
|
|
Guardrails (per Stage 1 scope-lock) :
|
|
- Informational only. Exit code is ALWAYS 0 — orchestrator never blocks on alerts.
|
|
- manual_evidence_required entries are skipped (require human gate).
|
|
- followup_issue entries are skipped (already tracked by the open follow-up).
|
|
- No LLM call. Deterministic file-pattern + content-pattern matching only.
|
|
- No hardcoding : the registry yaml is the single source of truth.
|
|
|
|
Run :
|
|
python scripts/check_dormant_triggers.py
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import yaml
|
|
|
|
REPO_ROOT = Path(__file__).resolve().parent.parent
|
|
REGISTRY_PATH = REPO_ROOT / "docs" / "architecture" / "DORMANT-TRIGGERS.yaml"
|
|
ALERT_OUT_PATH = REPO_ROOT / ".orchestrator" / "dormant_alerts.json"
|
|
|
|
|
|
def load_registry(path: Path = REGISTRY_PATH) -> list[dict]:
|
|
if not path.exists():
|
|
return []
|
|
with path.open("r", encoding="utf-8") as f:
|
|
data = yaml.safe_load(f) or []
|
|
if not isinstance(data, list):
|
|
raise ValueError(f"{path} must be a YAML list of entries.")
|
|
return data
|
|
|
|
|
|
def _git_lines(args: list[str]) -> list[str]:
|
|
try:
|
|
out = subprocess.run(
|
|
["git"] + args,
|
|
cwd=str(REPO_ROOT),
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=20,
|
|
check=False,
|
|
)
|
|
except (OSError, subprocess.TimeoutExpired):
|
|
return []
|
|
if out.returncode != 0:
|
|
return []
|
|
return [ln for ln in out.stdout.splitlines() if ln.strip()]
|
|
|
|
|
|
def collect_changed_files() -> list[str]:
|
|
files: set[str] = set()
|
|
for ln in _git_lines(["status", "--porcelain"]):
|
|
path = ln[3:].strip() if len(ln) >= 4 else ln.strip()
|
|
if "->" in path:
|
|
path = path.split("->", 1)[1].strip()
|
|
path = path.strip('"')
|
|
if path:
|
|
files.add(path.replace("\\", "/"))
|
|
for ln in _git_lines(["diff", "HEAD~1..HEAD", "--name-only"]):
|
|
if ln.strip():
|
|
files.add(ln.strip().replace("\\", "/"))
|
|
return sorted(files)
|
|
|
|
|
|
def _glob_to_regex(pat: str) -> str:
|
|
"""Translate a posix-style glob with ``**`` to an anchored regex.
|
|
|
|
``**/`` matches zero or more directory levels (so ``src/**/*.py`` matches
|
|
both ``src/adapter.py`` and ``src/foo/adapter.py``). ``*`` and ``?`` do
|
|
NOT cross directory separators. Mirrors common ``.gitignore``-style
|
|
semantics; ``fnmatch.fnmatch`` alone cannot express this.
|
|
"""
|
|
out: list[str] = []
|
|
i = 0
|
|
n = len(pat)
|
|
while i < n:
|
|
if pat[i : i + 3] == "**/":
|
|
out.append("(?:.*/)?")
|
|
i += 3
|
|
elif pat[i : i + 2] == "**":
|
|
out.append(".*")
|
|
i += 2
|
|
elif pat[i] == "*":
|
|
out.append("[^/]*")
|
|
i += 1
|
|
elif pat[i] == "?":
|
|
out.append("[^/]")
|
|
i += 1
|
|
else:
|
|
out.append(re.escape(pat[i]))
|
|
i += 1
|
|
return "^" + "".join(out) + "$"
|
|
|
|
|
|
def _glob_match(path: str, patterns: list[str]) -> bool:
|
|
for pat in patterns:
|
|
if re.match(_glob_to_regex(pat), path):
|
|
return True
|
|
return False
|
|
|
|
|
|
def _content_match(file_path: Path, patterns: list[str]) -> list[str]:
|
|
if not patterns or not file_path.exists() or not file_path.is_file():
|
|
return []
|
|
try:
|
|
text = file_path.read_text(encoding="utf-8", errors="replace")
|
|
except OSError:
|
|
return []
|
|
hits = []
|
|
for pat in patterns:
|
|
try:
|
|
if re.search(pat, text):
|
|
hits.append(pat)
|
|
except re.error:
|
|
if pat in text:
|
|
hits.append(pat)
|
|
return hits
|
|
|
|
|
|
def check_entry(entry: dict, changed: list[str]) -> dict | None:
|
|
trig = entry.get("trigger") or {}
|
|
if trig.get("manual_evidence_required"):
|
|
return None
|
|
if entry.get("followup_issue"):
|
|
return None
|
|
file_patterns = trig.get("file_patterns") or []
|
|
content_patterns = trig.get("content_patterns") or []
|
|
if not file_patterns:
|
|
return None
|
|
matched_files = [p for p in changed if _glob_match(p, file_patterns)]
|
|
if not matched_files:
|
|
return None
|
|
if content_patterns:
|
|
hits: list[dict] = []
|
|
for mf in matched_files:
|
|
hit_patterns = _content_match(REPO_ROOT / mf, content_patterns)
|
|
if hit_patterns:
|
|
hits.append({"file": mf, "patterns": hit_patterns})
|
|
if not hits:
|
|
return None
|
|
match_info = {"files": [h["file"] for h in hits], "content_hits": hits}
|
|
else:
|
|
match_info = {"files": matched_files, "content_hits": []}
|
|
return {
|
|
"issue": entry.get("issue"),
|
|
"title": entry.get("title"),
|
|
"doc": entry.get("doc"),
|
|
"status": entry.get("status"),
|
|
"on_trigger": entry.get("on_trigger"),
|
|
"match": match_info,
|
|
}
|
|
|
|
|
|
def write_alerts(alerts: list[dict], path: Path = ALERT_OUT_PATH) -> None:
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
payload = {
|
|
"generated_at": datetime.now(timezone.utc).isoformat(),
|
|
"registry": str(REGISTRY_PATH.relative_to(REPO_ROOT)).replace("\\", "/"),
|
|
"alerts": alerts,
|
|
}
|
|
path.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8")
|
|
|
|
|
|
def main() -> int:
|
|
entries = load_registry()
|
|
changed = collect_changed_files()
|
|
alerts = [a for a in (check_entry(e, changed) for e in entries) if a]
|
|
write_alerts(alerts)
|
|
if alerts:
|
|
print(f"[dormant-trigger-guard] {len(alerts)} alert(s) written -> "
|
|
f"{ALERT_OUT_PATH.relative_to(REPO_ROOT)}")
|
|
for a in alerts:
|
|
print(f" - #{a['issue']} {a['title']} (files: {len(a['match']['files'])})")
|
|
else:
|
|
print("[dormant-trigger-guard] no dormant trigger alerts on current change surface.")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|