"""TF-IDF 기반 블록 매칭 엔진. texts.md의 원본 텍스트를 직접 사용 — keywords 수동 생성 불필요. frame_extractor가 텍스트를 추출하고, 여기서 TF-IDF 유사도를 계산. 사용법: matcher = TfidfBlockMatcher() result = matcher.match("DX 시행을 위한 필수 요건", ["기술(디지털)", "사람(역량)"]) """ from __future__ import annotations import logging import math import re from collections import Counter from pathlib import Path from typing import Any from src.frame_extractor import extract_all_frames logger = logging.getLogger(__name__) class TfidfBlockMatcher: """TF-IDF 기반 블록 매칭기. texts.md 직접 사용.""" def __init__( self, blocks_dir: str | Path = "figma_to_html_agent/blocks", catalog_path: str | Path = "templates/catalog/blocks.yaml", ): self.frames: list[dict] = extract_all_frames(blocks_dir) self.catalog = self._load_catalog(catalog_path) # 프레임별 전체 텍스트 + catalog 텍스트 합침 self.doc_texts: list[str] = [] self.doc_ids: list[str] = [] for frame in self.frames: # texts.md 원본 텍스트 사용 text = frame.get("all_text", "") # catalog에서 추가 정보 (when, description) cat_entry = self._find_catalog_entry(frame["frame_id"]) if cat_entry: text += " " + cat_entry.get("when", "") text += " " + cat_entry.get("description", "") self.doc_ids.append(cat_entry.get("id", frame["frame_id"])) else: self.doc_ids.append(frame["frame_id"]) self.doc_texts.append(text) # IDF 사전 계산 self.idf = self._compute_idf(self.doc_texts) logger.info(f"[tfidf] {len(self.frames)}개 프레임 인덱싱 완료 (texts.md 직접 사용)") def _load_catalog(self, path: Path | str) -> list[dict]: """catalog 로드 (있으면).""" path = Path(path) if not path.exists(): return [] import yaml try: data = yaml.safe_load(path.read_text(encoding="utf-8")) return data if isinstance(data, list) else data.get("blocks", []) except Exception: return [] def _find_catalog_entry(self, frame_id: str) -> dict | None: """frame_id로 catalog 항목 찾기.""" for entry in self.catalog: if entry.get("source_frame") == frame_id: return entry return None def _compute_idf(self, documents: list[str]) -> dict[str, float]: """IDF 계산.""" N = len(documents) doc_freq = Counter() for doc in documents: words = set(doc.split()) for w in words: doc_freq[w] += 1 return {w: math.log(N / (freq + 1)) for w, freq in doc_freq.items()} def _tfidf_vec(self, text: str) -> dict[str, float]: """텍스트 → TF-IDF 벡터.""" words = text.split() tf = Counter(words) total = len(words) if words else 1 vec = {} for w in tf: idf = self.idf.get(w, math.log(len(self.doc_texts) + 1)) vec[w] = (tf[w] / total) * idf return vec def _cosine(self, a: dict, b: dict) -> float: """cosine similarity.""" keys = set(a) | set(b) dot = sum(a.get(k, 0) * b.get(k, 0) for k in keys) mag_a = math.sqrt(sum(v ** 2 for v in a.values())) if a else 0 mag_b = math.sqrt(sum(v ** 2 for v in b.values())) if b else 0 if mag_a == 0 or mag_b == 0: return 0.0 return dot / (mag_a * mag_b) def _preprocess_query(self, text: str) -> str: """MDX 쿼리 텍스트 전처리 (프레임 전처리와 동일 규칙).""" text = text.replace("S/W", "SW 소프트웨어") text = text.replace("H/W", "HW 하드웨어") text = re.sub(r'\bDX\b', 'DX 디지털전환', text) # [legacy Phase R'/Q example — INTEGRATION-AUDIT-01 §10.4] text = re.sub(r'\bBIM\b', 'BIM 건설정보모델링', text) text = text.replace("(", " ").replace(")", " ") text = text.replace("[", " ").replace("]", " ") text = re.sub(r'[·•→←↔×+/]', ' ', text) text = re.sub(r'\s+', ' ', text).strip() return text def match( self, zone_title: str, sub_titles: list[str] | None = None, d1_items: list[str] | None = None, top_k: int = 3, ) -> list[dict]: """중목차/소목차 텍스트로 프레임 매칭. Returns: [{"block_id": str, "frame_id": str, "score": float, "title_text": str, "rough_structure": str}] """ if not self.doc_texts: return [] # 쿼리 구성 parts = [zone_title] if sub_titles: parts.extend(sub_titles) if d1_items: parts.extend(d1_items) query = self._preprocess_query(" ".join(parts)) # TF-IDF 유사도 계산 query_vec = self._tfidf_vec(query) scores = [] for i, doc_text in enumerate(self.doc_texts): doc_vec = self._tfidf_vec(doc_text) score = self._cosine(query_vec, doc_vec) scores.append((i, score)) # 상위 K개 scores.sort(key=lambda x: -x[1]) results = [] for idx, score in scores[:top_k]: if score <= 0: continue frame = self.frames[idx] results.append({ "block_id": self.doc_ids[idx], "frame_id": frame["frame_id"], "score": round(score, 4), "method": "tfidf", "title_text": frame.get("title_text", ""), "rough_structure": frame.get("rough_structure", ""), "item_count": frame.get("item_count", 0), }) if results: logger.info( f"[tfidf] '{zone_title}' → top: {results[0]['block_id']} " f"(score={results[0]['score']}, frame={results[0]['frame_id']})" ) return results def match_with_threshold( self, zone_title: str, sub_titles: list[str] | None = None, d1_items: list[str] | None = None, threshold: float = 0.10, ) -> dict | None: """threshold 이상이면 best match, 아니면 None → recipe 경로.""" results = self.match(zone_title, sub_titles, d1_items, top_k=1) if results and results[0]["score"] >= threshold: return results[0] return None