INTEGRATION-AUDIT-01 (#50) §10.4 / §10.5 housekeeping carry-over. F-4: annotate 14 remaining legacy Phase R'/Q sample-text hits across 10 src/ files with inline marker `# [legacy Phase R'/Q example -- INTEGRATION-AUDIT-01 §10.4]`. Comment-only. No string-literal / regex / sample dict value mutated. fit_verifier.py L612 marker keeps Phase Z partial-live import graph (FitAnalysis / RoleFit / redistribute / salvage) byte-precise. F-5: docs-only addendum -- §10.5.1 in INTEGRATION-AUDIT-01-REPORT.md + tests/CLAUDE.md fixture convention note. No root tests/fixtures/ dir created; existing tests/phase_z2/fixtures/ convention preserved. Documents test-only sample-reference allowance vs src/** runtime prohibition. Out of scope: Phase Z source 11 hits (phase_z2_content_extractor / failure_router / mapper / retry), production behavior change, #19 work. Verified: pytest -q tests/phase_z2/ = 157 PASS. git diff +210/-0 (35 src/docs lines + 175 new tests/CLAUDE.md). No behavioral delta. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
187 lines
6.6 KiB
Python
187 lines
6.6 KiB
Python
"""TF-IDF based block matching engine.

Uses the original text from texts.md directly, so keywords do not need to be
written by hand. frame_extractor extracts the text, and this module computes
the TF-IDF similarity.

Usage:
    matcher = TfidfBlockMatcher()
    result = matcher.match("DX 시행을 위한 필수 요건", ["기술(디지털)", "사람(역량)"])
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import math
|
|
import re
|
|
from collections import Counter
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from src.frame_extractor import extract_all_frames
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TfidfBlockMatcher:
|
|
"""TF-IDF 기반 블록 매칭기. texts.md 직접 사용."""
|
|
|
|
def __init__(
|
|
self,
|
|
blocks_dir: str | Path = "figma_to_html_agent/blocks",
|
|
catalog_path: str | Path = "templates/catalog/blocks.yaml",
|
|
):
|
|
self.frames: list[dict] = extract_all_frames(blocks_dir)
|
|
self.catalog = self._load_catalog(catalog_path)
|
|
|
|
# 프레임별 전체 텍스트 + catalog 텍스트 합침
|
|
self.doc_texts: list[str] = []
|
|
self.doc_ids: list[str] = []
|
|
for frame in self.frames:
|
|
# texts.md 원본 텍스트 사용
|
|
text = frame.get("all_text", "")
|
|
# catalog에서 추가 정보 (when, description)
|
|
cat_entry = self._find_catalog_entry(frame["frame_id"])
|
|
if cat_entry:
|
|
text += " " + cat_entry.get("when", "")
|
|
text += " " + cat_entry.get("description", "")
|
|
self.doc_ids.append(cat_entry.get("id", frame["frame_id"]))
|
|
else:
|
|
self.doc_ids.append(frame["frame_id"])
|
|
self.doc_texts.append(text)
|
|
|
|
# IDF 사전 계산
|
|
self.idf = self._compute_idf(self.doc_texts)
|
|
logger.info(f"[tfidf] {len(self.frames)}개 프레임 인덱싱 완료 (texts.md 직접 사용)")
|
|
|
|
def _load_catalog(self, path: Path | str) -> list[dict]:
|
|
"""catalog 로드 (있으면)."""
|
|
path = Path(path)
|
|
if not path.exists():
|
|
return []
|
|
import yaml
|
|
try:
|
|
data = yaml.safe_load(path.read_text(encoding="utf-8"))
|
|
return data if isinstance(data, list) else data.get("blocks", [])
|
|
except Exception:
|
|
return []
|
|
|
|
def _find_catalog_entry(self, frame_id: str) -> dict | None:
|
|
"""frame_id로 catalog 항목 찾기."""
|
|
for entry in self.catalog:
|
|
if entry.get("source_frame") == frame_id:
|
|
return entry
|
|
return None
|
|
|
|
def _compute_idf(self, documents: list[str]) -> dict[str, float]:
|
|
"""IDF 계산."""
|
|
N = len(documents)
|
|
doc_freq = Counter()
|
|
for doc in documents:
|
|
words = set(doc.split())
|
|
for w in words:
|
|
doc_freq[w] += 1
|
|
return {w: math.log(N / (freq + 1)) for w, freq in doc_freq.items()}
|
|
|
|
def _tfidf_vec(self, text: str) -> dict[str, float]:
|
|
"""텍스트 → TF-IDF 벡터."""
|
|
words = text.split()
|
|
tf = Counter(words)
|
|
total = len(words) if words else 1
|
|
vec = {}
|
|
for w in tf:
|
|
idf = self.idf.get(w, math.log(len(self.doc_texts) + 1))
|
|
vec[w] = (tf[w] / total) * idf
|
|
return vec
|
|
|
|
def _cosine(self, a: dict, b: dict) -> float:
|
|
"""cosine similarity."""
|
|
keys = set(a) | set(b)
|
|
dot = sum(a.get(k, 0) * b.get(k, 0) for k in keys)
|
|
mag_a = math.sqrt(sum(v ** 2 for v in a.values())) if a else 0
|
|
mag_b = math.sqrt(sum(v ** 2 for v in b.values())) if b else 0
|
|
if mag_a == 0 or mag_b == 0:
|
|
return 0.0
|
|
return dot / (mag_a * mag_b)
|
|
|
|
def _preprocess_query(self, text: str) -> str:
|
|
"""MDX 쿼리 텍스트 전처리 (프레임 전처리와 동일 규칙)."""
|
|
text = text.replace("S/W", "SW 소프트웨어")
|
|
text = text.replace("H/W", "HW 하드웨어")
|
|
text = re.sub(r'\bDX\b', 'DX 디지털전환', text)
|
|
# [legacy Phase R'/Q example — INTEGRATION-AUDIT-01 §10.4]
|
|
text = re.sub(r'\bBIM\b', 'BIM 건설정보모델링', text)
|
|
text = text.replace("(", " ").replace(")", " ")
|
|
text = text.replace("[", " ").replace("]", " ")
|
|
text = re.sub(r'[·•→←↔×+/]', ' ', text)
|
|
text = re.sub(r'\s+', ' ', text).strip()
|
|
return text
|
|
|
|
def match(
|
|
self,
|
|
zone_title: str,
|
|
sub_titles: list[str] | None = None,
|
|
d1_items: list[str] | None = None,
|
|
top_k: int = 3,
|
|
) -> list[dict]:
|
|
"""중목차/소목차 텍스트로 프레임 매칭.
|
|
|
|
Returns:
|
|
[{"block_id": str, "frame_id": str, "score": float,
|
|
"title_text": str, "rough_structure": str}]
|
|
"""
|
|
if not self.doc_texts:
|
|
return []
|
|
|
|
# 쿼리 구성
|
|
parts = [zone_title]
|
|
if sub_titles:
|
|
parts.extend(sub_titles)
|
|
if d1_items:
|
|
parts.extend(d1_items)
|
|
query = self._preprocess_query(" ".join(parts))
|
|
|
|
# TF-IDF 유사도 계산
|
|
query_vec = self._tfidf_vec(query)
|
|
scores = []
|
|
for i, doc_text in enumerate(self.doc_texts):
|
|
doc_vec = self._tfidf_vec(doc_text)
|
|
score = self._cosine(query_vec, doc_vec)
|
|
scores.append((i, score))
|
|
|
|
# 상위 K개
|
|
scores.sort(key=lambda x: -x[1])
|
|
results = []
|
|
for idx, score in scores[:top_k]:
|
|
if score <= 0:
|
|
continue
|
|
frame = self.frames[idx]
|
|
results.append({
|
|
"block_id": self.doc_ids[idx],
|
|
"frame_id": frame["frame_id"],
|
|
"score": round(score, 4),
|
|
"method": "tfidf",
|
|
"title_text": frame.get("title_text", ""),
|
|
"rough_structure": frame.get("rough_structure", ""),
|
|
"item_count": frame.get("item_count", 0),
|
|
})
|
|
|
|
if results:
|
|
logger.info(
|
|
f"[tfidf] '{zone_title}' → top: {results[0]['block_id']} "
|
|
f"(score={results[0]['score']}, frame={results[0]['frame_id']})"
|
|
)
|
|
|
|
return results
|
|
|
|
def match_with_threshold(
|
|
self,
|
|
zone_title: str,
|
|
sub_titles: list[str] | None = None,
|
|
d1_items: list[str] | None = None,
|
|
threshold: float = 0.10,
|
|
) -> dict | None:
|
|
"""threshold 이상이면 best match, 아니면 None → recipe 경로."""
|
|
results = self.match(zone_title, sub_titles, d1_items, top_k=1)
|
|
if results and results[0]["score"] >= threshold:
|
|
return results[0]
|
|
return None
|