feat(orchestrator): initial orchestrator + subprocess cleanup hardening

Pre-existing P0+P1 fixes (verified via #45 pilot 2026-05-18):
- P0-1: detect_agent first-line only (fixes #45 infinite loop)
- P0-2: stage_start_count sanity reset on external comment delete
- P0-3: 32 pytest cases for parse/detect regressions
- P1-4: execution-issue mode prompt (compact scope-tight)
- P1-5: Stage 2 COMPACT_PLAN_RULE (size budget, no code snippets)
- P1-6: tests:[] orchestrator-level enforcement at Stage 2 YES guard
- P1-7: dual-write CRLF/trailing-whitespace normalize

P3 subprocess cleanup (PID 2780 orphan grandchild regression):
- (pid, create_time) signature tracking — Windows PID reuse safe
- _kill_process_tree: parent-alive traversal path
- _kill_tracked: parent-dead orphan path
- _run_with_tree_kill: 1s monitor thread captures descendants live
- atexit + SIGINT safety net via _SPAWNED set
- 4 subprocess.run sites switched to wrapper (compaction/exit_report/
  run_claude/run_codex)
- 12 cleanup pytest cases incl. C6 PID 2780 regression test

Selenium boundary unchanged — driver.quit() in phase_z2_pipeline.py
and slide_measurer.py already protected by try/finally.

Total: 44/44 pytest pass (32 core + 12 cleanup).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-18 18:56:06 +09:00
parent e9b3d2e9c0
commit f3bff898fb
4 changed files with 1991 additions and 0 deletions

1480
orchestrator.py Normal file

File diff suppressed because it is too large Load Diff

View File

View File

@@ -0,0 +1,247 @@
"""P0-3 (2026-05-18) — orchestrator self-test minimum set.
Covers detect_agent (the bug that caused #45 infinite loop), parse_consensus,
parse_remaining_units, IMPLEMENTATION_UNITS parsing, dual-write normalize.
Run: pytest -q tests/orchestrator_unit/
"""
import sys
from pathlib import Path
# Add design_agent root to sys.path so we can import orchestrator.py
ROOT = Path(__file__).parent.parent.parent
sys.path.insert(0, str(ROOT))
from orchestrator import (
detect_agent,
parse_consensus,
parse_remaining_units,
_is_execution_issue,
)
import re
class TestExecutionIssueDetection:
"""P1-4 — execution sub-issue title detection."""
def test_execution_korean_pattern(self):
assert _is_execution_issue("[IMP-15 실행-1] image_aspect_mismatch") is True
assert _is_execution_issue("[IMP-15 실행-2] table overflow") is True
assert _is_execution_issue("[IMP-15 실행 3] something") is True
def test_execution_english_pattern(self):
assert _is_execution_issue("[IMP-15 exec-1] image") is True
assert _is_execution_issue("[IMP-15 EXEC 2] table") is True
def test_non_execution_title(self):
assert _is_execution_issue("IMP-15 Step 14 visual_check 보강") is False
assert _is_execution_issue("IMP-09 B-4 다른 layout zone-geometry") is False
def test_empty_title(self):
assert _is_execution_issue("") is False
assert _is_execution_issue(None) is False
# ─────────────────────────────────────────────────────────────────
# detect_agent — the bug that caused #45 infinite loop
# ─────────────────────────────────────────────────────────────────
class TestDetectAgent:
def test_claude_header(self):
assert detect_agent("[Claude #1] Stage 1 ...") == "claude"
def test_codex_header(self):
assert detect_agent("[Codex #1] Stage 1 review") == "codex"
def test_codex_body_with_claude_citation(self):
"""The exact bug from #45 — Codex body contains [Claude #N] citation in
EVIDENCE section. Old detect_agent returned 'claude' (wrong)."""
body = """[Codex #2] Stage 2 Round #1 simulation-plan verification
Verdict: NO.
=== EVIDENCE ===
- Read current-stage Gitea comment `[Claude #2] Stage 2 Round #1 - Plan` only
"""
assert detect_agent(body) == "codex", \
"Codex body containing [Claude #N] citation must still detect as codex"
def test_claude_body_with_codex_citation(self):
body = """[Claude #3] Stage 2 Round #2 - Plan
Addressing [Codex #2] findings ...
"""
assert detect_agent(body) == "claude"
def test_empty_body(self):
assert detect_agent("") is None
assert detect_agent(None) is None
assert detect_agent(" \n ") is None
def test_no_agent_header(self):
assert detect_agent("This is some random text without any agent marker") is None
def test_leading_whitespace_before_header(self):
body = " \n[Codex #1] header after whitespace"
assert detect_agent(body) == "codex"
def test_header_must_be_at_start(self):
"""Body that doesn't start with [Agent header should return None."""
body = "Some intro text.\n[Codex #1] header on second line"
# P0-1 fix: only first non-empty line is checked.
# First line = "Some intro text." → no match → None
assert detect_agent(body) is None
def test_header_with_hash_immediately(self):
"""[Codex#1] (no space) should still match per regex \\[Codex[\\s#]."""
assert detect_agent("[Codex#1] hello") == "codex"
assert detect_agent("[Claude#5] hi") == "claude"
# ─────────────────────────────────────────────────────────────────
# parse_consensus — YES/NO + rewind_target
# ─────────────────────────────────────────────────────────────────
class TestParseConsensus:
def test_yes_only(self):
body = "Some text.\nFINAL_CONSENSUS: YES"
assert parse_consensus(body) == ("YES", None)
def test_no_with_rewind_target(self):
body = "Some text.\nrewind_target: stage_2_plan\nFINAL_CONSENSUS: NO"
assert parse_consensus(body) == ("NO", "stage_2_plan")
def test_no_with_continue_same(self):
body = "blah\nrewind_target: continue_same\nFINAL_CONSENSUS: NO"
assert parse_consensus(body) == ("NO", "continue_same")
def test_no_target_only_in_last_10_lines(self):
"""parse_consensus only scans last 10 lines."""
body = "rewind_target: stage_1_review\n" + "\n".join(["filler"] * 20) + "\nFINAL_CONSENSUS: NO"
status, target = parse_consensus(body)
assert status == "NO"
assert target is None # too far from end to be picked up
def test_no_consensus_marker(self):
assert parse_consensus("just text, no marker") == (None, None)
def test_empty_body(self):
assert parse_consensus("") == (None, None)
assert parse_consensus(None) == (None, None)
def test_unknown_rewind_target_ignored(self):
body = "rewind_target: bogus_target\nFINAL_CONSENSUS: NO"
status, target = parse_consensus(body)
assert status == "NO"
assert target is None # bogus is not in REWIND_TARGET_TO_SID
# ─────────────────────────────────────────────────────────────────
# parse_remaining_units — Stage 3 continue_same progress detection
# ─────────────────────────────────────────────────────────────────
class TestParseRemainingUnits:
def test_bracketed_list(self):
body = "Remaining units: [u2, u3, u4]"
assert parse_remaining_units(body) == {"u2", "u3", "u4"}
def test_comma_list_no_brackets(self):
body = "Remaining units: u5, u6, u7"
assert parse_remaining_units(body) == {"u5", "u6", "u7"}
def test_none_explicit(self):
assert parse_remaining_units("Remaining units: none") == set()
assert parse_remaining_units("Remaining units: []") == set()
assert parse_remaining_units("Remaining units: (none)") == set()
assert parse_remaining_units("Remaining units: -") == set()
def test_line_not_present(self):
assert parse_remaining_units("no remaining units mentioned here") is None
def test_case_insensitive(self):
body = "REMAINING UNITS: [U1, U2]"
assert parse_remaining_units(body) == {"u1", "u2"}
def test_only_u_prefixed_digits(self):
"""Sentence noise ignored — only u\\d+ pattern matched."""
body = "Remaining units: I still need to do u3 and u7 work"
assert parse_remaining_units(body) == {"u3", "u7"}
def test_empty_body(self):
assert parse_remaining_units("") is None
assert parse_remaining_units(None) is None
# ─────────────────────────────────────────────────────────────────
# IMPLEMENTATION_UNITS block parsing (used in Stage 2 YES guard)
# ─────────────────────────────────────────────────────────────────
class TestImplementationUnitsBlock:
"""Reproduces the parser in run_stage Stage 2 YES guard (line ~810)."""
def _parse(self, body):
iu_block_pat = re.compile(
r"===\s*IMPLEMENTATION_UNITS\s*===\s*\n(.*?)(?=\n===\s|\Z)",
re.IGNORECASE | re.DOTALL,
)
iu_unit_pat = re.compile(r"^\s*-\s*id:\s*u\d+", re.IGNORECASE | re.MULTILINE)
m = iu_block_pat.search(body or "")
return bool(m and iu_unit_pat.search(m.group(1)))
def test_valid_block(self):
body = """text
=== IMPLEMENTATION_UNITS ===
- id: u1
summary: ...
- id: u2
summary: ...
"""
assert self._parse(body) is True
def test_empty_block(self):
body = "=== IMPLEMENTATION_UNITS ===\n(no entries)\n"
assert self._parse(body) is False # header but no - id: uN entry
def test_block_missing(self):
body = "just text, no implementation_units"
assert self._parse(body) is False
def test_block_with_only_non_u_entries(self):
body = """=== IMPLEMENTATION_UNITS ===
- id: alpha
summary: ...
"""
assert self._parse(body) is False # 'alpha' is not 'u\\d+'
# ─────────────────────────────────────────────────────────────────
# Direct integration check — the #45 bug case
# ─────────────────────────────────────────────────────────────────
class TestRegressionForIssue45Bug:
"""Verify the exact body shape that caused #45 infinite loop is now handled."""
def test_codex_no_with_claude_citation_full_flow(self):
body = """[Codex #3] Stage 2 Round #2 simulation-plan verification for issue #45
Verdict: NO. The plan covers main axes but violates two Stage 2 requirements.
Findings:
- Unit u1 declares tests: [] in === IMPLEMENTATION_UNITS ===
- xfail-strict mechanism unclear
=== EVIDENCE ===
Commands run:
- git rev-parse HEAD
- Read current-stage Gitea comment `[Claude #3] Stage 2 Round #2 - Plan`
rewind_target: stage_2_plan
FINAL_CONSENSUS: NO
"""
# P0-1 fix: detect_agent reads only first line → "[Codex #3]" → codex
assert detect_agent(body) == "codex", "P0-1 regression test"
# parse_consensus: NO + rewind_target stage_2_plan
status, target = parse_consensus(body)
assert status == "NO"
assert target == "stage_2_plan"

View File

@@ -0,0 +1,264 @@
"""P3-5 (2026-05-18) — subprocess cleanup hardening verification.
Covers:
C1: 정상 종료 → tree 잔류 0
C2: timeout → TimeoutExpired raise + 자손 0
C3: grandchild spawn 후 parent timeout → grandchild 정리
C4: 외부 (orchestrator 가 spawn 안한) 프로세스 보호
C5: _kill_process_tree(self.pid) 호출해도 orchestrator 자살 안 함
C6 (CORE): parent 정상 종료 후 grandchild orphan 정리 — PID 2780 regression
Run: pytest -q tests/orchestrator_unit/test_subprocess_cleanup.py
"""
import os
import sys
import time
import subprocess
from pathlib import Path
import psutil
import pytest
ROOT = Path(__file__).parent.parent.parent
sys.path.insert(0, str(ROOT))
from orchestrator import (
_kill_process_tree,
_kill_tracked,
_run_with_tree_kill,
_proc_signature,
_is_same_process,
_SPAWNED,
)
# ─────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────
def _py():
"""Path to current Python interpreter — used to spawn dummy subprocesses."""
return sys.executable
def _alive(pid):
try:
return psutil.Process(pid).is_running() and psutil.Process(pid).status() != psutil.STATUS_ZOMBIE
except psutil.NoSuchProcess:
return False
# ─────────────────────────────────────────────────────────────────
# Signature helpers
# ─────────────────────────────────────────────────────────────────
class TestSignatureHelpers:
def test_proc_signature_alive(self):
p = psutil.Process(os.getpid())
sig = _proc_signature(p)
assert sig is not None
assert sig[0] == os.getpid()
assert isinstance(sig[1], float)
def test_is_same_process_orch_self_blocked(self):
"""C5 prep — orchestrator 자기 자신은 절대 same-process true 안 됨."""
p = psutil.Process(os.getpid())
sig = _proc_signature(p)
# _is_same_process 가 _ORCH_PID 체크로 False 반환해야 함.
assert _is_same_process(sig[0], sig[1]) is False
def test_is_same_process_dead_pid(self):
# 사용 가능성 낮은 PID 999999 — 거의 확실히 죽음.
assert _is_same_process(999999, time.time()) is False
def test_is_same_process_wrong_create_time(self):
"""PID 재사용 회피 검증 — 같은 PID 라도 create_time 안 맞으면 False."""
# 살아있는 외부 프로세스 빌려서 일부러 어긋난 create_time 으로 호출.
# System Idle 같은 특수 프로세스 (create_time=0) 회피 — 우리가 띄운 dummy 사용.
dummy = subprocess.Popen(
[_py(), "-c", "import time; time.sleep(5)"],
stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
try:
# 실제 create_time 보다 1 년 전 시각 → 명백한 mismatch.
far_past = time.time() - 365 * 24 * 3600
assert _is_same_process(dummy.pid, far_past) is False
# 맞는 create_time 으로는 True 여야 함 (sanity).
real_ct = psutil.Process(dummy.pid).create_time()
assert _is_same_process(dummy.pid, real_ct) is True
finally:
dummy.kill()
dummy.wait(timeout=5)
# ─────────────────────────────────────────────────────────────────
# C1: 정상 종료 — tree 잔류 0
# ─────────────────────────────────────────────────────────────────
class TestC1_NormalExit:
def test_dummy_short_run_no_residue(self):
r = _run_with_tree_kill(
[_py(), "-c", "import time; time.sleep(0.3)"],
timeout=10,
)
assert r.returncode == 0
# 호출 후 _SPAWNED 에 우리 호출 잔재가 남으면 안 됨 (wrapper 가 discard).
# 다른 테스트 영향 가능성 있어서 set 전체가 0 이 아니어도 됨, 단 우리 잔재 없으면 OK.
# 보수적으로 — 우리 호출 직전에 _SPAWNED 가 비어있었으면 직후에도 비어있어야 함.
assert len(_SPAWNED) == 0
# ─────────────────────────────────────────────────────────────────
# C2: Timeout — TimeoutExpired raise + 자손 정리
# ─────────────────────────────────────────────────────────────────
class TestC2_Timeout:
def test_dummy_long_sleep_times_out(self):
with pytest.raises(subprocess.TimeoutExpired):
_run_with_tree_kill(
[_py(), "-c", "import time; time.sleep(60)"],
timeout=1.5,
)
# raise 후에도 _SPAWNED 우리 잔재 없어야 함 (wrapper finally 가 discard).
assert len(_SPAWNED) == 0
# ─────────────────────────────────────────────────────────────────
# C3: grandchild orphan 정리 — parent timeout path
# ─────────────────────────────────────────────────────────────────
class TestC3_GrandchildTimeoutPath:
def test_grandchild_killed_on_parent_timeout(self):
# parent 가 grandchild 띄우고 자기는 sleep — timeout 으로 강제 종료.
# grandchild 도 정리돼야 함.
# PID 캡처를 위해 grandchild 가 자기 PID 를 파일에 기록.
marker = ROOT / ".orchestrator" / "tmp" / "test_c3_gc_pid.txt"
marker.parent.mkdir(parents=True, exist_ok=True)
if marker.exists(): marker.unlink()
# grandchild 의 stdin/stdout/stderr 를 DEVNULL 로 분리 — production 의 claude.exe→python.exe -
# 케이스와 동일 (grandchild 가 wrapper 의 pipe 핸들 안 상속). 안 그러면 pipe inheritance 로
# communicate() 가 hang.
spawn_code = (
f"import subprocess, time, sys, os; "
f"gc = subprocess.Popen("
f" [sys.executable, '-c', 'import time; time.sleep(60)'], "
f" stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL); "
f"open(r'{marker}', 'w').write(str(gc.pid)); "
f"time.sleep(60)"
)
with pytest.raises(subprocess.TimeoutExpired):
_run_with_tree_kill(
[_py(), "-c", spawn_code],
timeout=3,
)
# marker 파일에서 grandchild PID 읽기.
assert marker.exists(), "grandchild marker not written — parent died too early"
gc_pid = int(marker.read_text().strip())
# 잠시 대기 (cleanup 비동기 가능성) 후 grandchild 죽었는지 확인.
deadline = time.time() + 5
while time.time() < deadline and _alive(gc_pid):
time.sleep(0.2)
assert not _alive(gc_pid), f"grandchild PID {gc_pid} still alive after parent timeout"
# ─────────────────────────────────────────────────────────────────
# C4: 외부 프로세스 보호
# ─────────────────────────────────────────────────────────────────
class TestC4_ExternalProcessProtection:
def test_outsider_not_killed(self):
# 사용자가 직접 띄운 척하는 외부 프로세스 (orchestrator 가 spawn 안 함).
outsider = subprocess.Popen([_py(), "-c", "import time; time.sleep(10)"])
try:
# _kill_tracked 에 외부 PID 의 (잘못된) signature 넘기면 무시돼야 함.
# signature 일치 안 하면 _is_same_process False → kill 안 됨.
wrong_sig = [(outsider.pid, 0.0)] # create_time 안 맞음
cleaned = _kill_tracked(wrong_sig)
assert cleaned == 0
assert _alive(outsider.pid), "outsider killed despite wrong create_time"
finally:
outsider.kill()
outsider.wait(timeout=5)
# ─────────────────────────────────────────────────────────────────
# C5: orchestrator 자살 방지
# ─────────────────────────────────────────────────────────────────
class TestC5_SelfKillProtection:
def test_kill_process_tree_self_pid_noop(self):
"""orchestrator(=pytest) PID 로 _kill_process_tree 호출해도 죽으면 안 됨."""
result = _kill_process_tree(os.getpid())
assert result == 0 # ORCH_PID 검사로 즉시 0 반환
def test_kill_tracked_with_orch_pid_noop(self):
# 일부러 self signature 를 tracked 에 넣어도 _is_same_process False → skip.
self_p = psutil.Process(os.getpid())
self_sig = _proc_signature(self_p)
cleaned = _kill_tracked([self_sig])
assert cleaned == 0 # 자기 자신 보호
# ─────────────────────────────────────────────────────────────────
# C6 (CORE): parent 정상 종료 후 grandchild orphan 정리
# — PID 2780 regression test
# ─────────────────────────────────────────────────────────────────
class TestC6_OrphanGrandchildAfterNormalExit:
"""PID 2780 path: parent 가 정상 exit 했는데 grandchild 만 살아남는 케이스.
monitor thread 가 parent 살아있을 때 grandchild 를 미리 추적해서 finally 에서 정리해야 함."""
def test_grandchild_killed_after_parent_normal_exit(self):
marker = ROOT / ".orchestrator" / "tmp" / "test_c6_gc_pid.txt"
marker.parent.mkdir(parents=True, exist_ok=True)
if marker.exists(): marker.unlink()
# parent 가:
# 1. grandchild 띄움 (DEVNULL 격리 — production claude.exe→python.exe - 과 동등).
# 2. PID 마커에 기록.
# 3. monitor 가 1초 polling 으로 catch 할 시간 확보 (2.5초 sleep).
# 4. 정상 종료.
spawn_code = (
f"import subprocess, time, sys, os; "
f"gc = subprocess.Popen("
f" [sys.executable, '-c', 'import time; time.sleep(60)'], "
f" stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL); "
f"open(r'{marker}', 'w').write(str(gc.pid)); "
f"time.sleep(2.5); "
f"sys.exit(0)"
)
# 정상 종료 (timeout 안 걸림) — wrapper 의 finally cleanup 만으로 grandchild 잡혀야 함.
r = _run_with_tree_kill(
[_py(), "-c", spawn_code],
timeout=15,
)
assert r.returncode == 0, "parent did not exit normally"
# marker 에서 grandchild PID.
assert marker.exists(), "grandchild marker missing"
gc_pid = int(marker.read_text().strip())
# 정리 비동기 가능성 → 짧게 대기 후 확인.
deadline = time.time() + 5
while time.time() < deadline and _alive(gc_pid):
time.sleep(0.2)
assert not _alive(gc_pid), (
f"REGRESSION: grandchild PID {gc_pid} survived parent normal exit "
f"(PID 2780 path not fixed)"
)
# ─────────────────────────────────────────────────────────────────
# Bonus: _SPAWNED discipline — 다중 호출 후 누적 안 됨
# ─────────────────────────────────────────────────────────────────
class TestSpawnedDiscipline:
def test_spawned_drained_between_calls(self):
for _ in range(3):
_run_with_tree_kill([_py(), "-c", "pass"], timeout=10)
# 3 회 호출 후에도 우리 잔재 없음 (wrapper finally 가 discard).
assert len(_SPAWNED) == 0