from __future__ import annotations import json import re from pathlib import Path from typing import Any def _read_text(path: Path) -> str: return path.read_text(encoding="utf-8-sig") def _write_json(path: Path, data: dict[str, Any]) -> None: path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") def _write_text(path: Path, text: str) -> None: path.write_text(text, encoding="utf-8") def _compact(text: str, max_len: int) -> str: normalized = re.sub(r"\s+", " ", text).strip() if len(normalized) <= max_len: return normalized cut = normalized[:max_len].rsplit(" ", 1)[0].strip() return (cut or normalized[:max_len]).rstrip(" ,.;:") + "..." def _preserve_len(text: str, ratio: float = 0.85, floor: int = 180, ceiling: int = 900) -> int: normalized = re.sub(r"\s+", " ", text).strip() if not normalized: return floor return max(floor, min(ceiling, int(len(normalized) * ratio))) def _strip_frontmatter_and_imports(raw: str) -> str: text = raw.replace("\r\n", "\n") if text.startswith("---\n"): end = text.find("\n---", 4) if end != -1: text = text[end + 4 :] text = re.sub(r"^import\s+.+?$", "", text, flags=re.M) return text.strip() def _dx_effect_lines(repo_root: Path) -> list[str]: path = repo_root / "components" / "dx.astro" if not path.exists(): return [] text = _read_text(path) text = re.sub(r"", "", text, flags=re.S) text = text.replace("
", " ") text = re.sub(r"]*>", "\n", text) text = re.sub(r"]*>", "- ", text) text = re.sub(r"", "\n", text) text = re.sub(r"<[^>]+>", " ", text) lines: list[str] = [] for raw in text.splitlines(): line = re.sub(r"\s+", " ", raw).strip() if not line: continue if line.startswith("/*") or line.startswith("["): continue if len(line) < 6: continue lines.append(line) deduped: list[str] = [] for line in lines: if line not in deduped: deduped.append(line) return deduped[:24] def _normalize_block_for_storage(text: str, repo_root: Path) -> str: dx_lines = _dx_effect_lines(repo_root) if "", replacement, text) text = re.sub(r"]*>(.*?)", lambda m: f"**{re.sub(r'<[^>]+>', ' ', m.group(1)).strip()}**", text, flags=re.S) text = text.replace("
", "").replace("
", "") text = re.sub(r"", "\n", text, flags=re.I) text = re.sub(r"]*>", "", text) text = re.sub(r":::\s*note\[(.*?)\]", r"**\1**", text) text = text.replace(":::", "") text = re.sub(r"!\[([^\]]+)\]\(([^\)]+)\)", r"[???] \1", text) text = re.sub(r"\n{3,}", "\n\n", text) return text.strip() def _first_nonempty_lines(text: str, limit: int = 8) -> list[str]: lines: list[str] = [] for raw in text.splitlines(): line = raw.strip() if not line: continue if line.startswith("---"): continue lines.append(line) if len(lines) >= limit: break return lines def _extract_detail_topics(block: str, start_id: int, repo_root: Path) -> tuple[list[dict[str, Any]], str, int]: topics: list[dict[str, Any]] = [] next_id = start_id def repl(match: re.Match[str]) -> str: nonlocal next_id inner = match.group(1) summary_match = re.search(r"]*>(.*?)", inner, flags=re.S) summary = re.sub(r"<[^>]+>", " ", summary_match.group(1)).strip() if summary_match else "?? ??" detail_body = re.sub(r"]*>.*?", "", inner, flags=re.S) detail_source = _normalize_block_for_storage(detail_body, repo_root) if detail_source: topics.append({ "id": next_id, "title": summary, "purpose": "?? ?? ??", "role": "reference", "layer": "supporting", "source_hint": summary, "summary": _compact(detail_source, _preserve_len(detail_source, floor=220, ceiling=560)), "source_data": detail_source, }) next_id += 1 return f"\n* **{summary}**\n" stripped = re.sub(r"
(.*?)
", repl, block, flags=re.S) return topics, stripped, next_id def _extract_title_from_intro(block: str) -> str: m = re.search(r"\*\s+\*\*(.+?)\*\*", block) if m: return m.group(1).strip() return "도입" def _section_chunks(text: str) -> list[tuple[str, str]]: matches = list(re.finditer(r"^##\s+(.+)$", text, flags=re.M)) chunks: list[tuple[str, str]] = [] for idx, match in enumerate(matches): title = match.group(1).strip() start = match.end() end = matches[idx + 1].start() if idx + 1 < len(matches) else len(text) chunks.append((title, text[start:end].strip())) return chunks def _subsection_chunks(text: str) -> list[tuple[str, str]]: matches = list(re.finditer(r"^###\s+(.+)$", text, flags=re.M)) chunks: list[tuple[str, str]] = [] for idx, match in enumerate(matches): title = match.group(1).strip() start = match.end() end = matches[idx + 1].start() if idx + 1 < len(matches) else len(text) chunks.append((title, text[start:end].strip())) return chunks def _classify(title: str, layer_hint: str = "core") -> tuple[str, str, str]: clean = title.strip() if "혼용" in clean: return "problem", "flow", "intro" if "정의" in clean: return "definition", "flow", "core" if "상호관계" in clean or "관계" in clean: return "hierarchy", "flow", "core" if "구분" in clean or "비교" in clean: return "comparison", "reference", "supporting" if "사례" in clean: return "evidence", "reference", "supporting" if "궁극적 목표" in clean: return "goal", "flow", "core" if "기대효과" in clean: return "stakeholder_effect", "flow", "core" if "필수 요건" in clean: return "requirements", "flow", "core" if "Process" in clean or "과정" in clean: return "process", "flow", "core" if "Product" in clean or "결과" in clean: return "product", "flow", "core" if "핵심 요약" in clean or "결론" in clean: return "conclusion", "flow", "conclusion" if layer_hint == "supporting": return "support", "reference", "supporting" return "section", "flow", "core" def _extract_conclusion(text: str, repo_root: Path) -> tuple[str, str]: m = re.search(r":::\s*note\[(.*?)\](.*?):::", text, flags=re.S) if not m: return text, "" note_title = re.sub(r"\s+", " ", m.group(1)).strip() or "\ud575\uc2ec \uc694\uc57d" note_body = _normalize_block_for_storage(m.group(2), repo_root) note_source = f"**{note_title}**\n{note_body}".strip() stripped = text[: m.start()] + text[m.end() :] return stripped.strip(), note_source def extract_topics_from_raw(raw: str, repo_root: Path) -> tuple[str, list[dict[str, Any]]]: title_match = re.search(r"^title:\s*(.+)$", raw, flags=re.M) doc_title = title_match.group(1).strip() if title_match else "Document" clean = _strip_frontmatter_and_imports(raw) clean, conclusion_source = _extract_conclusion(clean, repo_root) topics: list[dict[str, Any]] = [] next_id = 1 first_section = re.search(r"^##\s+", clean, flags=re.M) intro_block = clean[: first_section.start()].strip() if first_section else clean.strip() if intro_block: detail_topics, intro_stripped, _ = _extract_detail_topics(intro_block, next_id + 1, repo_root) intro_source = _normalize_block_for_storage(intro_stripped, repo_root) if intro_source: title = _extract_title_from_intro(intro_source) relation, role, layer = _classify(title, "intro") topics.append({ "id": next_id, "title": title, "purpose": "?? ?? ?? ??", "role": role, "layer": layer, "source_hint": title, "summary": _compact(intro_source, _preserve_len(intro_source, floor=260, ceiling=760)), "source_data": intro_source, }) next_id += 1 topics.extend(detail_topics) next_id = max([t["id"] for t in topics], default=0) + 1 for section_title, section_body in _section_chunks(clean): detail_topics, section_stripped, next_id = _extract_detail_topics(section_body, next_id, repo_root) subsections = _subsection_chunks(section_stripped) lead = re.split(r"^###\s+.+$", section_stripped, maxsplit=1, flags=re.M)[0].strip() if subsections else section_stripped if lead: source = _normalize_block_for_storage(lead, repo_root) if source: relation, role, layer = _classify(section_title) topics.append({ "id": next_id, "title": section_title, "purpose": f"{section_title} ?? ??", "role": role, "layer": layer, "source_hint": section_title, "summary": _compact(source, _preserve_len(source, floor=240, ceiling=780)), "source_data": source, }) next_id += 1 for sub_title, sub_body in subsections: source = _normalize_block_for_storage(sub_body, repo_root) if source: relation, role, layer = _classify(sub_title) topics.append({ "id": next_id, "title": sub_title, "purpose": f"{sub_title} ?? ??", "role": role, "layer": layer, "source_hint": sub_title, "summary": _compact(source, _preserve_len(source, floor=220, ceiling=760)), "source_data": source, }) next_id += 1 topics.extend(detail_topics) next_id = max([t["id"] for t in topics], default=0) + 1 if conclusion_source: topics.append({ "id": next_id, "title": "\ud575\uc2ec \uc694\uc57d", "purpose": "?? ?? ??", "role": "flow", "layer": "conclusion", "source_hint": "\ud575\uc2ec \uc694\uc57d", "summary": _compact(conclusion_source, _preserve_len(conclusion_source, floor=140, ceiling=360)), "source_data": conclusion_source, }) return doc_title, topics def _page_structure(topics: list[dict[str, Any]]) -> dict[str, Any]: intro_ids = [t["id"] for t in topics if t["layer"] == "intro"] core_ids = [t["id"] for t in topics if t["layer"] == "core"] support_ids = [t["id"] for t in topics if t["layer"] == "supporting"] conclusion_ids = [t["id"] for t in topics if t["layer"] == "conclusion"] structure: dict[str, Any] = {} if intro_ids: structure["background"] = {"topic_ids": intro_ids, "weight": 0.24} if core_ids: structure["body"] = {"topic_ids": core_ids, "weight": 0.48 if support_ids else 0.58} if support_ids: structure["support"] = {"topic_ids": support_ids, "weight": 0.18} if conclusion_ids: structure["key_message"] = {"topic_ids": conclusion_ids, "weight": 0.10} return structure def rebuild_run_from_raw(repo_root: Path, run_dir: Path, input_file: Path) -> dict[str, Any]: raw = _read_text(input_file) doc_title, topics = extract_topics_from_raw(raw, repo_root) core_topic = next((t for t in topics if t["layer"] == "conclusion"), topics[-1] if topics else {"source_data": ""}) stage1a = { "analysis": { "title": doc_title, "core_message": re.sub(r"\s+", " ", str(core_topic.get("source_data", ""))).strip(), "total_pages": 1, }, "page_structure": _page_structure(topics), "topics": topics, } stage1b = { "concepts": [ { "topic_id": t["id"], "relation_type": _classify(t["title"], t["layer"])[0], "expression_hint": "?? ??? ??? ???. ??? ? ?? ??? ??? popup?? ???. visible ??? ?? ???? 85% ??? ?? ???.", "summary": t["summary"], } for t in topics ] } plan_dir = run_dir / "04-plan" plan_dir.mkdir(parents=True, exist_ok=True) _write_json(plan_dir / "stage-1a-topics.json", stage1a) _write_json(plan_dir / "stage-1b-refined-concepts.json", stage1b) input_dir = run_dir / "01-input" input_dir.mkdir(parents=True, exist_ok=True) input_lines = [ "# Input Review", "", f"- ?? ???: {input_file.name}", f"- ?? ??: {doc_title}", "- ?? ?? ??: ?? block? ???? ?? ???? ???.", "- ?? ??: ???? ?? 85% ?? ????, ? ?/?? ??? popup ??? ???.", "", "## ?? ??", ] for topic in topics: input_lines.append(f"- {topic['title']}: { _compact(re.sub(r'\s+', ' ', topic['source_data']), 160) }") _write_text(input_dir / "input-review.md", "\n".join(input_lines) + "\n") interp_dir = run_dir / "02-kei-interpretation" interp_dir.mkdir(parents=True, exist_ok=True) interp_lines = [ "# Interpretation", "", "- ?? ??: ????? ?? ??? ???.", "- ?? ??: ?? ??? ????, ??/??/popup ???? ???.", "- popup ??: ? ?, ?? ??, ? ??? ??? popup?? ?? ???.", "", "## Topic Classification", ] for topic in topics: interp_lines.append(f"- {topic['title']}: layer={topic['layer']} / role={topic['role']}") _write_text(interp_dir / "kei-interpretation.md", "\n".join(interp_lines) + "\n") structure_dir = run_dir / "03-structure" structure_dir.mkdir(parents=True, exist_ok=True) structure_lines = [ "# Content Structure", "", "- ??? ??: ?? ?? ??? ???.", "- ??? ??: ?? ? ???? ????, ?? ???? ????.", "- popup ??: ??? ? ?? ??? ? ?/? ??? popup?? ???.", "", "## Ordered Blocks", ] for idx, topic in enumerate(topics, start=1): structure_lines.append(f"{idx}. {topic['title']} ({topic['layer']})") _write_text(structure_dir / "content-structure.md", "\n".join(structure_lines) + "\n") plan_lines = [ "# Execution Plan", "", "- ??? raw mdx?? ?? ???? stage-1a/stage-1b? ???.", "- ?? ??? ??? ???.", "- ?? ??, ? ?, ??? ?? ??? popup?? ?? ???.", "- visible ??? section title + ?? bullet + ?? ?? ???? ???.", ] _write_text(plan_dir / "execution-plan.md", "\n".join(plan_lines) + "\n") return {"title": doc_title, "topics": topics}