Checkpoint raw-preservation pipeline and Type B refactor

This commit is contained in:
2026-04-07 08:54:10 +09:00
parent 4a71d7db88
commit 0cc2a997b6
109 changed files with 4721 additions and 2993 deletions

388
scripts/raw_bootstrap.py Normal file
View File

@@ -0,0 +1,388 @@
from __future__ import annotations
import json
import re
from pathlib import Path
from typing import Any
def _read_text(path: Path) -> str:
return path.read_text(encoding="utf-8-sig")
def _write_json(path: Path, data: dict[str, Any]) -> None:
path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
def _write_text(path: Path, text: str) -> None:
path.write_text(text, encoding="utf-8")
def _compact(text: str, max_len: int) -> str:
normalized = re.sub(r"\s+", " ", text).strip()
if len(normalized) <= max_len:
return normalized
cut = normalized[:max_len].rsplit(" ", 1)[0].strip()
return (cut or normalized[:max_len]).rstrip(" ,.;:") + "..."
def _preserve_len(text: str, ratio: float = 0.85, floor: int = 180, ceiling: int = 900) -> int:
normalized = re.sub(r"\s+", " ", text).strip()
if not normalized:
return floor
return max(floor, min(ceiling, int(len(normalized) * ratio)))
def _strip_frontmatter_and_imports(raw: str) -> str:
text = raw.replace("\r\n", "\n")
if text.startswith("---\n"):
end = text.find("\n---", 4)
if end != -1:
text = text[end + 4 :]
text = re.sub(r"^import\s+.+?$", "", text, flags=re.M)
return text.strip()
def _dx_effect_lines(repo_root: Path) -> list[str]:
path = repo_root / "components" / "dx.astro"
if not path.exists():
return []
text = _read_text(path)
text = re.sub(r"<style.*?</style>", "", text, flags=re.S)
text = text.replace("<br />", " ")
text = re.sub(r"</?(div|table|thead|tbody|tr|td|th|colgroup|col|ul|strong)[^>]*>", "\n", text)
text = re.sub(r"<li[^>]*>", "- ", text)
text = re.sub(r"</li>", "\n", text)
text = re.sub(r"<[^>]+>", " ", text)
lines: list[str] = []
for raw in text.splitlines():
line = re.sub(r"\s+", " ", raw).strip()
if not line:
continue
if line.startswith("/*") or line.startswith("["):
continue
if len(line) < 6:
continue
lines.append(line)
deduped: list[str] = []
for line in lines:
if line not in deduped:
deduped.append(line)
return deduped[:24]
def _normalize_block_for_storage(text: str, repo_root: Path) -> str:
    """Convert an MDX block into plain markdown suitable for JSON storage.

    Replaces <DxEffect /> with bullets extracted from the component, turns
    <summary> labels into bold text, strips <details>/<div> wrappers,
    rewrites ':::note[...]' admonition headers as bold, swaps images for a
    placeholder, and collapses excess blank lines.
    """
    dx_bullets = _dx_effect_lines(repo_root)
    if dx_bullets and "<DxEffect" in text:
        bullet_block = "\n".join(f"* {line}" for line in dx_bullets)
        text = re.sub(r"<DxEffect\s*/>", bullet_block, text)

    def _summary_to_bold(m: re.Match[str]) -> str:
        label = re.sub(r"<[^>]+>", " ", m.group(1)).strip()
        return f"**{label}**"

    text = re.sub(r"<summary[^>]*>(.*?)</summary>", _summary_to_bold, text, flags=re.S)
    text = text.replace("<details>", "").replace("</details>", "")
    text = re.sub(r"<br\s*/?>", "\n", text, flags=re.I)
    text = re.sub(r"</?div[^>]*>", "", text)
    text = re.sub(r":::\s*note\[(.*?)\]", r"**\1**", text)
    text = text.replace(":::", "")
    # NOTE(review): "[???]" reads like a mojibake'd placeholder label — confirm
    # the intended text against the original source encoding.
    text = re.sub(r"!\[([^\]]+)\]\(([^\)]+)\)", r"[???] \1", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()
def _first_nonempty_lines(text: str, limit: int = 8) -> list[str]:
lines: list[str] = []
for raw in text.splitlines():
line = raw.strip()
if not line:
continue
if line.startswith("---"):
continue
lines.append(line)
if len(lines) >= limit:
break
return lines
def _extract_detail_topics(block: str, start_id: int, repo_root: Path) -> tuple[list[dict[str, Any]], str, int]:
    """Pull every <details> element out of *block* into standalone topics.

    Each extracted detail becomes a supporting/reference topic dict with ids
    counting up from *start_id*; the original element is replaced inline by a
    bold bullet carrying its <summary> label.

    Returns (topics, block with details replaced, next unused id).
    """
    topics: list[dict[str, Any]] = []
    next_id = start_id

    def _lift(match: re.Match[str]) -> str:
        nonlocal next_id
        inner = match.group(1)
        summary_match = re.search(r"<summary[^>]*>(.*?)</summary>", inner, flags=re.S)
        if summary_match:
            summary = re.sub(r"<[^>]+>", " ", summary_match.group(1)).strip()
        else:
            summary = "?? ??"
        body = re.sub(r"<summary[^>]*>.*?</summary>", "", inner, flags=re.S)
        detail_source = _normalize_block_for_storage(body, repo_root)
        if detail_source:
            topics.append({
                "id": next_id,
                "title": summary,
                "purpose": "?? ?? ??",
                "role": "reference",
                "layer": "supporting",
                "source_hint": summary,
                "summary": _compact(detail_source, _preserve_len(detail_source, floor=220, ceiling=560)),
                "source_data": detail_source,
            })
            next_id += 1
        return f"\n* **{summary}**\n"

    remainder = re.sub(r"<details>(.*?)</details>", _lift, block, flags=re.S)
    return topics, remainder, next_id
def _extract_title_from_intro(block: str) -> str:
m = re.search(r"\*\s+\*\*(.+?)\*\*", block)
if m:
return m.group(1).strip()
return "도입"
def _section_chunks(text: str) -> list[tuple[str, str]]:
matches = list(re.finditer(r"^##\s+(.+)$", text, flags=re.M))
chunks: list[tuple[str, str]] = []
for idx, match in enumerate(matches):
title = match.group(1).strip()
start = match.end()
end = matches[idx + 1].start() if idx + 1 < len(matches) else len(text)
chunks.append((title, text[start:end].strip()))
return chunks
def _subsection_chunks(text: str) -> list[tuple[str, str]]:
matches = list(re.finditer(r"^###\s+(.+)$", text, flags=re.M))
chunks: list[tuple[str, str]] = []
for idx, match in enumerate(matches):
title = match.group(1).strip()
start = match.end()
end = matches[idx + 1].start() if idx + 1 < len(matches) else len(text)
chunks.append((title, text[start:end].strip()))
return chunks
def _classify(title: str, layer_hint: str = "core") -> tuple[str, str, str]:
clean = title.strip()
if "혼용" in clean:
return "problem", "flow", "intro"
if "정의" in clean:
return "definition", "flow", "core"
if "상호관계" in clean or "관계" in clean:
return "hierarchy", "flow", "core"
if "구분" in clean or "비교" in clean:
return "comparison", "reference", "supporting"
if "사례" in clean:
return "evidence", "reference", "supporting"
if "궁극적 목표" in clean:
return "goal", "flow", "core"
if "기대효과" in clean:
return "stakeholder_effect", "flow", "core"
if "필수 요건" in clean:
return "requirements", "flow", "core"
if "Process" in clean or "과정" in clean:
return "process", "flow", "core"
if "Product" in clean or "결과" in clean:
return "product", "flow", "core"
if "핵심 요약" in clean or "결론" in clean:
return "conclusion", "flow", "conclusion"
if layer_hint == "supporting":
return "support", "reference", "supporting"
return "section", "flow", "core"
def _extract_conclusion(text: str, repo_root: Path) -> tuple[str, str]:
    """Cut the first ':::note[title]…:::' admonition out of *text*.

    Returns (text without the note, note rendered as '**title**\\nbody');
    the second element is "" when no note is present. A blank note title
    falls back to the default Korean label.
    """
    note = re.search(r":::\s*note\[(.*?)\](.*?):::", text, flags=re.S)
    if note is None:
        return text, ""
    title = re.sub(r"\s+", " ", note.group(1)).strip() or "\ud575\uc2ec \uc694\uc57d"
    body = _normalize_block_for_storage(note.group(2), repo_root)
    combined = f"**{title}**\n{body}".strip()
    remainder = text[: note.start()] + text[note.end() :]
    return remainder.strip(), combined
def extract_topics_from_raw(raw: str, repo_root: Path) -> tuple[str, list[dict[str, Any]]]:
    """Parse a raw MDX document into (document title, ordered topic dicts).

    Pipeline: read the title from the frontmatter, strip frontmatter and
    import lines, cut out the ':::note' conclusion, then emit one topic per
    intro block, '##' section lead, '###' subsection, and <details> element,
    finishing with the conclusion topic. Topic ids are assigned in emission
    order (gaps are tolerated and the counter is re-derived from max id).

    NOTE(review): the '?' runs in string literals below look like mojibake'd
    Korean labels — confirm against the original source encoding.
    """
    # The document title comes from the YAML frontmatter, not the body.
    title_match = re.search(r"^title:\s*(.+)$", raw, flags=re.M)
    doc_title = title_match.group(1).strip() if title_match else "Document"
    clean = _strip_frontmatter_and_imports(raw)
    # Remove the conclusion note up-front so section splitting never sees it.
    clean, conclusion_source = _extract_conclusion(clean, repo_root)
    topics: list[dict[str, Any]] = []
    next_id = 1
    # Everything before the first '##' heading is treated as the intro block.
    first_section = re.search(r"^##\s+", clean, flags=re.M)
    intro_block = clean[: first_section.start()].strip() if first_section else clean.strip()
    if intro_block:
        # Detail topics inside the intro get ids after the intro topic itself.
        detail_topics, intro_stripped, _ = _extract_detail_topics(intro_block, next_id + 1, repo_root)
        intro_source = _normalize_block_for_storage(intro_stripped, repo_root)
        if intro_source:
            title = _extract_title_from_intro(intro_source)
            relation, role, layer = _classify(title, "intro")
            topics.append({
                "id": next_id,
                "title": title,
                "purpose": "?? ?? ?? ??",
                "role": role,
                "layer": layer,
                "source_hint": title,
                "summary": _compact(intro_source, _preserve_len(intro_source, floor=260, ceiling=760)),
                "source_data": intro_source,
            })
            next_id += 1
        topics.extend(detail_topics)
    # Re-derive the counter from the ids actually used so far.
    next_id = max([t["id"] for t in topics], default=0) + 1
    for section_title, section_body in _section_chunks(clean):
        # <details> topics are collected first; the section text keeps a bold stub.
        detail_topics, section_stripped, next_id = _extract_detail_topics(section_body, next_id, repo_root)
        subsections = _subsection_chunks(section_stripped)
        # The section "lead" is the text before the first '###' heading
        # (or the whole body when there are no subsections).
        lead = re.split(r"^###\s+.+$", section_stripped, maxsplit=1, flags=re.M)[0].strip() if subsections else section_stripped
        if lead:
            source = _normalize_block_for_storage(lead, repo_root)
            if source:
                relation, role, layer = _classify(section_title)
                topics.append({
                    "id": next_id,
                    "title": section_title,
                    "purpose": f"{section_title} ?? ??",
                    "role": role,
                    "layer": layer,
                    "source_hint": section_title,
                    "summary": _compact(source, _preserve_len(source, floor=240, ceiling=780)),
                    "source_data": source,
                })
                next_id += 1
        for sub_title, sub_body in subsections:
            source = _normalize_block_for_storage(sub_body, repo_root)
            if source:
                relation, role, layer = _classify(sub_title)
                topics.append({
                    "id": next_id,
                    "title": sub_title,
                    "purpose": f"{sub_title} ?? ??",
                    "role": role,
                    "layer": layer,
                    "source_hint": sub_title,
                    "summary": _compact(source, _preserve_len(source, floor=220, ceiling=760)),
                    "source_data": source,
                })
                next_id += 1
        # Detail topics go after the section/subsection topics; the counter is
        # then re-derived so the next section starts past every id used.
        topics.extend(detail_topics)
        next_id = max([t["id"] for t in topics], default=0) + 1
    if conclusion_source:
        # The conclusion note (if any) always becomes the final topic.
        topics.append({
            "id": next_id,
            "title": "\ud575\uc2ec \uc694\uc57d",
            "purpose": "?? ?? ??",
            "role": "flow",
            "layer": "conclusion",
            "source_hint": "\ud575\uc2ec \uc694\uc57d",
            "summary": _compact(conclusion_source, _preserve_len(conclusion_source, floor=140, ceiling=360)),
            "source_data": conclusion_source,
        })
    return doc_title, topics
def _page_structure(topics: list[dict[str, Any]]) -> dict[str, Any]:
intro_ids = [t["id"] for t in topics if t["layer"] == "intro"]
core_ids = [t["id"] for t in topics if t["layer"] == "core"]
support_ids = [t["id"] for t in topics if t["layer"] == "supporting"]
conclusion_ids = [t["id"] for t in topics if t["layer"] == "conclusion"]
structure: dict[str, Any] = {}
if intro_ids:
structure["background"] = {"topic_ids": intro_ids, "weight": 0.24}
if core_ids:
structure["body"] = {"topic_ids": core_ids, "weight": 0.48 if support_ids else 0.58}
if support_ids:
structure["support"] = {"topic_ids": support_ids, "weight": 0.18}
if conclusion_ids:
structure["key_message"] = {"topic_ids": conclusion_ids, "weight": 0.10}
return structure
def rebuild_run_from_raw(repo_root: Path, run_dir: Path, input_file: Path) -> dict[str, Any]:
    """Rebuild a pipeline run directory's artifacts directly from a raw MDX file.

    Reads *input_file*, extracts topics via extract_topics_from_raw, then
    writes the stage-1a/1b JSON plans under run_dir/04-plan and the
    input-review / interpretation / structure / execution-plan markdown files
    under their respective run_dir subfolders. All directories are created on
    demand. Returns {"title": doc_title, "topics": topics}.

    NOTE(review): the '?' runs in the literals below look like mojibake'd
    Korean text — confirm against the original source encoding before
    treating them as intentional.
    """
    raw = _read_text(input_file)
    doc_title, topics = extract_topics_from_raw(raw, repo_root)
    # The core message prefers the conclusion topic, falling back to the last
    # topic, or to an empty stub when nothing was extracted at all.
    core_topic = next(
        (t for t in topics if t["layer"] == "conclusion"),
        topics[-1] if topics else {"source_data": ""},
    )
    stage1a = {
        "analysis": {
            "title": doc_title,
            "core_message": re.sub(r"\s+", " ", str(core_topic.get("source_data", ""))).strip(),
            "total_pages": 1,
        },
        "page_structure": _page_structure(topics),
        "topics": topics,
    }
    stage1b = {
        "concepts": [
            {
                "topic_id": t["id"],
                "relation_type": _classify(t["title"], t["layer"])[0],
                "expression_hint": "?? ??? ??? ???. ??? ? ?? ??? ??? popup?? ???. visible ??? ?? ???? 85% ??? ?? ???.",
                "summary": t["summary"],
            }
            for t in topics
        ]
    }
    plan_dir = run_dir / "04-plan"
    plan_dir.mkdir(parents=True, exist_ok=True)
    _write_json(plan_dir / "stage-1a-topics.json", stage1a)
    _write_json(plan_dir / "stage-1b-refined-concepts.json", stage1b)
    input_dir = run_dir / "01-input"
    input_dir.mkdir(parents=True, exist_ok=True)
    input_lines = [
        "# Input Review",
        "",
        f"- ?? ???: {input_file.name}",
        f"- ?? ??: {doc_title}",
        "- ?? ?? ??: ?? block? ???? ?? ???? ???.",
        "- ?? ??: ???? ?? 85% ?? ????, ? ?/?? ??? popup ??? ???.",
        "",
        "## ?? ??",
    ]
    for topic in topics:
        # _compact() collapses whitespace runs itself, so no pre-normalization
        # is needed here. (The previous inline re.sub put a backslash inside an
        # f-string expression, which is a SyntaxError on Python < 3.12.)
        input_lines.append(f"- {topic['title']}: {_compact(topic['source_data'], 160)}")
    _write_text(input_dir / "input-review.md", "\n".join(input_lines) + "\n")
    interp_dir = run_dir / "02-kei-interpretation"
    interp_dir.mkdir(parents=True, exist_ok=True)
    interp_lines = [
        "# Interpretation",
        "",
        "- ?? ??: ????? ?? ??? ???.",
        "- ?? ??: ?? ??? ????, ??/??/popup ???? ???.",
        "- popup ??: ? ?, ?? ??, ? ??? ??? popup?? ?? ???.",
        "",
        "## Topic Classification",
    ]
    for topic in topics:
        interp_lines.append(f"- {topic['title']}: layer={topic['layer']} / role={topic['role']}")
    _write_text(interp_dir / "kei-interpretation.md", "\n".join(interp_lines) + "\n")
    structure_dir = run_dir / "03-structure"
    structure_dir.mkdir(parents=True, exist_ok=True)
    structure_lines = [
        "# Content Structure",
        "",
        "- ??? ??: ?? ?? ??? ???.",
        "- ??? ??: ?? ? ???? ????, ?? ???? ????.",
        "- popup ??: ??? ? ?? ??? ? ?/? ??? popup?? ???.",
        "",
        "## Ordered Blocks",
    ]
    for idx, topic in enumerate(topics, start=1):
        structure_lines.append(f"{idx}. {topic['title']} ({topic['layer']})")
    _write_text(structure_dir / "content-structure.md", "\n".join(structure_lines) + "\n")
    plan_lines = [
        "# Execution Plan",
        "",
        "- ??? raw mdx?? ?? ???? stage-1a/stage-1b? ???.",
        "- ?? ??? ??? ???.",
        "- ?? ??, ? ?, ??? ?? ??? popup?? ?? ???.",
        "- visible ??? section title + ?? bullet + ?? ?? ???? ???.",
    ]
    _write_text(plan_dir / "execution-plan.md", "\n".join(plan_lines) + "\n")
    return {"title": doc_title, "topics": topics}