Compare commits

...

2 Commits

Author SHA1 Message Date
2827622858 feat(IMP-16): Step 14 table_self_overflow detection
Add table self-overflow detection with element-identity wrapper dedup,
mirroring the image_aspect_mismatch axis pattern (#45).

JS layer: TABLE_SCROLL_TOL_PX=5 module constant; clippedWrapperMap
built as Map<Element,int> keyed by DOM node reference (NOT className)
so two wrappers with identical class strings remain distinguishable;
table_events collected via querySelectorAll('table').forEach with
closest()-ancestor walk resolving wrapper_clipped_index = int|null.

Py layer: aggregate result['table_events'] and append fail_reason
'table_self_overflow' only when (excess_x>TOL OR excess_y>TOL)
AND wrapper_clipped_index is None; wrapper-clipped path continues
to fail via existing clipped_inner reporting.

Tests (Selenium, chromedriver guard mirrored from image_check):
- Fixture D: standalone <table> overflow → table_self_overflow fail
- Fixture E: <table> in clipped wrapper → dedup suppresses table fail
- Fixture F (F1 acceptance): two wrappers with identical className
  f13b-cell, W1 clipped by non-table child, W2 hosts self-overflow
  <table> with W2 itself NOT clipped → element-identity ensures W2's
  table is not suppressed by W1's class; both fails emitted.

Out of scope: image_events behavior (intact from #45), classifier
pass/fail consumer (→실행-3), debug.json surfacing (→실행-4).

Refs: #46

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 21:06:01 +09:00
f3bff898fb feat(orchestrator): initial orchestrator + subprocess cleanup hardening
Pre-existing P0+P1 fixes (verified via #45 pilot 2026-05-18):
- P0-1: detect_agent first-line only (fixes #45 infinite loop)
- P0-2: stage_start_count sanity reset on external comment delete
- P0-3: 32 pytest cases for parse/detect regressions
- P1-4: execution-issue mode prompt (compact scope-tight)
- P1-5: Stage 2 COMPACT_PLAN_RULE (size budget, no code snippets)
- P1-6: tests:[] orchestrator-level enforcement at Stage 2 YES guard
- P1-7: dual-write CRLF/trailing-whitespace normalize

P3 subprocess cleanup (PID 2780 orphan grandchild regression):
- (pid, create_time) signature tracking — Windows PID reuse safe
- _kill_process_tree: parent-alive traversal path
- _kill_tracked: parent-dead orphan path
- _run_with_tree_kill: 1s monitor thread captures descendants live
- atexit + SIGINT safety net via _SPAWNED set
- 4 subprocess.run sites switched to wrapper (compaction/exit_report/
  run_claude/run_codex)
- 12 cleanup pytest cases incl. C6 PID 2780 regression test

Selenium boundary unchanged — driver.quit() in phase_z2_pipeline.py
and slide_measurer.py already protected by try/finally.

Total: 44/44 pytest pass (32 core + 12 cleanup).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 18:56:06 +09:00
6 changed files with 2412 additions and 1 deletions

1480
orchestrator.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -133,6 +133,11 @@ DEFAULT_ZONE_MIN_HEIGHT_PX = 100
# Spec doc row (PHASE-Z-FIT-CLASSIFIER-ROUTER-SPEC) update deferred to IMP-15 실행-4.
IMAGE_ASPECT_DELTA_TOL = 0.05
# Step 14 table_self_overflow tolerance — scrollWclientW or scrollHclientH > TOL ⇒ fail.
# Local anchor : IMP-15 실행-2 (Gitea issue #46) — table axis acceptance criteria.
# Mirrors existing inline 5px tolerance used by slide/zone/clipped scans in run_overflow_check.
TABLE_SCROLL_TOL_PX = 5
# content_weight 계산 가중치
CONTENT_WEIGHT_COEFFS = {
"text_per_chars": 800, # text_len / 800 = score
@@ -2131,6 +2136,14 @@ def run_overflow_check(html_path: Path) -> dict:
const zones = [];
const zone_geometries_px = [];
// IMP-15 실행-2 (issue #46) — element-identity dedup map for table_events.
// Map<Element, integer> keyed by DOM node reference (NOT class string) so that
// two wrappers sharing identical className resolve to distinct map entries.
// Populated alongside the existing per-zone clipped_inner scan below.
const clippedWrapperMap = new Map();
let clippedIdxCounter = 0;
slide.querySelectorAll('.zone').forEach((z) => {
const pos = z.getAttribute('data-zone-position') || 'unknown';
const tid = z.getAttribute('data-template-id') || '?';
@@ -2178,6 +2191,13 @@ def run_overflow_check(html_path: Path) -> dict:
scrollWidth: el.scrollWidth,
scrollHeight: el.scrollHeight,
});
// IMP-15 실행-2 (issue #46) — element-identity registration.
// Key by DOM node `el`, NOT className: two wrappers with identical
// class string still hash to distinct Map entries.
if (!clippedWrapperMap.has(el)) {
clippedWrapperMap.set(el, clippedIdxCounter);
clippedIdxCounter++;
}
}
});
m.clipped_inner = clipped;
@@ -2259,7 +2279,52 @@ def run_overflow_check(html_path: Path) -> dict:
});
});
return { slide: slideM, slide_body: bodyM, zones, frame_slot_metrics, zone_geometries_px, image_events };
// IMP-15 실행-2 (issue #46) — table_events[] for table_self_overflow detection.
// One entry per <table> under .slide. wrapper_clipped_index is the integer index
// (from clippedWrapperMap) of the nearest ancestor that is itself in the clipped
// wrapper set, or null. Element-identity walk (NOT className) so that two same-class
// wrappers (W1 clipped, W2 not) resolve independently for any contained <table>.
const table_events = [];
slide.querySelectorAll('table').forEach((tbl) => {
const parentZone = tbl.closest('.zone');
const zonePos = parentZone
? (parentZone.getAttribute('data-zone-position') || 'unknown')
: 'unknown';
const zoneTid = parentZone
? (parentZone.getAttribute('data-template-id') || '?')
: '?';
let wrapper_clipped_index = null;
let node = tbl.parentElement;
while (node && node !== slide) {
if (clippedWrapperMap.has(node)) {
wrapper_clipped_index = clippedWrapperMap.get(node);
break;
}
node = node.parentElement;
}
const tblRect = tbl.getBoundingClientRect();
const dx = tbl.scrollWidth - tbl.clientWidth;
const dy = tbl.scrollHeight - tbl.clientHeight;
table_events.push({
zone_position: zonePos,
zone_template_id: zoneTid,
clientWidth: tbl.clientWidth,
clientHeight: tbl.clientHeight,
scrollWidth: tbl.scrollWidth,
scrollHeight: tbl.scrollHeight,
excess_x: Math.max(0, dx),
excess_y: Math.max(0, dy),
wrapper_clipped_index: wrapper_clipped_index,
bbox: {
x: Math.round(tblRect.left - slideRect.left),
y: Math.round(tblRect.top - slideRect.top),
w: Math.round(tblRect.width),
h: Math.round(tblRect.height),
},
});
});
return { slide: slideM, slide_body: bodyM, zones, frame_slot_metrics, zone_geometries_px, image_events, table_events };
""")
screenshot_path = html_path.parent / "preview.png"
@@ -2318,6 +2383,27 @@ def run_overflow_check(html_path: Path) -> dict:
f"(template={tid}, tol={IMAGE_ASPECT_DELTA_TOL}, src={src})"
)
# IMP-15 실행-2 (issue #46) — table_self_overflow aggregation.
# Emit fail_reason only when (excess_x>TOL OR excess_y>TOL) AND wrapper_clipped_index is None.
# The clipped-wrapper case is already accounted for by the clipped_inner fail_reason above;
# element-identity dedup (clippedWrapperMap keyed by DOM node ref, NOT className) prevents
# double-counting and—critically—prevents two same-class wrappers from masking each other.
for ev in result.get("table_events", []):
if ev.get("wrapper_clipped_index") is not None:
continue
excess_x = ev.get("excess_x", 0) or 0
excess_y = ev.get("excess_y", 0) or 0
if excess_x > TABLE_SCROLL_TOL_PX or excess_y > TABLE_SCROLL_TOL_PX:
pos = ev.get("zone_position", "unknown")
tid = ev.get("zone_template_id", "?")
fail_reasons.append(
f"table self-overflow in zone--{pos}: "
f"excess {excess_y}px vert / {excess_x}px horiz "
f"(content {ev.get('scrollWidth')}x{ev.get('scrollHeight')} vs "
f"container {ev.get('clientWidth')}x{ev.get('clientHeight')}, "
f"template={tid}, tol={TABLE_SCROLL_TOL_PX})"
)
result["passed"] = len(fail_reasons) == 0
result["fail_reasons"] = fail_reasons
return result

View File

View File

@@ -0,0 +1,247 @@
"""P0-3 (2026-05-18) — orchestrator self-test minimum set.
Covers detect_agent (the bug that caused #45 infinite loop), parse_consensus,
parse_remaining_units, IMPLEMENTATION_UNITS parsing, dual-write normalize.
Run: pytest -q tests/orchestrator_unit/
"""
import sys
from pathlib import Path
# Add design_agent root to sys.path so we can import orchestrator.py
ROOT = Path(__file__).parent.parent.parent
sys.path.insert(0, str(ROOT))
from orchestrator import (
detect_agent,
parse_consensus,
parse_remaining_units,
_is_execution_issue,
)
import re
class TestExecutionIssueDetection:
"""P1-4 — execution sub-issue title detection."""
def test_execution_korean_pattern(self):
assert _is_execution_issue("[IMP-15 실행-1] image_aspect_mismatch") is True
assert _is_execution_issue("[IMP-15 실행-2] table overflow") is True
assert _is_execution_issue("[IMP-15 실행 3] something") is True
def test_execution_english_pattern(self):
assert _is_execution_issue("[IMP-15 exec-1] image") is True
assert _is_execution_issue("[IMP-15 EXEC 2] table") is True
def test_non_execution_title(self):
assert _is_execution_issue("IMP-15 Step 14 visual_check 보강") is False
assert _is_execution_issue("IMP-09 B-4 다른 layout zone-geometry") is False
def test_empty_title(self):
assert _is_execution_issue("") is False
assert _is_execution_issue(None) is False
# ─────────────────────────────────────────────────────────────────
# detect_agent — the bug that caused #45 infinite loop
# ─────────────────────────────────────────────────────────────────
class TestDetectAgent:
def test_claude_header(self):
assert detect_agent("[Claude #1] Stage 1 ...") == "claude"
def test_codex_header(self):
assert detect_agent("[Codex #1] Stage 1 review") == "codex"
def test_codex_body_with_claude_citation(self):
"""The exact bug from #45 — Codex body contains [Claude #N] citation in
EVIDENCE section. Old detect_agent returned 'claude' (wrong)."""
body = """[Codex #2] Stage 2 Round #1 simulation-plan verification
Verdict: NO.
=== EVIDENCE ===
- Read current-stage Gitea comment `[Claude #2] Stage 2 Round #1 - Plan` only
"""
assert detect_agent(body) == "codex", \
"Codex body containing [Claude #N] citation must still detect as codex"
def test_claude_body_with_codex_citation(self):
body = """[Claude #3] Stage 2 Round #2 - Plan
Addressing [Codex #2] findings ...
"""
assert detect_agent(body) == "claude"
def test_empty_body(self):
assert detect_agent("") is None
assert detect_agent(None) is None
assert detect_agent(" \n ") is None
def test_no_agent_header(self):
assert detect_agent("This is some random text without any agent marker") is None
def test_leading_whitespace_before_header(self):
body = " \n[Codex #1] header after whitespace"
assert detect_agent(body) == "codex"
def test_header_must_be_at_start(self):
"""Body that doesn't start with [Agent header should return None."""
body = "Some intro text.\n[Codex #1] header on second line"
# P0-1 fix: only first non-empty line is checked.
# First line = "Some intro text." → no match → None
assert detect_agent(body) is None
def test_header_with_hash_immediately(self):
"""[Codex#1] (no space) should still match per regex \\[Codex[\\s#]."""
assert detect_agent("[Codex#1] hello") == "codex"
assert detect_agent("[Claude#5] hi") == "claude"
# ─────────────────────────────────────────────────────────────────
# parse_consensus — YES/NO + rewind_target
# ─────────────────────────────────────────────────────────────────
class TestParseConsensus:
def test_yes_only(self):
body = "Some text.\nFINAL_CONSENSUS: YES"
assert parse_consensus(body) == ("YES", None)
def test_no_with_rewind_target(self):
body = "Some text.\nrewind_target: stage_2_plan\nFINAL_CONSENSUS: NO"
assert parse_consensus(body) == ("NO", "stage_2_plan")
def test_no_with_continue_same(self):
body = "blah\nrewind_target: continue_same\nFINAL_CONSENSUS: NO"
assert parse_consensus(body) == ("NO", "continue_same")
def test_no_target_only_in_last_10_lines(self):
"""parse_consensus only scans last 10 lines."""
body = "rewind_target: stage_1_review\n" + "\n".join(["filler"] * 20) + "\nFINAL_CONSENSUS: NO"
status, target = parse_consensus(body)
assert status == "NO"
assert target is None # too far from end to be picked up
def test_no_consensus_marker(self):
assert parse_consensus("just text, no marker") == (None, None)
def test_empty_body(self):
assert parse_consensus("") == (None, None)
assert parse_consensus(None) == (None, None)
def test_unknown_rewind_target_ignored(self):
body = "rewind_target: bogus_target\nFINAL_CONSENSUS: NO"
status, target = parse_consensus(body)
assert status == "NO"
assert target is None # bogus is not in REWIND_TARGET_TO_SID
# ─────────────────────────────────────────────────────────────────
# parse_remaining_units — Stage 3 continue_same progress detection
# ─────────────────────────────────────────────────────────────────
class TestParseRemainingUnits:
def test_bracketed_list(self):
body = "Remaining units: [u2, u3, u4]"
assert parse_remaining_units(body) == {"u2", "u3", "u4"}
def test_comma_list_no_brackets(self):
body = "Remaining units: u5, u6, u7"
assert parse_remaining_units(body) == {"u5", "u6", "u7"}
def test_none_explicit(self):
assert parse_remaining_units("Remaining units: none") == set()
assert parse_remaining_units("Remaining units: []") == set()
assert parse_remaining_units("Remaining units: (none)") == set()
assert parse_remaining_units("Remaining units: -") == set()
def test_line_not_present(self):
assert parse_remaining_units("no remaining units mentioned here") is None
def test_case_insensitive(self):
body = "REMAINING UNITS: [U1, U2]"
assert parse_remaining_units(body) == {"u1", "u2"}
def test_only_u_prefixed_digits(self):
"""Sentence noise ignored — only u\\d+ pattern matched."""
body = "Remaining units: I still need to do u3 and u7 work"
assert parse_remaining_units(body) == {"u3", "u7"}
def test_empty_body(self):
assert parse_remaining_units("") is None
assert parse_remaining_units(None) is None
# ─────────────────────────────────────────────────────────────────
# IMPLEMENTATION_UNITS block parsing (used in Stage 2 YES guard)
# ─────────────────────────────────────────────────────────────────
class TestImplementationUnitsBlock:
"""Reproduces the parser in run_stage Stage 2 YES guard (line ~810)."""
def _parse(self, body):
iu_block_pat = re.compile(
r"===\s*IMPLEMENTATION_UNITS\s*===\s*\n(.*?)(?=\n===\s|\Z)",
re.IGNORECASE | re.DOTALL,
)
iu_unit_pat = re.compile(r"^\s*-\s*id:\s*u\d+", re.IGNORECASE | re.MULTILINE)
m = iu_block_pat.search(body or "")
return bool(m and iu_unit_pat.search(m.group(1)))
def test_valid_block(self):
body = """text
=== IMPLEMENTATION_UNITS ===
- id: u1
summary: ...
- id: u2
summary: ...
"""
assert self._parse(body) is True
def test_empty_block(self):
body = "=== IMPLEMENTATION_UNITS ===\n(no entries)\n"
assert self._parse(body) is False # header but no - id: uN entry
def test_block_missing(self):
body = "just text, no implementation_units"
assert self._parse(body) is False
def test_block_with_only_non_u_entries(self):
body = """=== IMPLEMENTATION_UNITS ===
- id: alpha
summary: ...
"""
assert self._parse(body) is False # 'alpha' is not 'u\\d+'
# ─────────────────────────────────────────────────────────────────
# Direct integration check — the #45 bug case
# ─────────────────────────────────────────────────────────────────
class TestRegressionForIssue45Bug:
"""Verify the exact body shape that caused #45 infinite loop is now handled."""
def test_codex_no_with_claude_citation_full_flow(self):
body = """[Codex #3] Stage 2 Round #2 simulation-plan verification for issue #45
Verdict: NO. The plan covers main axes but violates two Stage 2 requirements.
Findings:
- Unit u1 declares tests: [] in === IMPLEMENTATION_UNITS ===
- xfail-strict mechanism unclear
=== EVIDENCE ===
Commands run:
- git rev-parse HEAD
- Read current-stage Gitea comment `[Claude #3] Stage 2 Round #2 - Plan`
rewind_target: stage_2_plan
FINAL_CONSENSUS: NO
"""
# P0-1 fix: detect_agent reads only first line → "[Codex #3]" → codex
assert detect_agent(body) == "codex", "P0-1 regression test"
# parse_consensus: NO + rewind_target stage_2_plan
status, target = parse_consensus(body)
assert status == "NO"
assert target == "stage_2_plan"

View File

@@ -0,0 +1,264 @@
"""P3-5 (2026-05-18) — subprocess cleanup hardening verification.
Covers:
C1: 정상 종료 → tree 잔류 0
C2: timeout → TimeoutExpired raise + 자손 0
C3: grandchild spawn 후 parent timeout → grandchild 정리
C4: 외부 (orchestrator 가 spawn 안한) 프로세스 보호
C5: _kill_process_tree(self.pid) 호출해도 orchestrator 자살 안 함
C6 (CORE): parent 정상 종료 후 grandchild orphan 정리 — PID 2780 regression
Run: pytest -q tests/orchestrator_unit/test_subprocess_cleanup.py
"""
import os
import sys
import time
import subprocess
from pathlib import Path
import psutil
import pytest
ROOT = Path(__file__).parent.parent.parent
sys.path.insert(0, str(ROOT))
from orchestrator import (
_kill_process_tree,
_kill_tracked,
_run_with_tree_kill,
_proc_signature,
_is_same_process,
_SPAWNED,
)
# ─────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────
def _py():
"""Path to current Python interpreter — used to spawn dummy subprocesses."""
return sys.executable
def _alive(pid):
try:
return psutil.Process(pid).is_running() and psutil.Process(pid).status() != psutil.STATUS_ZOMBIE
except psutil.NoSuchProcess:
return False
# ─────────────────────────────────────────────────────────────────
# Signature helpers
# ─────────────────────────────────────────────────────────────────
class TestSignatureHelpers:
def test_proc_signature_alive(self):
p = psutil.Process(os.getpid())
sig = _proc_signature(p)
assert sig is not None
assert sig[0] == os.getpid()
assert isinstance(sig[1], float)
def test_is_same_process_orch_self_blocked(self):
"""C5 prep — orchestrator 자기 자신은 절대 same-process true 안 됨."""
p = psutil.Process(os.getpid())
sig = _proc_signature(p)
# _is_same_process 가 _ORCH_PID 체크로 False 반환해야 함.
assert _is_same_process(sig[0], sig[1]) is False
def test_is_same_process_dead_pid(self):
# 사용 가능성 낮은 PID 999999 — 거의 확실히 죽음.
assert _is_same_process(999999, time.time()) is False
def test_is_same_process_wrong_create_time(self):
"""PID 재사용 회피 검증 — 같은 PID 라도 create_time 안 맞으면 False."""
# 살아있는 외부 프로세스 빌려서 일부러 어긋난 create_time 으로 호출.
# System Idle 같은 특수 프로세스 (create_time=0) 회피 — 우리가 띄운 dummy 사용.
dummy = subprocess.Popen(
[_py(), "-c", "import time; time.sleep(5)"],
stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
try:
# 실제 create_time 보다 1 년 전 시각 → 명백한 mismatch.
far_past = time.time() - 365 * 24 * 3600
assert _is_same_process(dummy.pid, far_past) is False
# 맞는 create_time 으로는 True 여야 함 (sanity).
real_ct = psutil.Process(dummy.pid).create_time()
assert _is_same_process(dummy.pid, real_ct) is True
finally:
dummy.kill()
dummy.wait(timeout=5)
# ─────────────────────────────────────────────────────────────────
# C1: 정상 종료 — tree 잔류 0
# ─────────────────────────────────────────────────────────────────
class TestC1_NormalExit:
def test_dummy_short_run_no_residue(self):
r = _run_with_tree_kill(
[_py(), "-c", "import time; time.sleep(0.3)"],
timeout=10,
)
assert r.returncode == 0
# 호출 후 _SPAWNED 에 우리 호출 잔재가 남으면 안 됨 (wrapper 가 discard).
# 다른 테스트 영향 가능성 있어서 set 전체가 0 이 아니어도 됨, 단 우리 잔재 없으면 OK.
# 보수적으로 — 우리 호출 직전에 _SPAWNED 가 비어있었으면 직후에도 비어있어야 함.
assert len(_SPAWNED) == 0
# ─────────────────────────────────────────────────────────────────
# C2: Timeout — TimeoutExpired raise + 자손 정리
# ─────────────────────────────────────────────────────────────────
class TestC2_Timeout:
def test_dummy_long_sleep_times_out(self):
with pytest.raises(subprocess.TimeoutExpired):
_run_with_tree_kill(
[_py(), "-c", "import time; time.sleep(60)"],
timeout=1.5,
)
# raise 후에도 _SPAWNED 우리 잔재 없어야 함 (wrapper finally 가 discard).
assert len(_SPAWNED) == 0
# ─────────────────────────────────────────────────────────────────
# C3: grandchild orphan 정리 — parent timeout path
# ─────────────────────────────────────────────────────────────────
class TestC3_GrandchildTimeoutPath:
def test_grandchild_killed_on_parent_timeout(self):
# parent 가 grandchild 띄우고 자기는 sleep — timeout 으로 강제 종료.
# grandchild 도 정리돼야 함.
# PID 캡처를 위해 grandchild 가 자기 PID 를 파일에 기록.
marker = ROOT / ".orchestrator" / "tmp" / "test_c3_gc_pid.txt"
marker.parent.mkdir(parents=True, exist_ok=True)
if marker.exists(): marker.unlink()
# grandchild 의 stdin/stdout/stderr 를 DEVNULL 로 분리 — production 의 claude.exe→python.exe -
# 케이스와 동일 (grandchild 가 wrapper 의 pipe 핸들 안 상속). 안 그러면 pipe inheritance 로
# communicate() 가 hang.
spawn_code = (
f"import subprocess, time, sys, os; "
f"gc = subprocess.Popen("
f" [sys.executable, '-c', 'import time; time.sleep(60)'], "
f" stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL); "
f"open(r'{marker}', 'w').write(str(gc.pid)); "
f"time.sleep(60)"
)
with pytest.raises(subprocess.TimeoutExpired):
_run_with_tree_kill(
[_py(), "-c", spawn_code],
timeout=3,
)
# marker 파일에서 grandchild PID 읽기.
assert marker.exists(), "grandchild marker not written — parent died too early"
gc_pid = int(marker.read_text().strip())
# 잠시 대기 (cleanup 비동기 가능성) 후 grandchild 죽었는지 확인.
deadline = time.time() + 5
while time.time() < deadline and _alive(gc_pid):
time.sleep(0.2)
assert not _alive(gc_pid), f"grandchild PID {gc_pid} still alive after parent timeout"
# ─────────────────────────────────────────────────────────────────
# C4: 외부 프로세스 보호
# ─────────────────────────────────────────────────────────────────
class TestC4_ExternalProcessProtection:
def test_outsider_not_killed(self):
# 사용자가 직접 띄운 척하는 외부 프로세스 (orchestrator 가 spawn 안 함).
outsider = subprocess.Popen([_py(), "-c", "import time; time.sleep(10)"])
try:
# _kill_tracked 에 외부 PID 의 (잘못된) signature 넘기면 무시돼야 함.
# signature 일치 안 하면 _is_same_process False → kill 안 됨.
wrong_sig = [(outsider.pid, 0.0)] # create_time 안 맞음
cleaned = _kill_tracked(wrong_sig)
assert cleaned == 0
assert _alive(outsider.pid), "outsider killed despite wrong create_time"
finally:
outsider.kill()
outsider.wait(timeout=5)
# ─────────────────────────────────────────────────────────────────
# C5: orchestrator 자살 방지
# ─────────────────────────────────────────────────────────────────
class TestC5_SelfKillProtection:
def test_kill_process_tree_self_pid_noop(self):
"""orchestrator(=pytest) PID 로 _kill_process_tree 호출해도 죽으면 안 됨."""
result = _kill_process_tree(os.getpid())
assert result == 0 # ORCH_PID 검사로 즉시 0 반환
def test_kill_tracked_with_orch_pid_noop(self):
# 일부러 self signature 를 tracked 에 넣어도 _is_same_process False → skip.
self_p = psutil.Process(os.getpid())
self_sig = _proc_signature(self_p)
cleaned = _kill_tracked([self_sig])
assert cleaned == 0 # 자기 자신 보호
# ─────────────────────────────────────────────────────────────────
# C6 (CORE): parent 정상 종료 후 grandchild orphan 정리
# — PID 2780 regression test
# ─────────────────────────────────────────────────────────────────
class TestC6_OrphanGrandchildAfterNormalExit:
"""PID 2780 path: parent 가 정상 exit 했는데 grandchild 만 살아남는 케이스.
monitor thread 가 parent 살아있을 때 grandchild 를 미리 추적해서 finally 에서 정리해야 함."""
def test_grandchild_killed_after_parent_normal_exit(self):
marker = ROOT / ".orchestrator" / "tmp" / "test_c6_gc_pid.txt"
marker.parent.mkdir(parents=True, exist_ok=True)
if marker.exists(): marker.unlink()
# parent 가:
# 1. grandchild 띄움 (DEVNULL 격리 — production claude.exe→python.exe - 과 동등).
# 2. PID 마커에 기록.
# 3. monitor 가 1초 polling 으로 catch 할 시간 확보 (2.5초 sleep).
# 4. 정상 종료.
spawn_code = (
f"import subprocess, time, sys, os; "
f"gc = subprocess.Popen("
f" [sys.executable, '-c', 'import time; time.sleep(60)'], "
f" stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL); "
f"open(r'{marker}', 'w').write(str(gc.pid)); "
f"time.sleep(2.5); "
f"sys.exit(0)"
)
# 정상 종료 (timeout 안 걸림) — wrapper 의 finally cleanup 만으로 grandchild 잡혀야 함.
r = _run_with_tree_kill(
[_py(), "-c", spawn_code],
timeout=15,
)
assert r.returncode == 0, "parent did not exit normally"
# marker 에서 grandchild PID.
assert marker.exists(), "grandchild marker missing"
gc_pid = int(marker.read_text().strip())
# 정리 비동기 가능성 → 짧게 대기 후 확인.
deadline = time.time() + 5
while time.time() < deadline and _alive(gc_pid):
time.sleep(0.2)
assert not _alive(gc_pid), (
f"REGRESSION: grandchild PID {gc_pid} survived parent normal exit "
f"(PID 2780 path not fixed)"
)
# ─────────────────────────────────────────────────────────────────
# Bonus: _SPAWNED discipline — 다중 호출 후 누적 안 됨
# ─────────────────────────────────────────────────────────────────
class TestSpawnedDiscipline:
def test_spawned_drained_between_calls(self):
for _ in range(3):
_run_with_tree_kill([_py(), "-c", "pass"], timeout=10)
# 3 회 호출 후에도 우리 잔재 없음 (wrapper finally 가 discard).
assert len(_SPAWNED) == 0

View File

@@ -0,0 +1,334 @@
"""IMP-15 실행-2 (Gitea issue #46) — Step 14 table_self_overflow detection.
Tests Selenium-driven ``<table>`` self-overflow measurement and element-identity
wrapper dedup added to ``run_overflow_check``:
* Fixture D — standalone ``<table>`` self-overflow, no clipped wrapper ancestor →
``table_events`` entry reports ``wrapper_clipped_index = None`` and an
``excess_*`` exceeding ``TABLE_SCROLL_TOL_PX``; Python aggregation then emits
a ``table self-overflow`` fail_reason and flips ``result["passed"] = False``.
* Fixture E — ``<table>`` inside a clipped ``f13b`` wrapper. The wrapper itself
self-overflows (registers in ``clippedWrapperMap``) and the inner table also
self-overflows. Asserts dedup is honored: the table's ``wrapper_clipped_index``
resolves to the wrapper's map index (non-null) so the Python aggregation MUST
NOT emit a ``table self-overflow`` fail_reason — only the wrapper's pre-existing
``inner clipped`` fail line remains.
* Fixture F — two wrappers W1 / W2 share identical className ``f13b-cell``. W1
contains an overflowing inline-block child (no ``<table>``) → W1 self-overflows
and registers in ``clippedWrapperMap`` (emits ``inner clipped``). W2 contains
only a self-overflowing ``<table>``; W2's own scrollWidth equals its clientWidth
(the table's ``overflow:hidden`` keeps W2 itself uncliped). The element-identity
ancestor walk MUST resolve the W2 table's ``wrapper_clipped_index`` to ``None``
(W2 ≠ W1 by DOM reference, despite identical class string). A class-string
lookup would have falsely resolved the W2 table → W1 and suppressed the fail —
the test thereby proves ``Map<Element, int>`` distinguishes by node identity.
Chromedriver resolution mirrors the pipeline order
(``PROJECT_ROOT/chromedriver{,.exe}`` → PATH → Selenium Manager). When no driver
is resolvable the suite skips by default; under ``PHASE_Z_REQUIRE_SELENIUM=1``
the tests are marked ``xfail(strict=True)`` so CI cannot silently lose coverage.
"""
from __future__ import annotations
import os
import shutil
from pathlib import Path
import pytest
from src.phase_z2_pipeline import (
PROJECT_ROOT,
TABLE_SCROLL_TOL_PX,
run_overflow_check,
)
# ─── chromedriver skip / xfail guard ─────────────────────────────────
def _selenium_manager_resolvable() -> bool:
"""Probe ``webdriver.Chrome(options=...)`` — pipeline's third tier.
``src/phase_z2_pipeline.py`` (run_overflow_check) tries
``PROJECT_ROOT/chromedriver{,.exe}`` first, then falls back to
``webdriver.Chrome(options=options)`` which delegates to Selenium Manager
for driver auto-resolution. The test resolver must mirror that fallback
or ``PHASE_Z_REQUIRE_SELENIUM=1`` produces spurious strict-XPASS failures
on machines where Selenium Manager can satisfy the pipeline at runtime.
"""
try:
from selenium import webdriver
from selenium.webdriver.chrome.options import Options as _Opts
except Exception:
return False
opts = _Opts()
opts.add_argument("--headless=new")
opts.add_argument("--no-sandbox")
opts.add_argument("--disable-dev-shm-usage")
try:
drv = webdriver.Chrome(options=opts)
except Exception:
return False
try:
drv.quit()
except Exception:
pass
return True
def _chromedriver_resolvable() -> bool:
"""Mirror pipeline order: PROJECT_ROOT/chromedriver{,.exe} → PATH → Selenium Manager."""
for candidate in (PROJECT_ROOT / "chromedriver", PROJECT_ROOT / "chromedriver.exe"):
if candidate.is_file():
return True
if shutil.which("chromedriver") or shutil.which("chromedriver.exe"):
return True
return _selenium_manager_resolvable()
_REQUIRE_SELENIUM = os.environ.get("PHASE_Z_REQUIRE_SELENIUM") == "1"
_DRIVER_AVAILABLE = _chromedriver_resolvable()
if not _DRIVER_AVAILABLE:
if _REQUIRE_SELENIUM:
pytestmark = pytest.mark.xfail(
strict=True,
reason="PHASE_Z_REQUIRE_SELENIUM=1 but chromedriver is unresolvable",
)
else:
pytestmark = pytest.mark.skip(
reason=(
"chromedriver unresolvable (PROJECT_ROOT/chromedriver{,.exe} + PATH + Selenium Manager); "
"set PHASE_Z_REQUIRE_SELENIUM=1 to make this a hard failure"
),
)
# ─── HTML fixture helpers ────────────────────────────────────────────
_SLIDE_CSS = """
html, body { margin: 0; padding: 0; }
.slide { width: 1280px; height: 720px; position: relative; box-sizing: border-box; }
.zone { display: block; }
"""
def _write_slide_html(tmp_path: Path, body_inner: str, name: str = "slide.html") -> Path:
html = (
"<!doctype html><html><head><meta charset='utf-8'>"
f"<style>{_SLIDE_CSS}</style></head><body>"
'<div class="slide" data-page="1">'
f"{body_inner}"
"</div></body></html>"
)
path = tmp_path / name
path.write_text(html, encoding="utf-8")
return path
# ─── tests ───────────────────────────────────────────────────────────
def test_fixture_d_standalone_table_overflow(tmp_path: Path) -> None:
"""Fixture D — standalone ``<table>`` self-overflow, no clipped wrapper.
The table is forced into block layout with a fixed clientWidth (100px) and
``overflow: hidden``; the inner cell is 600px wide with ``white-space:nowrap``,
so the table's scrollWidth exceeds clientWidth by well over ``TABLE_SCROLL_TOL_PX``.
No ancestor carries an ``f13b/f29b/f16b`` class, so the element-identity walk
must report ``wrapper_clipped_index = None``. Python aggregation then emits a
``table self-overflow`` fail_reason and flips ``result["passed"]`` to ``False``.
"""
body = (
'<div class="zone" data-zone-position="primary" data-template-id="t_table">'
'<table style="display:block; width:100px; height:30px; overflow:hidden; '
'box-sizing:border-box; table-layout:fixed;">'
'<tr><td style="width:600px; white-space:nowrap;">'
'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'</td></tr>'
'</table>'
'</div>'
)
html_path = _write_slide_html(tmp_path, body, name="fixture_d.html")
result = run_overflow_check(html_path)
assert "error" not in result, result
assert "table_events" in result, "run_overflow_check must expose table_events"
table_events = result["table_events"]
assert len(table_events) == 1, f"expected one table_events entry, got: {table_events}"
ev = table_events[0]
assert ev["zone_position"] == "primary", ev
assert ev["zone_template_id"] == "t_table", ev
assert ev["wrapper_clipped_index"] is None, (
f"standalone table must have null wrapper_clipped_index; got {ev['wrapper_clipped_index']}"
)
assert ev["excess_x"] > TABLE_SCROLL_TOL_PX, (
f"expected excess_x > {TABLE_SCROLL_TOL_PX}; got {ev['excess_x']} "
f"(clientWidth={ev['clientWidth']}, scrollWidth={ev['scrollWidth']})"
)
# Python aggregation: emitted fail_reason + passed flipped to False.
fail_reasons = result.get("fail_reasons", [])
table_fails = [r for r in fail_reasons if "table self-overflow" in r]
assert len(table_fails) == 1, (
f"expected exactly one 'table self-overflow' fail_reason; got fail_reasons={fail_reasons}"
)
assert "zone--primary" in table_fails[0], table_fails[0]
assert f"tol={TABLE_SCROLL_TOL_PX}" in table_fails[0], table_fails[0]
assert result["passed"] is False, (
f"table self-overflow must flip passed=False; got result={result}"
)
def test_fixture_e_table_in_clipped_wrapper_dedup(tmp_path: Path) -> None:
"""Fixture E — ``<table>`` inside a clipped ``f13b`` wrapper (dedup honored).
The wrapper (clientWidth=300, ``overflow:hidden``) contains a ``display:block``
table forced to width=500px → wrapper.scrollWidth (≈500) clientWidth (300) > 5px,
so the wrapper is registered in ``clippedWrapperMap`` (emits ``inner clipped`` fail).
The inner table is itself self-overflowing (clientWidth=500, content nowrap-cell
width=900 → scrollWidth ≈ 900). The element-identity ancestor walk MUST resolve
the table's ``wrapper_clipped_index`` to the wrapper's integer map index, and the
Python aggregation MUST then SKIP emitting a ``table self-overflow`` fail_reason
(the clipped wrapper already accounts for this).
"""
body = (
'<div class="zone" data-zone-position="primary" data-template-id="t_table_wrap">'
'<div class="f13b-cell" style="width:300px; height:60px; overflow:hidden; '
'box-sizing:border-box; position:relative;">'
'<table style="display:block; width:500px; height:40px; overflow:hidden; '
'box-sizing:border-box; table-layout:fixed;">'
'<tr><td style="width:900px; white-space:nowrap;">'
'BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB'
'</td></tr>'
'</table>'
'</div>'
'</div>'
)
html_path = _write_slide_html(tmp_path, body, name="fixture_e.html")
result = run_overflow_check(html_path)
assert "error" not in result, result
table_events = result.get("table_events", [])
assert len(table_events) == 1, f"expected one table_events entry, got: {table_events}"
ev = table_events[0]
# Dedup signal: ancestor walk must hit the f13b wrapper via Map.has(node).
assert ev["wrapper_clipped_index"] is not None, (
f"table inside clipped wrapper must inherit wrapper index; got ev={ev}"
)
assert isinstance(ev["wrapper_clipped_index"], int), ev
# The inner table is itself overflowing — proves the dedup is the only thing
# suppressing the table_self_overflow fail (not absence of overflow).
assert ev["excess_x"] > TABLE_SCROLL_TOL_PX, (
f"inner table must be self-overflowing for this test to be meaningful; ev={ev}"
)
fail_reasons = result.get("fail_reasons", [])
table_fails = [r for r in fail_reasons if "table self-overflow" in r]
assert table_fails == [], (
f"dedup must suppress table self-overflow fail when wrapper is clipped; "
f"got table_fails={table_fails} fail_reasons={fail_reasons}"
)
# Wrapper's clipped_inner fail line must still be present.
clipped_fails = [r for r in fail_reasons if "inner clipped" in r and "f13b" in r]
assert len(clipped_fails) >= 1, (
f"wrapper clipped_inner fail must remain; got fail_reasons={fail_reasons}"
)
assert result["passed"] is False, result
def test_fixture_f_two_same_class_wrappers_element_identity(tmp_path: Path) -> None:
"""Fixture F (F1 acceptance) — two same-class wrappers, element-identity dedup.
W1 and W2 share the identical className ``f13b-cell``. W1 (clientWidth=300,
``overflow:hidden``) contains an inline-block ``<div>`` of width 600px →
W1.scrollWidth clientWidth ≈ 300 > 5; W1 is registered in
``clippedWrapperMap`` and emits an ``inner clipped`` fail line. W2
(clientWidth=600, ``overflow:hidden``) contains a 500px-wide block-display
``<table>`` (matching the Fixture E table shape so the table is itself
self-overflowing with excess_x > 5). W2's clientWidth (600) is larger than
the table's outer width (500), so W2's own scrollWidth ≈ 500 < clientWidth
and W2 is NOT registered in ``clippedWrapperMap``.
The element-identity ancestor walk in the pipeline (L2298L2304) walks from
the W2 table upward via ``parentElement`` and queries
``clippedWrapperMap.has(node)`` — keyed by DOM node, NOT className. W2 is
a different ``Element`` reference from W1 despite identical class string,
so the lookup returns false at W2 and the walk terminates at ``.slide`` with
``wrapper_clipped_index = null``. A class-substring keyed map (the F1
regression scenario described in issue #46) would have resolved any
``[class*="f13b"]`` ancestor of the W2 table → W1's index and falsely
suppressed the W2 table_self_overflow fail.
Asserts:
* Exactly ONE ``inner clipped`` fail line (for W1) — proves W1 is in the map.
* Exactly ONE ``table self-overflow`` fail line (for W2's table) — proves
the W2 table is NOT suppressed by W1's identical class string.
* W2 table's ``table_events`` entry reports ``wrapper_clipped_index = None``
(element-identity contract) and ``excess_x > TABLE_SCROLL_TOL_PX``.
"""
body = (
'<div class="zone" data-zone-position="primary" '
'data-template-id="t_table_same_class">'
# W1 — same className, overflowing non-table child.
'<div class="f13b-cell" id="w1" style="width:300px; height:60px; '
'overflow:hidden; box-sizing:border-box; position:relative; '
'margin-bottom:8px;">'
'<div style="display:inline-block; width:600px; white-space:nowrap;">'
'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
'</div>'
'</div>'
# W2 — same className, NOT clipped (W2.clientWidth=600 > table.outer=500),
# but the inner table itself self-overflows (table width=500, td width=900).
'<div class="f13b-cell" id="w2" style="width:600px; height:60px; '
'overflow:hidden; box-sizing:border-box; position:relative;">'
'<table style="display:block; width:500px; height:40px; '
'overflow:hidden; box-sizing:border-box; table-layout:fixed;">'
'<tr><td style="width:900px; white-space:nowrap;">'
'YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY'
'</td></tr>'
'</table>'
'</div>'
'</div>'
)
html_path = _write_slide_html(tmp_path, body, name="fixture_f.html")
result = run_overflow_check(html_path)
assert "error" not in result, result
# Exactly one table_events entry (the W2 table — W1 has no <table>).
table_events = result.get("table_events", [])
assert len(table_events) == 1, f"expected one table_events entry, got: {table_events}"
ev = table_events[0]
# Element-identity contract: W2 ≠ W1, so the ancestor walk MUST NOT inherit
# W1's wrapper index merely because W2 shares W1's class string.
assert ev["wrapper_clipped_index"] is None, (
f"W2 (not itself clipped) must NOT inherit W1's index via class string; "
f"got wrapper_clipped_index={ev['wrapper_clipped_index']}. "
"This is the F1 regression — a class-substring map would have failed here."
)
assert ev["excess_x"] > TABLE_SCROLL_TOL_PX, (
f"W2's inner table must self-overflow for this test to be meaningful; ev={ev}"
)
fail_reasons = result.get("fail_reasons", [])
# W1: inner clipped fail emitted (W1 is in clippedWrapperMap, has overflowing inner div).
w1_clipped_fails = [r for r in fail_reasons if "inner clipped" in r and "f13b" in r]
assert len(w1_clipped_fails) == 1, (
f"expected exactly one W1 'inner clipped' fail; got fail_reasons={fail_reasons}"
)
# W2: table self-overflow fail emitted because element-identity dedup correctly
# reports wrapper_clipped_index=None for the W2 table (W2 ≠ W1 by DOM ref).
table_fails = [r for r in fail_reasons if "table self-overflow" in r]
assert len(table_fails) == 1, (
f"expected exactly one W2 'table self-overflow' fail (element-identity dedup); "
f"got fail_reasons={fail_reasons}"
)
assert "zone--primary" in table_fails[0], table_fails[0]
assert f"tol={TABLE_SCROLL_TOL_PX}" in table_fails[0], table_fails[0]
assert result["passed"] is False, result