From f3bff898fbe00b09d7f4271c5ddc6b376ed12f71 Mon Sep 17 00:00:00 2001 From: kyeongmin Date: Mon, 18 May 2026 18:56:06 +0900 Subject: [PATCH] feat(orchestrator): initial orchestrator + subprocess cleanup hardening MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-existing P0+P1 fixes (verified via #45 pilot 2026-05-18): - P0-1: detect_agent first-line only (fixes #45 infinite loop) - P0-2: stage_start_count sanity reset on external comment delete - P0-3: 32 pytest cases for parse/detect regressions - P1-4: execution-issue mode prompt (compact scope-tight) - P1-5: Stage 2 COMPACT_PLAN_RULE (size budget, no code snippets) - P1-6: tests:[] orchestrator-level enforcement at Stage 2 YES guard - P1-7: dual-write CRLF/trailing-whitespace normalize P3 subprocess cleanup (PID 2780 orphan grandchild regression): - (pid, create_time) signature tracking — Windows PID reuse safe - _kill_process_tree: parent-alive traversal path - _kill_tracked: parent-dead orphan path - _run_with_tree_kill: 1s monitor thread captures descendants live - atexit + SIGINT safety net via _SPAWNED set - 4 subprocess.run sites switched to wrapper (compaction/exit_report/ run_claude/run_codex) - 12 cleanup pytest cases incl. C6 PID 2780 regression test Selenium boundary unchanged — driver.quit() in phase_z2_pipeline.py and slide_measurer.py already protected by try/finally. Total: 44/44 pytest pass (32 core + 12 cleanup). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- orchestrator.py | 1480 +++++++++++++++++ tests/orchestrator_unit/__init__.py | 0 .../test_orchestrator_core.py | 247 +++ .../test_subprocess_cleanup.py | 264 +++ 4 files changed, 1991 insertions(+) create mode 100644 orchestrator.py create mode 100644 tests/orchestrator_unit/__init__.py create mode 100644 tests/orchestrator_unit/test_orchestrator_core.py create mode 100644 tests/orchestrator_unit/test_subprocess_cleanup.py diff --git a/orchestrator.py b/orchestrator.py new file mode 100644 index 0000000..1c12205 --- /dev/null +++ b/orchestrator.py @@ -0,0 +1,1480 @@ +#!/usr/bin/env python3 +""" + Orchestrator v6 — Exit Report Contract + Evidence-based Consensus + + 핵심: + 1. "Read ALL comments" 제거 → context pack 직접 주입 + 2. 완료 stage = canonical exit report (계약서) 사용 + 3. 현재 stage = stage_start 이후 comment만 + 4. 5라운드마다 mid-stage compaction + 5. exit report = Gitea + local 동시 저장 (1회 생성) + 6. FINAL_CONSENSUS: YES에 evidence block 필수 + 7. evidence 없는 YES → orchestrator 거부 + 8. context pack 크기 로그 +""" + +import subprocess, requests, time, sys, os, re, argparse, json, glob +import threading, atexit, signal +from pathlib import Path +from datetime import datetime +from urllib.parse import quote + +# P3-1 (2026-05-18) — subprocess cleanup hardening (PID 2780 orphan grandchild regression). +# psutil 은 환경에 이미 설치돼 있음 (Phase A 보완 검토 확인). 
import psutil

# ═══════════════════════════════════════════════════════════════
# Config
# ═══════════════════════════════════════════════════════════════

GITEA_URL = os.environ.get("GITEA_URL", "https://gitea.hmac.kr")
# SECURITY: the API token must come from the environment only. A real token
# used to be embedded here as the fallback value; anything committed to a
# repository has to be treated as leaked, so that token should be revoked and
# re-issued. With no GITEA_TOKEN set the value is empty and authenticated
# Gitea calls will be rejected by the server instead of silently using a
# shared credential.
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
GITEA_REPO = os.environ.get("GITEA_REPO", "Kyeongmin/C.E.L_Slide_test2")
PROJECT_DIR = os.environ.get("PROJECT_DIR", os.getcwd())


def _find_claude():
    """Return the Claude CLI to launch.

    Order: the CLAUDE_EXE environment variable, then the last (sorted by path
    string) native binary shipped inside a VS Code claude-code extension
    folder, then the bare command name "claude".
    """
    env = os.environ.get("CLAUDE_EXE")
    if env:
        return env
    m = sorted(glob.glob(os.path.expanduser(
        r"~\.vscode\extensions\anthropic.claude-code-*\resources\native-binary\claude.exe")))
    return m[-1] if m else "claude"


def _find_codex():
    """Return the Codex CLI to launch.

    Order: the CODEX_CMD environment variable, then the npm shim under
    ~\\AppData\\Roaming\\npm if that file exists, then the bare command "codex".
    """
    env = os.environ.get("CODEX_CMD")
    if env:
        return env
    p = os.path.expanduser(r"~\AppData\Roaming\npm\codex.cmd")
    return p if os.path.exists(p) else "codex"


CLAUDE_EXE = _find_claude()
CODEX_CMD = _find_codex()
POLL_INTERVAL = 15
AGENT_TIMEOUT = 1800
COMPACT_EVERY = 5

ORCH_DIR = Path(PROJECT_DIR) / ".orchestrator"
ISSUES_DIR = ORCH_DIR / "issues"
TMP_DIR = ORCH_DIR / "tmp"
DRAFTS_DIR = ORCH_DIR / "drafts"  # D-axis 2026-05-18 — agent ↔ orchestrator transport


def ts():
    """Return the current local wall-clock time as HH:MM:SS."""
    return datetime.now().strftime("%H:%M:%S")


def log(msg):
    """Print one timestamped console line."""
    print(f" {ts()} | {msg}")


def header(msg):
    """Print *msg* framed by '=' rules."""
    print(f"\n {'='*60}\n {msg}\n {'='*60}\n")


def divider(msg):
    """Print *msg* framed by '─' rules."""
    print(f"\n {'─'*60}\n {msg}\n {'─'*60}")


# ═══════════════════════════════════════════════════════════════
# P3-1/2/3 (2026-05-18) — Subprocess Tree Cleanup
# Track PID + create_time so a reused PID is never mistaken for our process.
# Cleans up orphan grandchildren too (the PID 2780 path). Selenium
# driver.quit() stays the pipeline's responsibility (orchestrator does not
# touch it).
# ═══════════════════════════════════════════════════════════════

# Global tracking set — the wrapper discards entries after a normal cleanup;
# what remains is the atexit safety net.
# Element = (pid, create_time_float). create_time confirms it is still the
# same process when a PID has been reused.
_SPAWNED = set()
_ORCH_PID = os.getpid()
_ORCH_CREATE = None  # filled in by main()


def _proc_signature(p):
    """Return (pid, create_time) for a psutil.Process, or None if it is gone
    or cannot be inspected."""
    try:
        return (p.pid, p.create_time())
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return None


def _is_same_process(pid, create_time, tolerance=0.001):
    """Tell whether the recorded (pid, create_time) is still that same live process.

    A matching create_time means the PID was not reused. *tolerance* is the
    float-comparison safety margin. The orchestrator's own PID and
    non-positive PIDs are never considered a match.
    """
    if pid <= 0 or pid == _ORCH_PID:
        return False
    try:
        p = psutil.Process(pid)
        return abs(p.create_time() - create_time) < tolerance and p.is_running()
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return False


def _terminate_then_kill(procs, timeout):
    """terminate() every process, wait up to *timeout* seconds, kill() the survivors."""
    for p in procs:
        try:
            p.terminate()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass
    try:
        _, alive = psutil.wait_procs(procs, timeout=timeout)
    except Exception:
        # Best effort: if waiting itself fails, assume everything is still alive.
        alive = procs
    for p in alive:
        try:
            p.kill()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass


def _kill_process_tree(root_pid, timeout=5):
    """Parent-ALIVE path — walk psutil children(recursive=True) from *root_pid*.

    Used when the parent still exists (timeout, Ctrl+C). Returns the number of
    processes targeted (0 when the root is gone or is the orchestrator itself).
    """
    if root_pid <= 0 or root_pid == _ORCH_PID:
        return 0
    try:
        root = psutil.Process(root_pid)
    except psutil.NoSuchProcess:
        return 0
    try:
        tree = [root] + root.children(recursive=True)
    except psutil.NoSuchProcess:
        tree = [root]
    _terminate_then_kill(tree, timeout)
    return len(tree)


def _kill_tracked(sigs, timeout=5):
    """Parent-DEAD path — clean up directly from (pid, create_time) signatures.

    create_time is re-checked so a reused PID is never killed. This is the
    orphan-grandchild path. Returns the number of processes targeted.
    """
    procs = []
    for pid, ct in sigs:
        if not _is_same_process(pid, ct):
            continue
        try:
            procs.append(psutil.Process(pid))
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    if not procs:
        return 0
    _terminate_then_kill(procs, timeout)
    return len(procs)


def _decode_output(data, encoding, errors):
    """Decode captured bytes and normalise newlines to '\\n'; on a decode
    failure the raw bytes are returned unchanged."""
    if not isinstance(data, bytes):
        return data
    try:
        text = data.decode(encoding, errors)
    except (UnicodeError, LookupError):
        return data
    return text.replace("\r\n", "\n").replace("\r", "\n")


def _run_with_tree_kill(cmd, *, input=None, timeout=None, **popen_kwargs):
    """Tree-safe stand-in for subprocess.run.

    - Starts *cmd* with Popen; stdout/stderr are always captured.
    - A background monitor thread records descendant (pid, create_time)
      signatures once per second.
    - Both the normal and the timeout path clean up tracked processes in
      ``finally``.
    - Returns subprocess.CompletedProcess. A timeout re-raises
      subprocess.TimeoutExpired, with any output that could still be drained
      attached to it as raw bytes.

    The pipes are always opened in binary mode. ``text`` /
    ``universal_newlines`` / ``encoding`` / ``errors`` are therefore consumed
    here instead of being forwarded to Popen: forwarding them would open the
    pipes in text mode while this function hands bytes to communicate() and
    decodes bytes afterwards. In text mode without an explicit encoding the
    output is decoded as UTF-8 with errors="replace".
    """
    enc = popen_kwargs.pop("encoding", None)
    errors = popen_kwargs.pop("errors", None)
    text_flag = bool(popen_kwargs.pop("text", False))
    text_flag = bool(popen_kwargs.pop("universal_newlines", False)) or text_flag
    text_mode = bool(text_flag or enc or errors)

    tracked = set()  # (pid, create_time) tuples
    stop_event = threading.Event()

    proc = subprocess.Popen(
        cmd,
        stdin=subprocess.PIPE if input is not None else None,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        **popen_kwargs,
    )
    # Track the direct child by signature as well (PID-reuse safe).
    root_sig = None
    try:
        root_sig = _proc_signature(psutil.Process(proc.pid))
    except psutil.NoSuchProcess:
        pass
    if root_sig:
        _SPAWNED.add(root_sig)

    def _monitor():
        try:
            root = psutil.Process(proc.pid)
        except psutil.NoSuchProcess:
            return
        while not stop_event.is_set():
            try:
                for child in root.children(recursive=True):
                    sig = _proc_signature(child)
                    if sig:
                        tracked.add(sig)
                        _SPAWNED.add(sig)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                break  # parent died — monitor exits; finally takes over
            stop_event.wait(timeout=1.0)

    mon = threading.Thread(target=_monitor, daemon=True)
    mon.start()

    if isinstance(input, str):
        inp = input.encode(enc or "utf-8", errors or "strict")
    else:
        inp = input

    try:
        try:
            stdout, stderr = proc.communicate(input=inp, timeout=timeout)
        except subprocess.TimeoutExpired as exc:
            # Parent still alive here — full tree traversal kill first, then the
            # tracked orphans, and only then drain the pipes: a surviving
            # orphan can keep the pipe open and block an unbounded read.
            _kill_process_tree(proc.pid)
            stop_event.set()
            mon.join(timeout=2.0)
            _kill_tracked(list(tracked))
            try:
                exc.stdout, exc.stderr = proc.communicate(timeout=5)
            except Exception:
                pass  # keep whatever partial output the exception already carries
            raise
        rc = proc.returncode
    finally:
        stop_event.set()
        mon.join(timeout=2.0)
        snapshot = list(tracked)
        # CRITICAL: clean tracked descendants directly (caught even when the
        # parent is already dead — the PID 2780 path).
        _kill_tracked(snapshot)
        # Safety net: finish off proc itself if it somehow survived.
        if proc.poll() is None:
            _kill_process_tree(proc.pid)
        # Drop only what this call collected (protects other calls' tracking).
        for s in snapshot:
            _SPAWNED.discard(s)
        if root_sig:
            _SPAWNED.discard(root_sig)

    if text_mode:
        codec = enc or "utf-8"
        policy = errors or ("strict" if enc else "replace")
        stdout = _decode_output(stdout, codec, policy)
        stderr = _decode_output(stderr, codec, policy)

    return subprocess.CompletedProcess(args=cmd, returncode=rc, stdout=stdout, stderr=stderr)


def _orchestrator_exit_cleanup():
    """Last safety net at orchestrator exit: clean up every signature still in _SPAWNED."""
    if not _SPAWNED:
        return
    cleaned = _kill_tracked(list(_SPAWNED))
    if cleaned:
        try:
            log(f" exit cleanup: {cleaned} tracked subprocess tree(s) terminated")
        except Exception:
            pass
    _SPAWNED.clear()


def _sigint_handler(sig, frame):
    """Signal handler: run the exit cleanup, then exit with status 130."""
    try:
        log(" SIGINT — running exit cleanup")
    except Exception:
        pass
    _orchestrator_exit_cleanup()
    sys.exit(130)


# ═══════════════════════════════════════════════════════════════
# State
# ═══════════════════════════════════════════════════════════════

STATE_FILE = ORCH_DIR / "stage_state.json"


def load_state():
    """Return the parsed state file, or {} when it does not exist."""
    return json.loads(STATE_FILE.read_text(encoding="utf-8")) if STATE_FILE.exists() else {}


def save_state(data):
    """Persist *data* as the state file.

    Written to a sibling temp file and swapped in with _atomic_replace, so an
    interrupted write cannot leave a truncated stage_state.json behind.
    """
    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
    tmp = STATE_FILE.with_suffix(".json.tmp")
    tmp.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
    _atomic_replace(tmp, STATE_FILE)


def get_issue_state(n):
    """Return the stored state of issue *n* (default: stage 'problem-review')."""
    return load_state().get(str(n), {"stage": "problem-review"})


def update_issue_state(n, **kw):
    """Merge *kw* into the stored state of issue *n* and save."""
    s = load_state()
    s.setdefault(str(n), {"stage": "problem-review"}).update(kw)
    save_state(s)


def clear_state(n=None):
    """Forget issue *n* (state entry + its issue/draft files), or everything.

    NOTE: any falsy *n* (None, 0, "") takes the clear-everything branch.
    """
    if n:
        s = load_state()
        s.pop(str(n), None)
        save_state(s)
        for f in ISSUES_DIR.glob(f"{n}_*"):
            f.unlink(missing_ok=True)
        # D-axis 2026-05-18 — drafts are cleaned too (every stage/round file of the issue)
        if DRAFTS_DIR.exists():
            for f in DRAFTS_DIR.glob(f"{n}_*"):
                f.unlink(missing_ok=True)
    else:
        save_state({})
        if ISSUES_DIR.exists():
            for f in ISSUES_DIR.glob("*"):
                f.unlink(missing_ok=True)
        if DRAFTS_DIR.exists():
            for f in DRAFTS_DIR.glob("*"):
                f.unlink(missing_ok=True)


# ═══════════════════════════════════════════════════════════════
# Exit Report / Compaction files
# ═══════════════════════════════════════════════════════════════

def _erp(n, sid):
    """Path of the exit report for issue *n*, stage *sid*."""
    return ISSUES_DIR / f"{n}_stage_{sid}_exit.md"


def save_exit_report(n, sid, txt):
    """Write the exit report text for issue *n*, stage *sid*."""
    ISSUES_DIR.mkdir(parents=True, exist_ok=True)
    _erp(n, sid).write_text(txt, encoding="utf-8")


def log_orchestrator_event(n, msg):
    """Append one timestamped line to the per-issue orchestrator log file."""
    # Fix 9 (2026-05-17) — Phase A-3a: Category C noise → local log.
    # exit-report / auto-escalate / rewind-announcement are NOT posted to Gitea.
    # Source of truth = save_exit_report (local *_exit.md) + stage_state.json + failure_report_path.
    # Human visibility = this log file. Agent context is not affected.
    p = ISSUES_DIR / f"{n}_orchestrator.log"
    p.parent.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with p.open("a", encoding="utf-8") as f:
        f.write(f"[{stamp}] {msg}\n")


def _atomic_replace(tmp_path, dest_path, max_retries=3, backoff=0.5):
    """Atomic file replace with Windows transient retry.

    Phase A-2 reported [WinError 5] from os.replace when AV / IDE / file watcher
    briefly locks destination during write. Retry with linear backoff.
    POSIX = first attempt succeeds. Windows = transient race may need 1-2 retries.
    At least one attempt is always made, even if max_retries < 1. The final
    PermissionError is re-raised.
    """
    attempts = max(1, max_retries)
    for attempt in range(attempts):
        try:
            os.replace(tmp_path, dest_path)
            return
        except PermissionError:
            if attempt == attempts - 1:
                raise
            time.sleep(backoff * (attempt + 1))


# ═══════════════════════════════════════════════════════════════
# D-axis 2026-05-18 — Agent Draft Files (transport refactor)
# ═══════════════════════════════════════════════════════════════
# agent (Claude/Codex) writes comment body to draft file (NOT direct Gitea POST).
# orchestrator reads draft → validates → Gitea POST → injects next agent's context.
+# Goal: accountability (POST 매 round 유지) + reliability (no agent POST lies). +# Path pattern: .orchestrator/drafts/_stage___r.md + +def _draft_path(n, sid, agent, rnd): + return DRAFTS_DIR / f"{n}_stage_{sid}_{agent}_r{rnd}.md" + +def _load_draft(n, sid, agent, rnd): + p = _draft_path(n, sid, agent, rnd) + if not p.exists(): return None + try: + return p.read_text(encoding="utf-8").strip() + except Exception: + return None + +def _save_draft(n, sid, agent, rnd, content): + p = _draft_path(n, sid, agent, rnd) + p.parent.mkdir(parents=True, exist_ok=True) + tmp = p.with_suffix(".md.tmp") + tmp.write_text(content, encoding="utf-8") + _atomic_replace(tmp, p) # atomic + Windows retry + +def _validate_draft(content, sid, agent): + """Validate draft body. Returns (ok: bool, errors: list[str]). + D-1 stub — only emptiness check. Full validation (FINAL_CONSENSUS / EVIDENCE + / IMPLEMENTATION_UNITS / Remaining units 필드) added in D-3.""" + if not content or not content.strip(): + return (False, ["draft empty"]) + return (True, []) + +def _collect_stage_drafts(n, sid, agent, rnd): + """D-4 (2026-05-18) — collect local drafts from current stage in chronological order. + Used by build_context_pack to inject local transcript instead of Gitea-fetched comments. + Order: claude r1 → codex r1 → claude r2 → codex r2 → ... + Stops at the current (agent, rnd) call — that draft doesn't exist yet. + Returns list of (round, agent, body_truncated_3000).""" + out = [] + for r in range(1, rnd + 1): + for ag in ("claude", "codex"): + if r == rnd and ag == agent: + return out # current call — its own draft not yet written + d = _load_draft(n, sid, ag, r) + if d: + out.append((r, ag, d[:3000])) + return out + +def _verify_dual_write(n, sid, agent, rnd, gitea_body): + """D-3 (2026-05-18) — log-only verification. + Compares the agent's local draft against the Gitea comment body it posted. + Does NOT fail-fast or interrupt the round — purely observational. 
+ Goal: collect data on whether dual-write is reliable BEFORE D-5 cutover. + P1-7 (2026-05-18) — normalize before compare. CRLF/LF + trailing whitespace + on each line + final newline = cosmetic differences. We compare semantic content.""" + draft = _load_draft(n, sid, agent, rnd) + if draft is None: + log(f" ⚠️ dual-write SKIPPED: draft not found for {agent} r{rnd}") + return + def _norm(s): + if not s: return "" + # CRLF/CR → LF, strip trailing whitespace per line, strip overall + s = s.replace("\r\n", "\n").replace("\r", "\n") + return "\n".join(line.rstrip() for line in s.split("\n")).strip() + g = _norm(gitea_body) + d = _norm(draft) + if d == g: + log(f" ✅ dual-write OK: draft == gitea body ({len(d)} chars, normalized) for {agent} r{rnd}") + return + if len(d) == len(g): + diff_idx = next((i for i in range(len(d)) if d[i] != g[i]), -1) + log(f" ⚠️ dual-write MISMATCH (same len, diff content) for {agent} r{rnd}: first diff at index {diff_idx}") + else: + log(f" ⚠️ dual-write LEN-DIFF for {agent} r{rnd}: draft={len(d)} gitea={len(g)} (normalized)") + +def load_exit_report(n, sid): + p = _erp(n, sid) + return p.read_text(encoding="utf-8") if p.exists() else None + +def load_all_exit_reports(n, up_to): + parts = [] + for i in range(up_to): + r = load_exit_report(n, STAGES[i]["id"]) + if r: parts.append(f"=== {STAGES[i]['label']} Exit Report ===\n{r}") + return "\n\n".join(parts) or "(no prior reports)" + +def load_latest_compaction(n, sid): + files = sorted(ISSUES_DIR.glob(f"{n}_stage_{sid}_compact_r*.md")) + return files[-1].read_text(encoding="utf-8") if files else None + +# ═══════════════════════════════════════════════════════════════ +# Gitea API +# ═══════════════════════════════════════════════════════════════ + +def gitea(path, method="GET", data=None): + # Fix 6 (2026-05-17) — timeout 강제. 없으면 Gitea API 늦을 때 무한 hang + # (round=5 Codex OK 뒤 get_comments hang 사례 발생). 
+ url = f"{GITEA_URL}/api/v1/repos/{GITEA_REPO}/{path}" + h = {"Authorization": f"token {GITEA_TOKEN}", "Content-Type": "application/json"} + r = getattr(requests, method.lower())( + url, headers=h, + json=data if method != "GET" else None, + timeout=(10, 30), # connect 10s / read 30s + ) + r.raise_for_status(); return r.json() + +def _comments_cache_path(n): + return ISSUES_DIR / f"{n}_comments_cache.json" + +def _load_comments_cache(n): + p = _comments_cache_path(n) + if not p.exists(): return [] + try: + data = json.loads(p.read_text(encoding="utf-8")) + return data if isinstance(data, list) else [] + except Exception: + return [] # corrupt cache → empty fallback + +def _save_comments_cache(n, comments): + p = _comments_cache_path(n) + p.parent.mkdir(parents=True, exist_ok=True) + tmp = p.with_suffix(".json.tmp") + tmp.write_text(json.dumps(comments, ensure_ascii=False), encoding="utf-8") + _atomic_replace(tmp, p) # atomic + Windows transient retry + +def get_comments(n): + # Fix 8 (2026-05-17) — Phase A-2: local cache + since= incremental fetch. + # Gitea API 가 page= 파라미터 무시 + 응답 100 cap. 매 호출 full fetch 하면: + # (a) 100 cap invisibility — 새 comment 가 응답 밖으로 밀려 invisible + # (b) wait_comment blind spot — 새 Codex comment 영원히 못 찾음 + # (c) trafic 낭비 — round 마다 800 KB + # 대응: cache 에 누적, since= 로 incremental fetch, id 기준 merge. 
+ cache = _load_comments_cache(n) + since_ts = None + if cache: + # max(updated_at or created_at) — edit 까지 잡으려면 updated_at 우선 + def _ts(c): return c.get("updated_at") or c.get("created_at") or "" + since_ts = max((_ts(c) for c in cache), default=None) + path = f"issues/{n}/comments?limit=100" + if since_ts: + # since timestamp 안 `:` / `+` (timezone offset) 안전하게 인코딩 + path += f"&since={quote(since_ts, safe='')}" + try: + batch = gitea(path) + except Exception as e: + # network/transient — cache 그대로 반환 (안 보이는 새 comment 는 다음 poll 에) + return sorted(cache, key=lambda c: c.get("id", 0)) + # merge by id (newer wins for edits) + by_id = {c["id"]: c for c in cache if "id" in c} + for c in batch: + if "id" in c: + by_id[c["id"]] = c + merged = list(by_id.values()) + _save_comments_cache(n, merged) + return sorted(merged, key=lambda c: c["id"]) +def get_issue(n): return gitea(f"issues/{n}") +def get_open_issues(): return sorted(gitea("issues?state=open&limit=50"), key=lambda i: i["number"]) +def set_label(n, l): + try: gitea(f"issues/{n}/labels", "POST", {"labels": [l]}) + except: pass + +# ═══════════════════════════════════════════════════════════════ +# Consensus + Evidence +# ═══════════════════════════════════════════════════════════════ + +def has_consensus(body): + """마지막 줄이 정확히 FINAL_CONSENSUS: YES인지 확인 (legacy — parse_consensus 우선)""" + lines = body.strip().splitlines() + return lines[-1].strip() == "FINAL_CONSENSUS: YES" if lines else False + +# 2026-05-16 — rewind dispatcher 지원. agent 가 NO 시 rewind_target 명시 강제. +REWIND_TARGET_TO_SID = { + "retry_same": None, # 같은 stage 재시도 (technical_fail only — e.g. push network) + "continue_same": None, # 같은 stage round 계속 — stage 안 남은 planned unit 진행 (Stage 3 만). 
+ "stage_1_review": "problem-review", + "stage_2_plan": "simulation-plan", + "stage_3_edit": "code-edit", + "stage_4_verify": "test-verify", + "stage_5_push": "commit-push", +} +VERIFY_STAGES = ("test-verify", "final-close") # retry_same / continue_same 금지 stage + +def parse_consensus(body): + """FINAL_CONSENSUS + rewind_target 파싱. + Returns (status, rewind_target) : + status : 'YES' | 'NO' | None + rewind_target : 'retry_same' | 'stage_1_review' | ... | 'stage_5_push' | None + NO 시 rewind_target 누락 = agent 에게 supplement 요청 (run_stage 처리).""" + if not body: + return (None, None) + status = None + target = None + for line in body.strip().splitlines()[-10:]: + sl = line.strip() + if sl == "FINAL_CONSENSUS: YES": + status = "YES" + elif sl == "FINAL_CONSENSUS: NO": + status = "NO" + m = re.match(r"^rewind_target:\s*(\S+)\s*$", sl, re.IGNORECASE) + if m: + t = m.group(1).lower() + if t in REWIND_TARGET_TO_SID: + target = t + return (status, target) + +def has_evidence(body, stage_id="problem-review"): + """=== EVIDENCE === 블록 + stage별 필수 필드 확인""" + lower = body.lower() + if "=== evidence ===" not in lower: + return False + base = ["commands run", "files checked"] + if stage_id in ("problem-review", "code-edit"): + required = base + elif stage_id == "simulation-plan": + required = base + ["test results"] + elif stage_id == "test-verify": + required = base + ["test results"] + elif stage_id == "commit-push": + required = base + ["commit"] + elif stage_id == "final-close": + required = base + ["verified facts"] + else: + required = base + return all(r in lower for r in required) + +def detect_agent(body): + # P0-1 (2026-05-18) — 첫 줄 (header) 만 검사. + # 이전: body 전체 검색 → Codex 가 evidence 안 [Claude #N] 인용 시 'claude' 오판 + # 결과: Stage 2 NO 가 "응답 미감지" 로 처리되어 rewind 누락 → 무한 round loop. 
+ first = (body or "").lstrip().splitlines() + head = first[0] if first else "" + if re.match(r"\[Claude[\s#]", head): return "claude" + if re.match(r"\[Codex[\s#]", head): return "codex" + return None + +def parse_remaining_units(body): + """Codex evidence 의 'Remaining units:' 줄 parse → set of unit ids. + 2026-05-17 Fix 1 — continue_same_count 의 progress-based 검증. + 2026-05-17 Fix 4 — unit id 패턴 = u 만 매칭 (Stage 2 schema 와 일치). + Returns set | None (parse 불가 / 줄 없음). 빈 set = 완료 신호.""" + if not body: + return None + m = re.search(r"^\s*Remaining[\s_]*units?\s*:\s*(.*)$", body, re.IGNORECASE | re.MULTILINE) + if not m: + return None + raw = m.group(1).strip() + # explicit empty markers + raw_clean = raw.strip().lower() + if raw_clean in ("", "[]", "none", "(none)", "n/a", "-"): + return set() + # Stage 2 schema = id: u1 / u2 / ... → u 만 매칭. sentence noise (예: "remaining work") 무시. + ids = re.findall(r"\bu\d+\b", raw, re.IGNORECASE) + return set(i.lower() for i in ids) if ids else None + +# ═══════════════════════════════════════════════════════════════ +# Verification Failure → Rewind Classification +# ═══════════════════════════════════════════════════════════════ + +# classify_failure 제거 (2026-05-16) — agent 가 rewind_target 을 직접 명시하는 방식. +# 키워드 기반 자동 분류는 잘못 추정 위험 → CONSENSUS_RULE 에서 agent 가 strict 명시 강제. 
+ +def save_failure_report(n, from_stage, target_stage, body): + """검증 실패 보고서 저장""" + ISSUES_DIR.mkdir(parents=True, exist_ok=True) + report = ( + f"[Verification Failure Report]\n" + f"Issue: #{n}\n" + f"Failed stage: {from_stage}\n" + f"Rewind to: {target_stage}\n" + f"Failure evidence:\n{body[:2000]}\n" + ) + path = ISSUES_DIR / f"{n}_stage_{from_stage}_failed.md" + path.write_text(report, encoding="utf-8") + return path + +# ═══════════════════════════════════════════════════════════════ +# Rules + Roles (compact) +# ═══════════════════════════════════════════════════════════════ + +RULES = """=== WORK PRINCIPLES === +RULE 0 — PIPELINE-CONSTRUCTION (overrides all) +Build GENERAL Phase Z pipeline, NOT sample-passing. Never hardcode MDX 03/04/05. +Evaluate against all 32 frames. Failure must be explainable. +RULE 1: English only. RULE 2: Auto pipeline. RULE 3: Status=3-axis. +RULE 4: Scope-qualified. pytest -q tests. COMMIT SCOPE only. +RULE 5: Factual: value+path+upstream. RULE 6: git add specific files only. +RULE 7: No hardcoding. RULE 8: AI finds 1px first. RULE 9: LLM classifies, code composes. +RULE 10: Don't uncritically accept. RULE 11: Checkpoint. RULE 12: Full paths. RULE 13: Anchor sync. +PZ-1: AI=0 normal. PZ-2: 1turn=1step. PZ-3: No speculative. PZ-4: No silent shrink. + +=== CONSENSUS + REWIND (2026-05-16 lock) === +Final line of every Codex review comment MUST be exactly one of: + FINAL_CONSENSUS: YES + FINAL_CONSENSUS: NO + +YES REQUIRES === EVIDENCE === block (commands run, files checked, tests/commit/verified facts as stage requires). +NO evidence = REJECTED. + +If NO, the comment MUST also include a line BEFORE FINAL_CONSENSUS: + rewind_target: +Allowed ENUM: + retry_same # technical_fail ONLY (push network/permission, hook reject). FORBIDDEN for test-verify / final-close. + continue_same # Stage 3 ONLY — current unit verified OK, but more planned units remain. Not a failure. 
+ stage_1_review # rewind to problem-review (root cause / scope-lock wrong) + stage_2_plan # rewind to simulation-plan (plan wrong / missing files / tests) + stage_3_edit # rewind to code-edit (implementation incomplete / scope creep) + stage_4_verify # rewind to test-verify (commit ok but missed regression) + stage_5_push # rewind to commit-push (post-push remote anomaly — rare) + +stage_4_verify (test-verify) NO and stage_6 final-close NO: retry_same / continue_same FORBIDDEN. Must rewind to earlier stage. +Stage 3 (code-edit) unit progress: use continue_same per unit; FINAL_CONSENSUS: YES only when ALL implementation_units complete. + +TEMP / LOCAL ARTIFACTS: .orchestrator/tmp/ and .orchestrator/drafts/ only.""" + +C_ROLE = """Claude (analyzer+implementer). Don't blindly agree. Verify code. Think first. [Claude #N].""" +X_ROLE = """Codex (verifier). Verify EVERY claim. Use the verification level required by the CURRENT STAGE. +Do NOT run full pytest unless the stage task explicitly requires it. + +With FINAL_CONSENSUS: YES, include === EVIDENCE === block : + === EVIDENCE === + Commands run: (list) + Files checked: (list) + Test results: (if Stage 2/4) + Commit SHA: (if Stage 5) + Verified facts: (list, if Stage 6) + +With FINAL_CONSENSUS: NO, include rewind_target line BEFORE the consensus line : + rewind_target: stage_1_review | stage_2_plan | stage_3_edit | stage_4_verify | stage_5_push | retry_same | continue_same + FINAL_CONSENSUS: NO +Note: retry_same / continue_same FORBIDDEN for test-verify and final-close stages. +Stage 3 (code-edit): if the single executed unit is correct AND remaining_units is non-empty → continue_same. +Stage 3: if ALL implementation_units complete and verified → FINAL_CONSENSUS: YES. 
+ +Stage 3 EVIDENCE block MUST include EXACTLY one of these lines (strict format): + Remaining units: [u2, u3, u4] (bracketed list when units remain) + Remaining units: none (when all units complete) + +[Codex #N].""" + +# D-axis 2026-05-18 (D-2 dual-write phase) — transition instruction. +# Injected into every agent context pack alongside the existing Gitea POST api hint. +# Goal: agents write their comment body to a local draft file IN ADDITION to (not instead of) +# the normal Gitea POST. Orchestrator will start consuming the drafts in D-3 (comparison), +# D-4 (next-agent context source), and D-5 (full cutover — agent POST forbidden). +# Removed at D-5. +DUAL_WRITE_INSTRUCTION = """=== ORCHESTRATOR DRAFT OUTPUT === +After posting your normal Gitea comment, save the same comment body to: + {draft_path} +Do not summarize or shorten the draft; it must mirror the Gitea comment body. +The Gitea comment remains required in this transition phase.""" + +# ═══════════════════════════════════════════════════════════════ +# Stages +# ═══════════════════════════════════════════════════════════════ + +STAGES = [ + {"id":"problem-review","label":"Stage 1: 문제 검토","tag":"stage:problem-review", + "c":"Identify root cause. Read issue body + related files. Verify assumptions. Draft scope-lock + guardrails.", + "x":"Verify root cause + scope-lock. grep/find. Flag missing files / wrong assumptions. NO pytest.", + "ef":"root_cause, key_files, scope_lock, out_of_scope, guardrails"}, + {"id":"simulation-plan","label":"Stage 2: 시뮬 기반 계획 수립","tag":"stage:simulation-plan", + "c":("Concrete plan covering EVERY axis/item explicitly mentioned in the issue body. " + "First enumerate ALL scoped axes/items from the issue body in a checklist/table. " + "Partial coverage is invalid. 
" + "For each axis/item include: expected before/after behavior, files to change, " + "per-file changes, tests to add/update, rollback plan, side effects / follow-up issue candidates.\n\n" + "Additionally, MUST include a structured implementation_units block (YAML) that Stage 3 will execute one unit per turn:\n" + " === IMPLEMENTATION_UNITS ===\n" + " - id: u1\n summary: \n files: [, ...]\n tests: [, ...]\n estimate_lines: \n" + " - id: u2\n ...\n" + "Each unit MUST be atomic — estimate_lines ≤ 50 AND files ≤ 3. If larger, SPLIT into multiple units."), + "x":("Verify plan completeness — every axis/item from the issue body covered with full per-axis details. " + "Partial coverage = NO. Missing files? Tests? Rollback? " + "Run baseline pytest -q tests. Side effects? Cross-check against issue body axes. " + "Verify implementation_units block exists AND each unit ≤ 50 lines / ≤ 3 files. Oversized unit = NO with rewind_target: stage_2_plan."), + "ef":"enumerated_axes, files, per_file_changes, test_plan, rollback, baseline_tests, follow_up_candidates, implementation_units"}, + {"id":"code-edit","label":"Stage 3: 코드 수정 / 이슈 분기","tag":"stage:code-edit", + "c":("Implement exactly ONE implementation_unit from the Stage 2 exit report per turn. " + "Do NOT implement multiple units in one turn. " + "FIRST LINE of your Gitea comment: 'Executing unit: '. " + "After editing the files for that single unit, POST a Gitea comment with: " + "unit_executed (id), files_changed (list), diff_summary, remaining_units (list of remaining unit ids — from Stage 2 plan minus units already executed), " + "follow_up_issue_candidates (if scope-lock 외 axis 발견). Then STOP. " + "DO NOT commit or push. The Gitea comment IS the deliverable — stdout is not."), + "x":("Verify only the SINGLE unit executed in this turn. " + "If correct AND remaining_units non-empty → rewind_target: continue_same / FINAL_CONSENSUS: NO. " + "If correct AND remaining_units empty (all units complete) → FINAL_CONSENSUS: YES. 
def _is_execution_issue(title):
    """Tell whether an issue title marks an execution sub-issue.

    P1-4 (2026-05-18): a title containing '실행-N' (for example
    '[IMP-NN 실행-N]') or 'exec-N' is treated as an execution sub-issue.
    Per the original note, such child issues come from decomposition and only
    handle a small axis that the parent issue already analysed and planned,
    so Stage 1/2 should stay short and compact instead of being processed
    like a full design issue.

    Returns:
        True when either marker is found in ``title``; False otherwise,
        including for an empty or None title.
    """
    if not title:
        return False

    # (pattern, flags) pairs, tried in order. The Korean marker requires a
    # '-' or whitespace before the number; the English marker is matched
    # case-insensitively and its separator is optional.
    markers = (
        (r"\b실행[-\s]\d+\b", 0),
        (r"\bexec[-\s]?\d+\b", re.IGNORECASE),
    )
    for pattern, flags in markers:
        if re.search(pattern, title, flags):
            return True
    return False
+- Skip deep architectural analysis already done in the parent.""" + + +def build_context_pack(n, title, body, sid, agent, rnd, start_cnt, compact=None): + idx = STAGE_IDS.index(sid); si = STAGES[idx] + role = C_ROLE if agent == "claude" else X_ROLE + task = si["c"] if agent == "claude" else si["x"] + prior = load_all_exit_reports(n, idx) + + # P1-4/P1-5 (2026-05-18) — execution-issue + Stage 2 compact rule + extras = [] + if sid == "simulation-plan": + extras.append(COMPACT_PLAN_RULE) + if _is_execution_issue(title): + extras.append(EXECUTION_ISSUE_NOTE) + extras_text = "".join(extras) + + # 검증 실패 보고서 (rewind 시 이전 실패 맥락 전달). + # 2026-05-16 — issue state 의 failure_report_path 를 source-of-truth 로. + # 모든 stage NO (test-verify/final-close 뿐 아니라 code-edit 등) 의 from_stage 캐치. + failure_ctx = "" + ist_fc = get_issue_state(n) + fr_path_str = ist_fc.get("failure_report_path") + if fr_path_str: + fail_path = Path(fr_path_str) + if fail_path.exists(): + from_sid = ist_fc.get("failure_from_stage", "?") + failure_ctx = ( + f"\n\n=== REWIND: FAILURE REPORT (from {from_sid}) ===\n" + f"{fail_path.read_text(encoding='utf-8')[:1500]}\n" + f"Fix the issues above before re-attempting.\n" + ) + + # D-4 (2026-05-18) — local draft transcript with Gitea fallback. + # 1. 우선 local drafts 수집 (현재 stage, 현재 호출 이전까지) + # 2. drafts 존재 → local transcript 사용 (속도 + outage 무관) + # 3. 
drafts 비어있음 (D-2 prompt 무시 / 첫 round 등) → 기존 Gitea path fallback + drafts = _collect_stage_drafts(n, sid, agent, rnd) + if drafts: + # local draft path — limit to last N entries (mirror existing recent[-8:] semantic) + window = COMPACT_EVERY * 2 if compact else 8 + recent_drafts = drafts[-window:] + c_text = "\n---\n".join([f"[{ag} r{r}] {body}" for r, ag, body in recent_drafts]) + else: + # fallback — original Gitea-based recent comments (기존 흐름 그대로) + all_c = get_comments(n) + stage_c = all_c[start_cnt:] + if compact: + recent = stage_c[-(COMPACT_EVERY*2):] + else: + recent = stage_c[-8:] + c_text = "\n---\n".join([ + f"[{detect_agent(c['body']) or '?'}] {c['body'][:3000]}" for c in recent + ]) or "(none)" + + api = f"POST comment: {GITEA_URL}/api/v1/repos/{GITEA_REPO}/issues/{n}/comments | token $GITEA_TOKEN" + + # D-axis 2026-05-18 (D-2 dual-write) — draft path for this (agent, round). + # Agent must write the same comment body to this path AND POST to Gitea (existing flow). + draft_path = _draft_path(n, sid, agent, rnd) + dual_write = DUAL_WRITE_INSTRUCTION.format(draft_path=str(draft_path)) + + pack = ( + f"ISSUE #{n}: {title}\nURL: {GITEA_URL}/{GITEA_REPO}/issues/{n}\n\n" + f"=== ISSUE BODY ===\n{body}\n\n" + f"=== COMPLETED STAGE EXIT REPORTS (binding contracts) ===\n{prior}\n\n" + f"{failure_ctx}" + f"=== CURRENT: {si['label']} Round #{rnd} ===\nTask: {task}{extras_text}\n\n" + f"{('=== MID-STAGE COMPACTION ==='+chr(10)+compact+chr(10)*2) if compact else ''}" + f"=== RECENT COMMENTS (current stage) ===\n{c_text}\n\n" + f"DO NOT read all Gitea comments. 
def generate_compaction(n, sid, comments, rnd):
    """Ask Claude for a mid-stage summary of ``comments`` and persist it.

    Each comment body is truncated to 2000 characters and prefixed with the
    agent reported by ``detect_agent`` (or '?'). The summary is written to
    ``ISSUES_DIR / f"{n}_stage_{sid}_compact_r{rnd}.md"``.

    Args:
        n: issue number.
        sid: current stage id.
        comments: comment dicts (each with a ``body`` key) to summarize.
        rnd: current round number, used only in the output file name.

    Returns:
        The stripped summary text, or None when the CLI call raises, exits
        non-zero, or produces no output.
    """
    text = "\n---\n".join(
        f"[{detect_agent(c['body']) or '?'}] {c['body'][:2000]}" for c in comments
    )
    prompt = f"Summarize this discussion (under 500 words). Agreed, rejected, open, evidence.\n\n{text}"

    try:
        # P3-1 — _run_with_tree_kill guarantees parent/grandchild cleanup.
        # The prompt goes through stdin, the same way run_claude sends it:
        # on Windows the command line is limited to roughly 32,767 chars and
        # a multi-comment transcript can exceed that when passed as an argument.
        r = _run_with_tree_kill(
            [CLAUDE_EXE, "-p", "--dangerously-skip-permissions"],
            input=prompt,
            encoding="utf-8", errors="replace", timeout=300, cwd=PROJECT_DIR)
    except Exception as e:
        # Compaction is best-effort, but the reason must be visible in the log.
        # Catching Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate so Ctrl-C still stops the orchestrator.
        log(f" (compaction failed: {type(e).__name__}: {e})")
        return None

    summary = (r.stdout or "").strip()
    if r.returncode != 0 or not summary:
        return None

    try:
        p = ISSUES_DIR / f"{n}_stage_{sid}_compact_r{rnd}.md"
        p.parent.mkdir(parents=True, exist_ok=True)
        p.write_text(summary, encoding="utf-8")
    except OSError as e:
        # A failed write should not discard a summary we already have.
        log(f" (compaction save failed: {e})")
    return summary
+ iu_block = "" + if sid == "simulation-plan": + iu_pat_re = re.compile(r"===\s*IMPLEMENTATION_UNITS\s*===\s*\n(.*?)(?=\n===\s|\Z)", + re.DOTALL | re.IGNORECASE) + # current stage comments 범위 — state 의 stage_start_count source-of-truth. + ist_g = get_issue_state(n) + sc = ist_g.get("stage_start_count") + scope = comments[sc:] if isinstance(sc, int) else comments[-10:] # fallback + # 최신 match 우선 — reverse iteration. 가장 최근 IU block 채택. + for src_comment in reversed(scope): + full = src_comment.get("body", "") + m = iu_pat_re.search(full) + if m: + iu_block = m.group(0).strip() + break + + # Fix 3 (2026-05-17) — Stage 2 의 exit report 는 implementation_units YAML block 을 + # *verbatim* 보존해야 Stage 3 가 unit-per-turn 으로 동작 가능. summary 금지. + stage2_extra = "" + if sid == "simulation-plan": + stage2_extra = ( + "\nCRITICAL — preserve the === IMPLEMENTATION_UNITS === YAML block VERBATIM " + "from the agreed Claude/Codex comments. Do NOT summarize, paraphrase, or " + "compress that block. Stage 3 will parse it unit-by-unit.\n" + ) + if iu_block: + stage2_extra += ( + f"\nReference (use this exact block verbatim in the exit report) :\n" + f"{iu_block}\n" + ) + prompt = ( + f"Generate EXIT REPORT for {si['label']} issue #{n}.\n" + f"Format:\n" + f"📌 **[오케스트레이터] {si['label']} 완료**\n" + f"■ 핵심 결정 (Korean 3-5줄)\n■ 범위 제외\n■ 다음 단계\n\n" + f"=== EXIT REPORT (English, binding contract) ===\n" + f"Fields: {si['ef']}\n" + f"Include: unresolved_questions, guardrails, evidence, source_comment_ids, commit_sha\n" + f"{stage2_extra}\n" + f"=== BASIS ===\n{basis}\n" + f"Under 600 words for non-block prose (the IMPLEMENTATION_UNITS YAML block does NOT count). Facts only.\n" + ) + log(" Exit report 생성...") + try: + # P3-1 — tree-safe subprocess. 
def _save_agent_stdout(agent, stdout, stderr):
    """Save an agent's most recent stdout/stderr for diagnosis (2026-05-17).

    Intended for analysing cases where Claude answered on stdout but did not
    POST to Gitea. Output goes to ``TMP_DIR / f"{agent}_last_stdout.txt"`` and
    ``TMP_DIR / f"{agent}_last_stderr.txt"``, overwriting earlier captures.

    Args:
        agent: agent name used as the file-name prefix (callers pass
            "claude" or "codex").
        stdout: captured stdout as str, bytes, or None. The original note
            says TimeoutExpired delivers bytes.
        stderr: captured stderr, same accepted types as ``stdout``.

    This helper never raises: any failure is logged and swallowed, because it
    is also called from the callers' TimeoutExpired handlers, where an
    exception would escape the agent runner entirely.
    """
    def _norm(x):
        # None becomes an empty file; bytes are decoded leniently ("replace"
        # cannot raise a decode error); everything else is stringified.
        if x is None:
            return ""
        if isinstance(x, bytes):
            return x.decode("utf-8", "replace")
        return str(x)

    try:
        # Directory creation is inside the try so a permission or disk error
        # is reported here instead of being mistaken for an agent failure.
        TMP_DIR.mkdir(parents=True, exist_ok=True)
        (TMP_DIR / f"{agent}_last_stdout.txt").write_text(_norm(stdout), encoding="utf-8")
        (TMP_DIR / f"{agent}_last_stderr.txt").write_text(_norm(stderr), encoding="utf-8")
    except Exception as e:
        log(f" (stdout capture failed: {e})")
def run_codex(prompt):
    """Run the Codex CLI on ``prompt`` and report whether it succeeded.

    The prompt is written to ``TMP_DIR / "codex_prompt.txt"`` and Codex is
    told to read that file. stdout/stderr (partial output on timeout) are
    saved through ``_save_agent_stdout``.

    Args:
        prompt: full context pack text for this round.

    Returns:
        True when the CLI exits with return code 0. False on a non-zero exit,
        a timeout after ``AGENT_TIMEOUT`` seconds, or any other exception,
        including a failure to write the prompt file.
    """
    log(" Codex...")
    pf = TMP_DIR / "codex_prompt.txt"
    # P3-1 (2026-05-18) — uses _run_with_tree_kill so grandchildren spawned by
    # the Codex CLI are cleaned up.
    try:
        # Writing the prompt file is part of the guarded section: an OSError
        # here should return False (so the stage loop backs off and retries)
        # rather than propagate out of the stage runner.
        pf.parent.mkdir(parents=True, exist_ok=True)
        pf.write_text(prompt, encoding="utf-8")
        r = _run_with_tree_kill(
            [CODEX_CMD, "exec", "--sandbox", "danger-full-access",
             f"Read the file {pf} and follow the instructions inside it exactly."],
            encoding="utf-8", errors="replace",
            timeout=AGENT_TIMEOUT, cwd=PROJECT_DIR,
        )
        _save_agent_stdout("codex", r.stdout, r.stderr)
        if r.returncode != 0:
            log(f" Codex FAILED: returncode={r.returncode}")
            if r.stderr:
                log(f" stderr: {r.stderr[-500:]}")
            if r.stdout:
                log(f" stdout(tail): {r.stdout[-500:]}")
            return False
        if r.stdout:
            lines = r.stdout.strip().splitlines()
            log(f" Codex OK: {len(lines)} lines, {len(r.stdout):,} chars")
            log(f" stdout(tail): {r.stdout.strip()[-300:]}")
        return True
    except subprocess.TimeoutExpired as e:
        # Fix 4 (2026-05-17) — keep partial stdout/stderr for diagnosis.
        _save_agent_stdout("codex", e.stdout, e.stderr)
        partial = len(e.stdout) if e.stdout else 0
        log(f" Codex TIMEOUT ({AGENT_TIMEOUT}s) — partial stdout {partial} bytes saved")
        return False
    except Exception as e:
        log(f" Codex EXCEPTION: {type(e).__name__}: {e}")
        return False
+ elapsed = 0 + while elapsed < timeout: + time.sleep(POLL_INTERVAL); elapsed += POLL_INTERVAL + try: + cs = get_comments(n) + except Exception as e: + log(f" ⚠️ get_comments error: {type(e).__name__}: {e} — retry next poll") + continue + if len(cs) > prev: return cs + if elapsed % 60 == 0: log(f" ... {elapsed}s") + return None + +# ═══════════════════════════════════════════════════════════════ +# Stage Runner +# ═══════════════════════════════════════════════════════════════ + +def run_stage(n, title, body, sid): + si = STAGES[STAGE_IDS.index(sid)] + header(f"#{n}: {title}\n {si['label']}") + set_label(n, si["tag"]) + + # 재시작 복구: stage_start_stage가 현재 stage와 일치할 때만 재사용 + # P0-2 (2026-05-18) — slicing sanity. 외부에서 comment 삭제됐을 때 start_cnt > 실제 count + # 상태 가능 → comments[start_cnt:] = 빈 slice → "현재 stage comment 없음" 으로 오판. + # 대응: 실제 comment count 와 비교해서 stale 이면 재산정. + ist = get_issue_state(n) + if (ist.get("stage") == sid + and ist.get("stage_start_stage") == sid + and ist.get("stage_start_count") is not None): + start_cnt = ist["stage_start_count"] + actual = len(get_comments(n)) + if start_cnt > actual: + log(f" ⚠️ stage_start_count={start_cnt} > actual comments={actual} — 외부 삭제 감지. 
resetting to {actual}.") + start_cnt = actual + update_issue_state(n, stage_start_count=start_cnt) + else: + log(f" (resumed: stage_start_count={start_cnt})") + else: + comments = get_comments(n) + start_cnt = len(comments) + update_issue_state(n, stage=sid, stage_start_stage=sid, stage_start_count=start_cnt) + + cr = xr = 0 + compact = load_latest_compaction(n, sid) + if compact: + log(f" (loaded compaction: {len(compact):,} chars)") + backoff = 30 # exponential: 30→60→120→300 cap + + while True: + comments = get_comments(n); count = len(comments) + if get_issue(n)["state"] == "closed": log("Closed externally"); return False + + rnd = cr + 1 + log(f" round={rnd} stage_comments={count - start_cnt}") + + # Mid-stage compaction + if rnd > 1 and (rnd - 1) % COMPACT_EVERY == 0: + log(" Compaction...") + compact = generate_compaction(n, sid, comments[start_cnt:], rnd) + if compact: log(f" Compacted: {len(compact):,} chars") + + # Claude + cr += 1; divider(f"{si['label']} — Claude #{cr}") + p = build_context_pack(n, title, body, sid, "claude", cr, start_cnt, compact) + if not run_claude(p): + log(f" retry in {backoff}s"); time.sleep(backoff); backoff = min(backoff * 2, 300); continue + backoff = 30 + updated = wait_comment(n, count) + if not updated: + log(f" no comment, retry in {backoff}s"); time.sleep(backoff); backoff = min(backoff * 2, 300); continue + backoff = 30 + comments = updated; count = len(comments) + # D-3 (2026-05-18) — log-only dual-write verification. Does NOT interrupt flow. 
+ _verify_dual_write(n, sid, "claude", cr, comments[-1].get("body", "")) + + # Codex + xr += 1; divider(f"{si['label']} — Codex #{xr}") + p = build_context_pack(n, title, body, sid, "codex", xr, start_cnt, compact) + if not run_codex(p): + log(f" retry in {backoff}s"); time.sleep(backoff); backoff = min(backoff * 2, 300); continue + backoff = 30 + updated = wait_comment(n, count) + if not updated: + log(f" no comment, retry in {backoff}s"); time.sleep(backoff); backoff = min(backoff * 2, 300); continue + backoff = 30 + comments = updated + # D-3 (2026-05-18) — log-only dual-write verification. Does NOT interrupt flow. + _verify_dual_write(n, sid, "codex", xr, comments[-1].get("body", "")) + + # Consensus + Evidence check (2026-05-16 rewind dispatcher) + last = comments[-1]["body"] + is_codex = detect_agent(last) == "codex" + if not is_codex: + log(" Codex 응답 미감지 — continuing") + continue + + status, target = parse_consensus(last) + + # YES 처리 — evidence 검증 + if status == "YES": + if has_evidence(last, sid): + # Fix 1 (2026-05-17 A안) — Stage 3 YES 는 Remaining units: none 강제. + # remaining_units 가 비어있어야 모든 unit 완료. non-empty/parse-fail YES = 모순. 
+ if sid == "code-edit": + cur_remaining_yes = parse_remaining_units(last) + if cur_remaining_yes is None: + log("⚠️ Stage 3 YES but Remaining units line missing — supplement requested") + try: gitea(f"issues/{n}/comments", "POST", {"body": + "⚠️ **[Orchestrator]** Stage 3 FINAL_CONSENSUS: YES requires a parseable line in the EVIDENCE block:\n\n" + " Remaining units: none (when all implementation_units complete)\n\n" + "Without this, orchestrator cannot verify all units were executed."}) + except: pass + continue + if cur_remaining_yes: + log(f"⚠️ Stage 3 YES but Remaining units non-empty ({sorted(cur_remaining_yes)}) — contradiction") + try: gitea(f"issues/{n}/comments", "POST", {"body": + f"⚠️ **[Orchestrator]** Contradiction: FINAL_CONSENSUS: YES but Remaining units: {sorted(cur_remaining_yes)}.\n\n" + "If units remain → rewind_target: continue_same / FINAL_CONSENSUS: NO.\n" + "If all complete → `Remaining units: none`."}) + except: pass + continue + + # Fix 2 (2026-05-17 A안) — Stage 2 YES 는 IMPLEMENTATION_UNITS YAML block 존재 강제. + # Stage 3 의 unit-per-turn 동작은 이 block parse 에 의존. + # 2026-05-17 (Codex 추가 fix A) — 검색 범위 = current stage comments 만 + # (comments[start_cnt:]). 이전 round / stage 의 stale block 으로 통과 방지. + # 2026-05-17 (Codex 추가 fix B) — 헤더만으로는 부족. block body 안에 + # 최소 1 개의 `- id: u\d+` entry 가 있어야 통과. 빈 block silent pass 차단. + if sid == "simulation-plan": + iu_block_pat = re.compile( + r"===\s*IMPLEMENTATION_UNITS\s*===\s*\n(.*?)(?=\n===\s|\Z)", + re.IGNORECASE | re.DOTALL, + ) + iu_unit_pat = re.compile(r"^\s*-\s*id:\s*u\d+", re.IGNORECASE | re.MULTILINE) + # P1-6 (2026-05-18) — tests:[] 단위 금지 직접 강제. + # #45 Codex #2 가 catch 한 violation 을 orchestrator 가 *Codex 가기 전에* 차단. 
+ # 패턴: 'tests: []' 또는 'tests:[]' (whitespace 변형 포함) + iu_tests_empty_pat = re.compile( + r"^\s*tests\s*:\s*\[\s*\]\s*$", re.IGNORECASE | re.MULTILINE) + def _iu_valid(text): + m = iu_block_pat.search(text or "") + if not m: return (False, "block missing") + block_body = m.group(1) + if not iu_unit_pat.search(block_body): + return (False, "no `- id: u` entry") + if iu_tests_empty_pat.search(block_body): + return (False, "unit with `tests: []` (forbidden — implementation + tests = same unit)") + return (True, "") + ok, reason = _iu_valid(last) + if not ok: + # current stage 의 comments 만 검색 (start_cnt 이후) + for c in comments[start_cnt:]: + ok2, _ = _iu_valid(c.get("body", "")) + if ok2: + ok = True; break + if not ok: + log(f"⚠️ Stage 2 YES but IMPLEMENTATION_UNITS invalid ({reason}) — supplement requested") + try: gitea(f"issues/{n}/comments", "POST", {"body": + f"⚠️ **[Orchestrator]** Stage 2 FINAL_CONSENSUS: YES rejected: {reason}.\n\n" + "Requirements (strict):\n" + "- `=== IMPLEMENTATION_UNITS ===` block with at least one `- id: u` entry\n" + "- Each unit MUST include `tests: [, ...]` (NOT `tests: []`)\n" + "- Implementation + tests = same unit (no deferring tests to later units)\n\n" + "Example:\n" + " === IMPLEMENTATION_UNITS ===\n" + " - id: u1\n summary: ...\n files: [...]\n tests: [tests/.../test_xxx.py]\n estimate_lines: \n"}) + except: pass + continue + + log(f"✅ {si['label']} — YES (evidence verified)") + # stage 완료 = unit counter + remaining tracker 모두 reset + update_issue_state(n, continue_same_count=0, last_remaining_units=None) + return True + else: + log("⚠️ YES without sufficient evidence — supplement requested") + try: gitea(f"issues/{n}/comments", "POST", {"body": + "⚠️ **[Orchestrator]** FINAL_CONSENSUS: YES was not accepted yet.\n\n" + "Reason: The comment did not include the required evidence block for this stage.\n\n" + "Please supplement:\n" + "- === EVIDENCE === block header\n" + "- Commands run\n" + "- Files checked\n" + "- Test results 
(if Stage 2/4)\n" + "- Commit SHA (if Stage 5)\n" + "- Verified facts (if Stage 6)\n\n" + "The stage remains open and will continue."}) + except: pass + continue + + # NO 처리 — rewind dispatcher + if status == "NO": + # (a) NO 도 evidence 필요 (Codex fix #2 — RULE: NO evidence = REJECTED) + if not has_evidence(last, sid): + log("⚠️ NO without sufficient evidence — supplement requested") + try: gitea(f"issues/{n}/comments", "POST", {"body": + "⚠️ **[Orchestrator]** FINAL_CONSENSUS: NO also requires an === EVIDENCE === block.\n\n" + "Please supplement evidence (commands run, files checked, tests/commit/verified facts as stage requires) " + "BEFORE the rewind_target line."}) + except: pass + continue + + # (b) rewind_target 누락 → supplement 요청 + if not target: + log("⚠️ NO without rewind_target — supplement requested") + try: gitea(f"issues/{n}/comments", "POST", {"body": + "⚠️ **[Orchestrator]** FINAL_CONSENSUS: NO requires a rewind_target line.\n\n" + "Add one of:\n" + " rewind_target: retry_same (technical_fail ONLY — push network/permission)\n" + " rewind_target: stage_1_review\n" + " rewind_target: stage_2_plan\n" + " rewind_target: stage_3_edit\n" + " rewind_target: stage_4_verify\n" + " rewind_target: stage_5_push\n\n" + "Stage 4 (test-verify) and Stage 6 (final-close) FORBID retry_same — must rewind to earlier stage."}) + except: pass + continue + + # (c) retry_same — verification stage 에서는 금지 (사용자 lock F) + if target == "retry_same": + if sid in VERIFY_STAGES: + log(f"⚠️ retry_same forbidden for {sid} — supplement requested") + try: gitea(f"issues/{n}/comments", "POST", {"body": + f"⚠️ **[Orchestrator]** retry_same is forbidden for Stage {sid}.\n\n" + "Verification stage NO must rewind to an earlier stage:\n" + " stage_1_review / stage_2_plan / stage_3_edit / stage_5_push"}) + except: pass + continue + log(f"🔁 retry_same — same stage round (technical retry)") + continue + + # (c2) continue_same — Stage 3 (code-edit) ONLY (2026-05-17 lock). 
+ # RULES 의 "Stage 3 ONLY" spec 와 정합 — code-edit 가 아니면 supplement 요청. + # progress-based counter (Fix 1) : remaining_units 가 줄지 않을 때만 증가. + if target == "continue_same": + if sid != "code-edit": + log(f"⚠️ continue_same forbidden for {sid} (Stage 3 only) — supplement requested") + try: gitea(f"issues/{n}/comments", "POST", {"body": + f"⚠️ **[Orchestrator]** continue_same is allowed ONLY for Stage 3 (code-edit).\n\n" + f"Current stage: {sid}. Choose another rewind_target:\n" + " stage_1_review / stage_2_plan / stage_3_edit / stage_5_push / retry_same"}) + except: pass + continue + # Fix 1 — counter 는 *progress-based*. remaining_units 가 줄지 않을 때만 증가. + # 정상 진행 (u1→u2→u3 …) 은 매 round remaining 줄어듦 → counter reset. + # u1 stuck (3 round remaining 동일) = 진짜 progress 없음 → escalate. + ist_cs = get_issue_state(n) + cur_remaining = parse_remaining_units(last) + prev_remaining_list = ist_cs.get("last_remaining_units") + prev_remaining = set(prev_remaining_list) if prev_remaining_list is not None else None + + if cur_remaining is None: + # parse fail — Codex evidence 에 'Remaining units:' 줄 없음/잘못된 format + log("⚠️ continue_same but Remaining units line not parseable — supplement requested") + try: gitea(f"issues/{n}/comments", "POST", {"body": + "⚠️ **[Orchestrator]** continue_same requires a parseable line in the EVIDENCE block:\n\n" + " Remaining units: [u2, u3, u4] (or comma list / `none` if all complete)\n\n" + "Without this, orchestrator cannot verify progress between rounds."}) + except: pass + continue + + # Fix 1 (2026-05-17) — empty set + continue_same = 모순. + # 모든 unit 완료 = FINAL_CONSENSUS: YES 여야 함. continue_same X. 
+ if not cur_remaining: + log("⚠️ continue_same with empty Remaining units — contradiction, supplement requested") + try: gitea(f"issues/{n}/comments", "POST", {"body": + "⚠️ **[Orchestrator]** Contradiction: rewind_target: continue_same but Remaining units: none.\n\n" + "If all implementation_units complete → FINAL_CONSENSUS: YES (not NO + continue_same).\n" + "If units remain → list them: `Remaining units: [uN, ...]`."}) + except: pass + continue + + # progress 판정 : remaining 가 *prev 와 같으면* 진행 없음 → counter+1 + if prev_remaining is not None and cur_remaining == prev_remaining: + cnt = (ist_cs.get("continue_same_count") or 0) + 1 + else: + cnt = 0 # remaining 변화 = progress. counter reset. + + update_issue_state(n, + continue_same_count=cnt, + last_remaining_units=sorted(cur_remaining)) + + if cnt >= 3: + log(f"⚠️ continue_same stuck — remaining_units unchanged {cnt}× → auto-escalate to stage_2_plan") + update_issue_state(n, continue_same_count=0, last_remaining_units=None) + fp = save_failure_report(n, sid, "simulation-plan", + last + f"\n\n[Auto-escalate: continue_same×{cnt} with remaining_units unchanged = plan/implementation stuck]") + update_issue_state(n, + failure_report_path=str(fp), + failure_from_stage=sid) + # Fix 9 (Phase A-3a) — Gitea POST 제거. state + failure_report 가 truth. + log_orchestrator_event(n, + f"AUTO-ESCALATE: continue_same stuck in {sid}, " + f"remaining_units={sorted(cur_remaining)} unchanged for {cnt} rounds. " + f"Rewinding to simulation-plan.") + return "rewind:simulation-plan" + + log(f"➡️ continue_same — remaining_units={sorted(cur_remaining)} (counter={cnt})") + continue + + # (d) target stage 로 rewind + rewind_sid = REWIND_TARGET_TO_SID.get(target) + if not rewind_sid: + log(f"⚠️ unknown rewind_target: {target}") + continue + + # (e) verification stage 의 self-rewind 금지 (Codex fix #3) + # test-verify → stage_4_verify / final-close → 자기자신 = 같은 stage 반복. + # retry_same 금지의 spirit 위배. 
def run_issue(n, until=None):
    """Drive one issue through the stage pipeline, honouring rewinds.

    Starts at the stage stored in the issue state (the first stage when the
    stored value is missing or unknown) and runs stages in order. For each
    stage, ``run_stage`` returns:
      * False              — the issue was closed externally; stop.
      * "rewind:<stage>"   — jump back to that stage.
      * anything else      — stage complete; write the exit report and advance.

    Args:
        n: issue number.
        until: optional stage id; processing stops after this stage. Ignored
            when it is not a known stage id.
    """
    issue = get_issue(n)
    if issue["state"] == "closed":
        log(f"#{n} closed, skip")
        return
    title = issue["title"]
    # Normalise a null/missing body to "" so the context pack never embeds "None".
    body = issue.get("body") or ""
    header(f"Issue #{n}: {title}")

    st = get_issue_state(n)
    cur = st.get("stage", "problem-review")
    si = STAGE_IDS.index(cur) if cur in STAGE_IDS else 0
    ei = STAGE_IDS.index(until) + 1 if until and until in STAGE_IDS else len(STAGES)

    i = si
    while i < ei:
        s = STAGES[i]
        result = run_stage(n, title, body, s["id"])

        # Closed externally.
        if result is False:
            log(" Stage interrupted (issue closed externally)")
            return

        # Rewind (verification failed) — Codex fix #4: comment English only.
        if isinstance(result, str) and result.startswith("rewind:"):
            requested = result.partition(":")[2]
            if requested in STAGE_IDS:
                target_idx = STAGE_IDS.index(requested)
            else:
                # Keep the original fallback (the stage this run started at),
                # but make it visible in the log.
                log(f" ⚠️ unknown rewind stage '{requested}' — falling back to {STAGE_IDS[si]}")
                target_idx = si
            target = STAGES[target_idx]

            fp = ISSUES_DIR / f"{n}_stage_{s['id']}_failed.md"
            if fp.exists():
                # Fix 9 (Phase A-3a) — no Gitea POST; state + failure report are the truth.
                log_orchestrator_event(n,
                    f"STAGE FAILED — rewinding: "
                    f"{s['id']} ({s['label']}) → {target['id']} ({target['label']}). "
                    f"Failure report: {fp.name}")

            # Move to the target stage and reset the stage_start markers.
            # failure_report_path is kept on purpose: the next stage's context
            # pack reads it. The stored stage is always the id actually jumped
            # to, so an unknown target cannot leave state and loop out of sync.
            update_issue_state(n, stage=target["id"], stage_start_count=None, stage_start_stage=None)
            log(f" Rewind: {s['label']} → {target['label']}")
            i = target_idx
            continue

        # Normal completion — clear failure report, counter and remaining-units tracker.
        generate_and_post_exit_report(n, s["id"])
        nxt = STAGE_IDS[i + 1] if i + 1 < len(STAGE_IDS) else "done"
        update_issue_state(n, stage=nxt, stage_start_count=None, stage_start_stage=None,
                           failure_report_path=None, failure_from_stage=None,
                           continue_same_count=0, last_remaining_units=None)

        if s["id"] == "final-close":
            try:
                gitea(f"issues/{n}", "PATCH", {"state": "closed"})
                log("Closed")
            except Exception as e:
                # Closing stays best-effort, but a failure must be logged:
                # the issue remains open while the stored stage is already "done".
                log(f" ⚠️ close failed: {type(e).__name__}: {e}")

        i += 1

    log(f"#{n} done: {STAGE_IDS[min(ei-1, len(STAGE_IDS)-1)]}")
def main():
    """Command-line entry point for the orchestrator.

    Exits with status 1 when GITEA_TOKEN is not set. Otherwise creates the
    working directories, installs the subprocess-cleanup safety nets (atexit
    hook and SIGINT handler), logs the configured tools, parses the command
    line and dispatches to exactly one action, in this priority order:
    --reset, --reset-all, --status, --issue, --from, then "run all issues".
    """
    if not GITEA_TOKEN:
        print("\n GITEA_TOKEN required\n")
        sys.exit(1)

    for work_dir in [ORCH_DIR, ISSUES_DIR, TMP_DIR, DRAFTS_DIR]:
        work_dir.mkdir(parents=True, exist_ok=True)

    # P3-3 (2026-05-18) — safety net that cleans up whatever is left in
    # _SPAWNED when the orchestrator exits.
    global _ORCH_CREATE
    try:
        _ORCH_CREATE = psutil.Process(_ORCH_PID).create_time()
    except Exception:
        _ORCH_CREATE = None
    atexit.register(_orchestrator_exit_cleanup)
    try:
        signal.signal(signal.SIGINT, _sigint_handler)
    except (ValueError, AttributeError):
        # Non-main thread, or the platform does not support it.
        pass

    log(f"Claude: {CLAUDE_EXE}")
    log(f"Codex: {CODEX_CMD}")
    log(f"Repo: {GITEA_REPO}")
    print()

    parser = argparse.ArgumentParser(description="Orchestrator v6")
    parser.add_argument("--issue", "-i", type=int)
    parser.add_argument("--status", "-s", action="store_true")
    parser.add_argument("--from", dest="sf", type=int)
    parser.add_argument("--until", choices=STAGE_IDS)
    parser.add_argument("--reset", type=int, metavar="N")
    parser.add_argument("--reset-all", action="store_true")
    args = parser.parse_args()

    # First matching option wins; each branch is the whole action.
    if args.reset:
        clear_state(args.reset)
        log(f"Cleared #{args.reset}")
        return
    if args.reset_all:
        clear_state()
        log("All cleared")
        return
    if args.status:
        show_status(args.issue)
        return
    if args.issue:
        run_issue(args.issue, args.until)
        return
    if args.sf:
        run_all(args.sf, args.until)
        return
    run_all(until=args.until)
+ +Covers detect_agent (the bug that caused #45 infinite loop), parse_consensus, +parse_remaining_units, IMPLEMENTATION_UNITS parsing, dual-write normalize. + +Run: pytest -q tests/orchestrator_unit/ +""" +import sys +from pathlib import Path + +# Add design_agent root to sys.path so we can import orchestrator.py +ROOT = Path(__file__).parent.parent.parent +sys.path.insert(0, str(ROOT)) + +from orchestrator import ( + detect_agent, + parse_consensus, + parse_remaining_units, + _is_execution_issue, +) +import re + + +class TestExecutionIssueDetection: + """P1-4 — execution sub-issue title detection.""" + + def test_execution_korean_pattern(self): + assert _is_execution_issue("[IMP-15 실행-1] image_aspect_mismatch") is True + assert _is_execution_issue("[IMP-15 실행-2] table overflow") is True + assert _is_execution_issue("[IMP-15 실행 3] something") is True + + def test_execution_english_pattern(self): + assert _is_execution_issue("[IMP-15 exec-1] image") is True + assert _is_execution_issue("[IMP-15 EXEC 2] table") is True + + def test_non_execution_title(self): + assert _is_execution_issue("IMP-15 Step 14 visual_check 보강") is False + assert _is_execution_issue("IMP-09 B-4 다른 layout zone-geometry") is False + + def test_empty_title(self): + assert _is_execution_issue("") is False + assert _is_execution_issue(None) is False + + +# ───────────────────────────────────────────────────────────────── +# detect_agent — the bug that caused #45 infinite loop +# ───────────────────────────────────────────────────────────────── + +class TestDetectAgent: + def test_claude_header(self): + assert detect_agent("[Claude #1] Stage 1 ...") == "claude" + + def test_codex_header(self): + assert detect_agent("[Codex #1] Stage 1 review") == "codex" + + def test_codex_body_with_claude_citation(self): + """The exact bug from #45 — Codex body contains [Claude #N] citation in + EVIDENCE section. 
Old detect_agent returned 'claude' (wrong).""" + body = """[Codex #2] Stage 2 Round #1 simulation-plan verification + +Verdict: NO. + +=== EVIDENCE === +- Read current-stage Gitea comment `[Claude #2] Stage 2 Round #1 - Plan` only +""" + assert detect_agent(body) == "codex", \ + "Codex body containing [Claude #N] citation must still detect as codex" + + def test_claude_body_with_codex_citation(self): + body = """[Claude #3] Stage 2 Round #2 - Plan + +Addressing [Codex #2] findings ... +""" + assert detect_agent(body) == "claude" + + def test_empty_body(self): + assert detect_agent("") is None + assert detect_agent(None) is None + assert detect_agent(" \n ") is None + + def test_no_agent_header(self): + assert detect_agent("This is some random text without any agent marker") is None + + def test_leading_whitespace_before_header(self): + body = " \n[Codex #1] header after whitespace" + assert detect_agent(body) == "codex" + + def test_header_must_be_at_start(self): + """Body that doesn't start with [Agent header should return None.""" + body = "Some intro text.\n[Codex #1] header on second line" + # P0-1 fix: only first non-empty line is checked. + # First line = "Some intro text." 
→ no match → None + assert detect_agent(body) is None + + def test_header_with_hash_immediately(self): + """[Codex#1] (no space) should still match per regex \\[Codex[\\s#].""" + assert detect_agent("[Codex#1] hello") == "codex" + assert detect_agent("[Claude#5] hi") == "claude" + + +# ───────────────────────────────────────────────────────────────── +# parse_consensus — YES/NO + rewind_target +# ───────────────────────────────────────────────────────────────── + +class TestParseConsensus: + def test_yes_only(self): + body = "Some text.\nFINAL_CONSENSUS: YES" + assert parse_consensus(body) == ("YES", None) + + def test_no_with_rewind_target(self): + body = "Some text.\nrewind_target: stage_2_plan\nFINAL_CONSENSUS: NO" + assert parse_consensus(body) == ("NO", "stage_2_plan") + + def test_no_with_continue_same(self): + body = "blah\nrewind_target: continue_same\nFINAL_CONSENSUS: NO" + assert parse_consensus(body) == ("NO", "continue_same") + + def test_no_target_only_in_last_10_lines(self): + """parse_consensus only scans last 10 lines.""" + body = "rewind_target: stage_1_review\n" + "\n".join(["filler"] * 20) + "\nFINAL_CONSENSUS: NO" + status, target = parse_consensus(body) + assert status == "NO" + assert target is None # too far from end to be picked up + + def test_no_consensus_marker(self): + assert parse_consensus("just text, no marker") == (None, None) + + def test_empty_body(self): + assert parse_consensus("") == (None, None) + assert parse_consensus(None) == (None, None) + + def test_unknown_rewind_target_ignored(self): + body = "rewind_target: bogus_target\nFINAL_CONSENSUS: NO" + status, target = parse_consensus(body) + assert status == "NO" + assert target is None # bogus is not in REWIND_TARGET_TO_SID + + +# ───────────────────────────────────────────────────────────────── +# parse_remaining_units — Stage 3 continue_same progress detection +# ───────────────────────────────────────────────────────────────── + +class TestParseRemainingUnits: + def 
test_bracketed_list(self): + body = "Remaining units: [u2, u3, u4]" + assert parse_remaining_units(body) == {"u2", "u3", "u4"} + + def test_comma_list_no_brackets(self): + body = "Remaining units: u5, u6, u7" + assert parse_remaining_units(body) == {"u5", "u6", "u7"} + + def test_none_explicit(self): + assert parse_remaining_units("Remaining units: none") == set() + assert parse_remaining_units("Remaining units: []") == set() + assert parse_remaining_units("Remaining units: (none)") == set() + assert parse_remaining_units("Remaining units: -") == set() + + def test_line_not_present(self): + assert parse_remaining_units("no remaining units mentioned here") is None + + def test_case_insensitive(self): + body = "REMAINING UNITS: [U1, U2]" + assert parse_remaining_units(body) == {"u1", "u2"} + + def test_only_u_prefixed_digits(self): + """Sentence noise ignored — only u\\d+ pattern matched.""" + body = "Remaining units: I still need to do u3 and u7 work" + assert parse_remaining_units(body) == {"u3", "u7"} + + def test_empty_body(self): + assert parse_remaining_units("") is None + assert parse_remaining_units(None) is None + + +# ───────────────────────────────────────────────────────────────── +# IMPLEMENTATION_UNITS block parsing (used in Stage 2 YES guard) +# ───────────────────────────────────────────────────────────────── + +class TestImplementationUnitsBlock: + """Reproduces the parser in run_stage Stage 2 YES guard (line ~810).""" + + def _parse(self, body): + iu_block_pat = re.compile( + r"===\s*IMPLEMENTATION_UNITS\s*===\s*\n(.*?)(?=\n===\s|\Z)", + re.IGNORECASE | re.DOTALL, + ) + iu_unit_pat = re.compile(r"^\s*-\s*id:\s*u\d+", re.IGNORECASE | re.MULTILINE) + m = iu_block_pat.search(body or "") + return bool(m and iu_unit_pat.search(m.group(1))) + + def test_valid_block(self): + body = """text + +=== IMPLEMENTATION_UNITS === +- id: u1 + summary: ... +- id: u2 + summary: ... 
+""" + assert self._parse(body) is True + + def test_empty_block(self): + body = "=== IMPLEMENTATION_UNITS ===\n(no entries)\n" + assert self._parse(body) is False # header but no - id: uN entry + + def test_block_missing(self): + body = "just text, no implementation_units" + assert self._parse(body) is False + + def test_block_with_only_non_u_entries(self): + body = """=== IMPLEMENTATION_UNITS === +- id: alpha + summary: ... +""" + assert self._parse(body) is False # 'alpha' is not 'u\\d+' + + +# ───────────────────────────────────────────────────────────────── +# Direct integration check — the #45 bug case +# ───────────────────────────────────────────────────────────────── + +class TestRegressionForIssue45Bug: + """Verify the exact body shape that caused #45 infinite loop is now handled.""" + + def test_codex_no_with_claude_citation_full_flow(self): + body = """[Codex #3] Stage 2 Round #2 simulation-plan verification for issue #45 + +Verdict: NO. The plan covers main axes but violates two Stage 2 requirements. + +Findings: +- Unit u1 declares tests: [] in === IMPLEMENTATION_UNITS === +- xfail-strict mechanism unclear + +=== EVIDENCE === +Commands run: +- git rev-parse HEAD +- Read current-stage Gitea comment `[Claude #3] Stage 2 Round #2 - Plan` + +rewind_target: stage_2_plan +FINAL_CONSENSUS: NO +""" + # P0-1 fix: detect_agent reads only first line → "[Codex #3]" → codex + assert detect_agent(body) == "codex", "P0-1 regression test" + # parse_consensus: NO + rewind_target stage_2_plan + status, target = parse_consensus(body) + assert status == "NO" + assert target == "stage_2_plan" diff --git a/tests/orchestrator_unit/test_subprocess_cleanup.py b/tests/orchestrator_unit/test_subprocess_cleanup.py new file mode 100644 index 0000000..522b63b --- /dev/null +++ b/tests/orchestrator_unit/test_subprocess_cleanup.py @@ -0,0 +1,264 @@ +"""P3-5 (2026-05-18) — subprocess cleanup hardening verification. 
+ +Covers: + C1: 정상 종료 → tree 잔류 0 + C2: timeout → TimeoutExpired raise + 자손 0 + C3: grandchild spawn 후 parent timeout → grandchild 정리 + C4: 외부 (orchestrator 가 spawn 안한) 프로세스 보호 + C5: _kill_process_tree(self.pid) 호출해도 orchestrator 자살 안 함 + C6 (CORE): parent 정상 종료 후 grandchild orphan 정리 — PID 2780 regression + +Run: pytest -q tests/orchestrator_unit/test_subprocess_cleanup.py +""" +import os +import sys +import time +import subprocess +from pathlib import Path + +import psutil +import pytest + +ROOT = Path(__file__).parent.parent.parent +sys.path.insert(0, str(ROOT)) + +from orchestrator import ( + _kill_process_tree, + _kill_tracked, + _run_with_tree_kill, + _proc_signature, + _is_same_process, + _SPAWNED, +) + + +# ───────────────────────────────────────────────────────────────── +# Helpers +# ───────────────────────────────────────────────────────────────── + +def _py(): + """Path to current Python interpreter — used to spawn dummy subprocesses.""" + return sys.executable + +def _alive(pid): + try: + return psutil.Process(pid).is_running() and psutil.Process(pid).status() != psutil.STATUS_ZOMBIE + except psutil.NoSuchProcess: + return False + + +# ───────────────────────────────────────────────────────────────── +# Signature helpers +# ───────────────────────────────────────────────────────────────── + +class TestSignatureHelpers: + def test_proc_signature_alive(self): + p = psutil.Process(os.getpid()) + sig = _proc_signature(p) + assert sig is not None + assert sig[0] == os.getpid() + assert isinstance(sig[1], float) + + def test_is_same_process_orch_self_blocked(self): + """C5 prep — orchestrator 자기 자신은 절대 same-process true 안 됨.""" + p = psutil.Process(os.getpid()) + sig = _proc_signature(p) + # _is_same_process 가 _ORCH_PID 체크로 False 반환해야 함. + assert _is_same_process(sig[0], sig[1]) is False + + def test_is_same_process_dead_pid(self): + # 사용 가능성 낮은 PID 999999 — 거의 확실히 죽음. 
+ assert _is_same_process(999999, time.time()) is False + + def test_is_same_process_wrong_create_time(self): + """PID 재사용 회피 검증 — 같은 PID 라도 create_time 안 맞으면 False.""" + # 살아있는 외부 프로세스 빌려서 일부러 어긋난 create_time 으로 호출. + # System Idle 같은 특수 프로세스 (create_time=0) 회피 — 우리가 띄운 dummy 사용. + dummy = subprocess.Popen( + [_py(), "-c", "import time; time.sleep(5)"], + stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, + ) + try: + # 실제 create_time 보다 1 년 전 시각 → 명백한 mismatch. + far_past = time.time() - 365 * 24 * 3600 + assert _is_same_process(dummy.pid, far_past) is False + # 맞는 create_time 으로는 True 여야 함 (sanity). + real_ct = psutil.Process(dummy.pid).create_time() + assert _is_same_process(dummy.pid, real_ct) is True + finally: + dummy.kill() + dummy.wait(timeout=5) + + +# ───────────────────────────────────────────────────────────────── +# C1: 정상 종료 — tree 잔류 0 +# ───────────────────────────────────────────────────────────────── + +class TestC1_NormalExit: + def test_dummy_short_run_no_residue(self): + r = _run_with_tree_kill( + [_py(), "-c", "import time; time.sleep(0.3)"], + timeout=10, + ) + assert r.returncode == 0 + # 호출 후 _SPAWNED 에 우리 호출 잔재가 남으면 안 됨 (wrapper 가 discard). + # 다른 테스트 영향 가능성 있어서 set 전체가 0 이 아니어도 됨, 단 우리 잔재 없으면 OK. + # 보수적으로 — 우리 호출 직전에 _SPAWNED 가 비어있었으면 직후에도 비어있어야 함. + assert len(_SPAWNED) == 0 + + +# ───────────────────────────────────────────────────────────────── +# C2: Timeout — TimeoutExpired raise + 자손 정리 +# ───────────────────────────────────────────────────────────────── + +class TestC2_Timeout: + def test_dummy_long_sleep_times_out(self): + with pytest.raises(subprocess.TimeoutExpired): + _run_with_tree_kill( + [_py(), "-c", "import time; time.sleep(60)"], + timeout=1.5, + ) + # raise 후에도 _SPAWNED 우리 잔재 없어야 함 (wrapper finally 가 discard). 
+ assert len(_SPAWNED) == 0 + + +# ───────────────────────────────────────────────────────────────── +# C3: grandchild orphan 정리 — parent timeout path +# ───────────────────────────────────────────────────────────────── + +class TestC3_GrandchildTimeoutPath: + def test_grandchild_killed_on_parent_timeout(self): + # parent 가 grandchild 띄우고 자기는 sleep — timeout 으로 강제 종료. + # grandchild 도 정리돼야 함. + # PID 캡처를 위해 grandchild 가 자기 PID 를 파일에 기록. + marker = ROOT / ".orchestrator" / "tmp" / "test_c3_gc_pid.txt" + marker.parent.mkdir(parents=True, exist_ok=True) + if marker.exists(): marker.unlink() + + # grandchild 의 stdin/stdout/stderr 를 DEVNULL 로 분리 — production 의 claude.exe→python.exe - + # 케이스와 동일 (grandchild 가 wrapper 의 pipe 핸들 안 상속). 안 그러면 pipe inheritance 로 + # communicate() 가 hang. + spawn_code = ( + f"import subprocess, time, sys, os; " + f"gc = subprocess.Popen(" + f" [sys.executable, '-c', 'import time; time.sleep(60)'], " + f" stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL); " + f"open(r'{marker}', 'w').write(str(gc.pid)); " + f"time.sleep(60)" + ) + + with pytest.raises(subprocess.TimeoutExpired): + _run_with_tree_kill( + [_py(), "-c", spawn_code], + timeout=3, + ) + + # marker 파일에서 grandchild PID 읽기. + assert marker.exists(), "grandchild marker not written — parent died too early" + gc_pid = int(marker.read_text().strip()) + + # 잠시 대기 (cleanup 비동기 가능성) 후 grandchild 죽었는지 확인. + deadline = time.time() + 5 + while time.time() < deadline and _alive(gc_pid): + time.sleep(0.2) + assert not _alive(gc_pid), f"grandchild PID {gc_pid} still alive after parent timeout" + + +# ───────────────────────────────────────────────────────────────── +# C4: 외부 프로세스 보호 +# ───────────────────────────────────────────────────────────────── + +class TestC4_ExternalProcessProtection: + def test_outsider_not_killed(self): + # 사용자가 직접 띄운 척하는 외부 프로세스 (orchestrator 가 spawn 안 함). 
+ outsider = subprocess.Popen([_py(), "-c", "import time; time.sleep(10)"]) + try: + # _kill_tracked 에 외부 PID 의 (잘못된) signature 넘기면 무시돼야 함. + # signature 일치 안 하면 _is_same_process False → kill 안 됨. + wrong_sig = [(outsider.pid, 0.0)] # create_time 안 맞음 + cleaned = _kill_tracked(wrong_sig) + assert cleaned == 0 + assert _alive(outsider.pid), "outsider killed despite wrong create_time" + finally: + outsider.kill() + outsider.wait(timeout=5) + + +# ───────────────────────────────────────────────────────────────── +# C5: orchestrator 자살 방지 +# ───────────────────────────────────────────────────────────────── + +class TestC5_SelfKillProtection: + def test_kill_process_tree_self_pid_noop(self): + """orchestrator(=pytest) PID 로 _kill_process_tree 호출해도 죽으면 안 됨.""" + result = _kill_process_tree(os.getpid()) + assert result == 0 # ORCH_PID 검사로 즉시 0 반환 + + def test_kill_tracked_with_orch_pid_noop(self): + # 일부러 self signature 를 tracked 에 넣어도 _is_same_process False → skip. + self_p = psutil.Process(os.getpid()) + self_sig = _proc_signature(self_p) + cleaned = _kill_tracked([self_sig]) + assert cleaned == 0 # 자기 자신 보호 + + +# ───────────────────────────────────────────────────────────────── +# C6 (CORE): parent 정상 종료 후 grandchild orphan 정리 +# — PID 2780 regression test +# ───────────────────────────────────────────────────────────────── + +class TestC6_OrphanGrandchildAfterNormalExit: + """PID 2780 path: parent 가 정상 exit 했는데 grandchild 만 살아남는 케이스. + monitor thread 가 parent 살아있을 때 grandchild 를 미리 추적해서 finally 에서 정리해야 함.""" + + def test_grandchild_killed_after_parent_normal_exit(self): + marker = ROOT / ".orchestrator" / "tmp" / "test_c6_gc_pid.txt" + marker.parent.mkdir(parents=True, exist_ok=True) + if marker.exists(): marker.unlink() + + # parent 가: + # 1. grandchild 띄움 (DEVNULL 격리 — production claude.exe→python.exe - 과 동등). + # 2. PID 마커에 기록. + # 3. monitor 가 1초 polling 으로 catch 할 시간 확보 (2.5초 sleep). + # 4. 정상 종료. 
+ spawn_code = ( + f"import subprocess, time, sys, os; " + f"gc = subprocess.Popen(" + f" [sys.executable, '-c', 'import time; time.sleep(60)'], " + f" stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL); " + f"open(r'{marker}', 'w').write(str(gc.pid)); " + f"time.sleep(2.5); " + f"sys.exit(0)" + ) + + # 정상 종료 (timeout 안 걸림) — wrapper 의 finally cleanup 만으로 grandchild 잡혀야 함. + r = _run_with_tree_kill( + [_py(), "-c", spawn_code], + timeout=15, + ) + assert r.returncode == 0, "parent did not exit normally" + + # marker 에서 grandchild PID. + assert marker.exists(), "grandchild marker missing" + gc_pid = int(marker.read_text().strip()) + + # 정리 비동기 가능성 → 짧게 대기 후 확인. + deadline = time.time() + 5 + while time.time() < deadline and _alive(gc_pid): + time.sleep(0.2) + assert not _alive(gc_pid), ( + f"REGRESSION: grandchild PID {gc_pid} survived parent normal exit " + f"(PID 2780 path not fixed)" + ) + + +# ───────────────────────────────────────────────────────────────── +# Bonus: _SPAWNED discipline — 다중 호출 후 누적 안 됨 +# ───────────────────────────────────────────────────────────────── + +class TestSpawnedDiscipline: + def test_spawned_drained_between_calls(self): + for _ in range(3): + _run_with_tree_kill([_py(), "-c", "pass"], timeout=10) + # 3 회 호출 후에도 우리 잔재 없음 (wrapper finally 가 discard). + assert len(_SPAWNED) == 0