"""여수로 수문 설치도 DXF 파서. 여수로(spillway) 수문 설치도(설계도)에서 수문 구조물의 핵심 파라미터를 자동 추출한다. 보통 1/2 도면(평면+정면), 2/2 도면(측면단면)의 쌍으로 제공됨. 추출되는 핵심 파라미터: - 수문 개수(n_gates), 수문 폭(gate_width), 수문 높이(gate_height) - 각 주요 표고(Gate Sill, Weir Crest, Gate Top, Trunnion Pin, 수위들) - 평면도 외곽(ogee 단면이 extrude될 span 방향) - 측면 단면 외곽(ogee profile polyline) - 교각(pier) 위치 목록 사용법: parser = GateParser() params = parser.parse(plan_dxf_path, section_dxf_path) """ from __future__ import annotations import contextlib import math import re from dataclasses import dataclass, field from itertools import pairwise from pathlib import Path from typing import ClassVar import ezdxf import numpy as np # --------------------------------------------------------------------------- # 데이터 클래스 # --------------------------------------------------------------------------- @dataclass class GateParams: """여수로 수문 구조물 파라미터 (단위: m, 표고는 해발기준 m).""" # 수문 본체 n_gates: int = 3 gate_width: float = 15.0 # 수문 폭 (clear span) gate_height: float = 7.0 # 수문 높이 (sill→top) # 교각 (pier) pier_count: int = 4 # 보통 n_gates + 1 pier_width: float = 3.0 # 교각 폭 (유출방향 수직) pier_length: float = 25.0 # 교각 길이 (유출방향) # 주요 표고 (m, 해발) el_gate_sill: float = 46.700 el_stoplog_sill: float = 46.000 # Stoplog sill (파서에서 추출; 현재 빌더 미사용) el_weir_crest: float = 47.000 el_gate_top: float = 53.700 el_trunnion_pin: float = 50.200 el_mwl: float = 53.830 # M.W.L 최대수위 el_nhwl: float = 52.500 # N.H.W.L 상시만수위 el_lwl: float = 45.000 # L.W.L 최저수위 el_downstream: float = 44.000 el_upstream_bed: float = 41.500 el_bridge_top: float = 56.000 # 공도교 상면 # 구조 전체 (평면) total_span: float = 75.0 # dam axis 방향 전체 폭 total_length: float = 25.0 # 유출방향 전체 길이 dam_axis_y: float = 0.0 # dam axis의 Y 좌표 (로컬) # 단면 프로파일 (ogee) - 로컬 좌표계로 정규화된 점 목록 [(x, z), ...] # x: 유출방향 (0=상류 시작), z: 표고 (해발) ogee_profile: list = field(default_factory=list) # 평면 외곽 (상류측/하류측 경계) plan_outline_upstream: list = field(default_factory=list) plan_outline_downstream: list = field(default_factory=list) # 수문 게이트 중심선 (평면 X 좌표, 순서대로) - 로컬 기준 gate_centers_x: list = field(default_factory=list) # 평면 방향 보정 (detail DXF 내에서 mesh의 +X축이 가리키는 각도) # PCA로 plan outline 주축을 찾아 mesh 빌더 좌표계와 detail DXF 좌표계의 # 상대 회전을 표현. 기본 0.0 = detail의 +X가 span 방향 (수평 그려진 경우). plan_frame_angle_deg: float = 0.0 # 부속 구조물 존재 여부 (도면 검출 결과로 갱신; 구조적 요소는 default False, 시각은 True) has_service_bridge: bool = False # 공도교 (수문 상부 service bridge) has_hoist_housings: bool = True # 여수로 개폐장치 — 래디얼 게이트엔 보통 필수 (파서가 hoist 관련 레이어/블록 검출 시 확정 유지, 없으면 False로 낮춤; 플래그 이름은 직렬화 호환 위해 보존) has_downstream_apron: bool = True # 하류 에이프런 — 시각 맥락, 사용자 토글 가능 has_water_surface: bool = True # 상류 수면 표시 — 시각 맥락, 사용자 토글 가능 # Phase B' — 실제 도면 기하 (parametric 폴백의 우선 대체) # plan 영역 CS-CONC 레이어의 폐합 폴리라인 (chamber-local 좌표, m 단위) plan_outline_polygon: list = field(default_factory=list) # 외곽: [(x,y), ...] pier_plan_polygons: list = field(default_factory=list) # 각 교각: [[(x,y), ...], ...] # 공도교(service bridge)의 실제 plan bbox — (x0, y0, x1, y1) local m, None이면 폴백 bridge_plan_bbox: tuple | None = None # 공도교 Y 방향 두께(도면 기반 실측). None이면 bbox Y 길이 사용. bridge_deck_thickness_m: float = 1.2 # 사용자 직접 지정 공도교 위치 (UI 편집 가능; 4개 값 모두 유효하면 다른 경로 대신 사용) # 기본 None/-1: 사용자 미입력으로 간주. 양수로 편집 시 override 적용. bridge_x_start: float | None = None bridge_x_end: float | None = None bridge_y_start: float | None = None bridge_y_end: float | None = None # FLOW 화살표로 검출된 유수 방향 단위벡터 (DXF XY frame, dx²+dy²=1) # None이면 PCA만 사용 (span 180° 모호성 존재). 검출 시 plan_frame_angle_deg가 # 전체 -180..180 범위로 정확 설정됨. flow_direction_2d: tuple | None = None # 메타데이터 source_files: list = field(default_factory=list) raw_text_annotations: list = field(default_factory=list) def summary(self) -> str: """요약 텍스트 반환.""" return ( f"Gate: {self.n_gates}문 × W{self.gate_width:.1f}m × H{self.gate_height:.1f}m\n" f" Sill EL.{self.el_gate_sill:.2f} / Crest EL.{self.el_weir_crest:.2f} / " f"Top EL.{self.el_gate_top:.2f} / Trunnion EL.{self.el_trunnion_pin:.2f}\n" f" W.L: MWL {self.el_mwl:.2f} / NHWL {self.el_nhwl:.2f} / LWL {self.el_lwl:.2f}\n" f" Total: {self.total_span:.1f}m × {self.total_length:.1f}m\n" f" 부속: 공도교={'O' if self.has_service_bridge else 'X'}, " f"개폐장치={'O' if self.has_hoist_housings else 'X'}, " f"에이프런={'O' if self.has_downstream_apron else 'X'}, " f"수면={'O' if self.has_water_surface else 'X'}\n" f" 기하: plan_outline={len(self.plan_outline_polygon)}pts, " f"piers={len(self.pier_plan_polygons)}\n" f" Gate centers (X): {[f'{x:.1f}' for x in self.gate_centers_x]}" ) # --------------------------------------------------------------------------- # 파서 # --------------------------------------------------------------------------- class GateParser: """여수로 수문 설치도 파서.""" # 구조 레이어 (concrete 본체 geometry) STRUCT_LAYERS: ClassVar[set[str]] = { "CS-CONC-Spillway", "CS-CONC-Bridge", } def parse(self, plan_dxf: str | Path, section_dxf: str | Path | None = None) -> GateParams: """평면/정면 도면과 측면단면 도면을 파싱. Args: plan_dxf: 1/2 도면 (평면 + 정면). 필수. section_dxf: 2/2 도면 (측면 단면). 선택적, 있으면 ogee 프로파일 추출. Returns: GateParams """ params = GateParams() params.source_files.append(str(plan_dxf)) # Plan + Elevation 도면 파싱 self._parse_plan_file(Path(plan_dxf), params) # Section 도면 파싱 (있으면) if section_dxf: params.source_files.append(str(section_dxf)) self._parse_section_file(Path(section_dxf), params) # 누락된 파라미터는 기본값 + 추론 self._infer_missing_params(params) return params # ----- Plan/Elevation 도면 파싱 ----- def _parse_plan_file(self, path: Path, params: GateParams): """1/2 도면에서 수문 개수, 크기, 평면 배치, 표고를 추출 (Phase D 통합). view_detector로 평면도/입면도/측면도 영역을 분리한 뒤: - 평면도 영역 내 geometry → 외곽·교각·게이트 위치 - 입면도 영역 내 geometry → 게이트 높이 등 (현재는 보조, 추후 확장) view_detector가 영역을 못 찾으면 기존 y_mid 휴리스틱으로 폴백. """ doc = ezdxf.readfile(str(path)) msp = doc.modelspace() # 0) 부속 구조물 존재성 검출 (공도교 등) self._detect_optional_components(msp, params) # 1) 텍스트 주석 스캔 → 수문 사양 + 표고 (전체 스캔, view 무관) self._scan_text_annotations(msp, params) # 2) 뷰 영역 식별 (Phase D): view_detector 우선, 실패 시 y_mid 폴백 plan_bounds_mm = None # (x0, y0, x1, y1) — 파서는 mm 좌표로 filter try: from view_detector import detect_view_regions views = detect_view_regions(str(path)) plan_view = next((v for v in views if v.view_type == "plan"), None) if plan_view is not None: # view bounds는 m 단위, 파서는 mm 단위로 작업 → 1000배 변환 b = plan_view.bounds # (xmin_m, ymin_m, xmax_m, ymax_m) plan_bounds_mm = (b[0] * 1000.0, b[1] * 1000.0, b[2] * 1000.0, b[3] * 1000.0) params.raw_text_annotations.append(( f"[view] plan detected: " f"x=[{b[0]:.1f},{b[2]:.1f}]m y=[{b[1]:.1f},{b[3]:.1f}]m " f"(W={plan_view.width:.1f}m, H={plan_view.height:.1f}m)", 0.0, 0.0 )) except Exception as _e: pass # plan bounds로 필터할 함수 정의 def _in_plan(x, y): if plan_bounds_mm is not None: x0, y0, x1, y1 = plan_bounds_mm return x0 <= x <= x1 and y0 <= y <= y1 # 폴백: 뒤에서 계산할 y_mid 사용. 임시로 True 반환해 전체 처리. return True # 폴백용 y_mid 계산 (view_detector 실패 시) y_values = [] for e in msp: if e.dxf.layer != "CS-CONC-Spillway": continue with contextlib.suppress(Exception): if e.dxftype() == "LWPOLYLINE": y_values.extend(p[1] for p in e.get_points()) elif e.dxftype() == "LINE": y_values.extend([e.dxf.start.y, e.dxf.end.y]) if not y_values: return ymin = min(y_values) ymax = max(y_values) y_mid = (ymin + ymax) / 2 # view_detector가 plan을 찾았으면 그걸로 교체; 아니면 y_mid 유지 if plan_bounds_mm is not None: # plan y-range의 상한을 유지, 하한을 plan 영역 y0로 ymin = plan_bounds_mm[1] ymax = plan_bounds_mm[3] y_mid = ymin # 기존 로직에서 mid 이상만 plan으로 취급하므로 plan 하한을 mid로 # 3) 평면 영역 (Y > y_mid) 지오메트리 추출 plan_pts_all = [] for e in msp: if e.dxf.layer not in self.STRUCT_LAYERS: continue try: if e.dxftype() == "LWPOLYLINE": pts = [(p[0], p[1]) for p in e.get_points()] if pts and np.mean([p[1] for p in pts]) > y_mid: plan_pts_all.extend(pts) elif e.dxftype() == "LINE": s, en = e.dxf.start, e.dxf.end if (s.y + en.y) / 2 > y_mid: plan_pts_all.extend([(s.x, s.y), (en.x, en.y)]) except Exception: pass if plan_pts_all: arr = np.array(plan_pts_all) params.total_span = float(arr[:, 0].max() - arr[:, 0].min()) / 1000.0 # mm→m params.total_length = float(arr[:, 1].max() - arr[:, 1].min()) / 1000.0 # --- 평면 방향 결정 --- # 1순위: FLOW 화살표 (사용자 지정 유수 방향 → 180° 모호성 해소) # 2순위: plan geometry PCA 주축 (부호 모호성 존재, -90..+90 정규화) flow_dir = self._detect_flow_direction(msp) if flow_dir is not None: params.flow_direction_2d = flow_dir fx, fy = flow_dir # 배치 파이프라인 `fit_meshes_to_quad`는 CW quad 기본값으로 # mesh Y를 먼저 반전(flip_y_for_cw_quad=True) 후 회전 적용. # mesh +Y(빌더 downstream 컨벤션)가 world flow 방향과 일치하려면, # 유도: (0,1) → Y-flip → (0,-1) → rotate(-span) → (-sin(span),-cos(span)) # 이 벡터가 flow=(fx,fy)와 같아야 함 → sin(span)=-fx, cos(span)=-fy # → span = atan2(-fx, -fy) span_angle_rad = math.atan2(-fx, -fy) span_angle_deg = math.degrees(span_angle_rad) while span_angle_deg > 180: span_angle_deg -= 360 while span_angle_deg <= -180: span_angle_deg += 360 params.plan_frame_angle_deg = span_angle_deg params.raw_text_annotations.append(( f"[flow] detected flow_dir=({fx:+.3f},{fy:+.3f}) " f"→ plan_frame_angle={span_angle_deg:+.1f}°", 0.0, 0.0 )) else: try: centered = arr - arr.mean(axis=0) cov = np.cov(centered.T) eigvals, eigvecs = np.linalg.eigh(cov) idx = int(np.argmax(eigvals)) main_axis = eigvecs[:, idx] span_angle_rad = math.atan2(float(main_axis[1]), float(main_axis[0])) span_angle_deg = math.degrees(span_angle_rad) while span_angle_deg > 90: span_angle_deg -= 180 while span_angle_deg < -90: span_angle_deg += 180 params.plan_frame_angle_deg = span_angle_deg except Exception: params.plan_frame_angle_deg = 0.0 # 수문 중심 추정: 평면 폭을 n_gates로 균등 분할 # 정확한 위치는 아래 4) 단계에서 치수선으로 보정 x_min = float(arr[:, 0].min()) / 1000.0 # 기본 추정 if not params.gate_centers_x: gate_spacing = params.total_span / params.n_gates params.gate_centers_x = [ x_min + gate_spacing * (i + 0.5) for i in range(params.n_gates) ] # 4) DIMENSION 엔티티 → clear span 확인 및 수문 위치 보정 self._extract_gate_positions_from_dims(msp, params) # 5) Phase B' — 평면 폴리곤/교각 폴리곤 직접 추출 self._extract_plan_polygons(msp, params, y_mid, ymin, ymax) def _parse_section_file(self, path: Path, params: GateParams): """2/2 도면에서 측면 ogee 단면 프로파일 추출.""" doc = ezdxf.readfile(str(path)) msp = doc.modelspace() # 텍스트 주석도 보조적으로 스캔 (표고 보강) self._scan_text_annotations(msp, params) # CS-CONC-Spillway 레이어에서 가장 긴 LWPOLYLINE = ogee 프로파일 추정 best_poly = None best_len = 0 for e in msp: if e.dxf.layer != "CS-CONC-Spillway": continue if e.dxftype() != "LWPOLYLINE": continue try: pts = [(p[0], p[1]) for p in e.get_points()] if len(pts) > best_len: best_len = len(pts) best_poly = pts except Exception: pass if best_poly and len(best_poly) >= 5: # mm → m 변환, 로컬 좌표 정규화 arr = np.array(best_poly) # X 원점 = 상류측 시작, Z = 표고 (해발 m 변환 필요) # 단면 도면의 Y는 보통 해발표고(m)에 mm 스케일 x_local = (arr[:, 0] - arr[:, 0].min()) / 1000.0 # 도면의 Y는 해발에 mm scale (예: 12978mm는 EL.12.978m가 아님 주의) # 측면도에서 Y 범위 보고 해발 대응을 추정: Gate Sill/Weir Crest 위치 비교 # 간단화: 최저점 = el_upstream_bed, 스케일은 mm→m z_min_local = arr[:, 1].min() / 1000.0 z_shift = params.el_upstream_bed - z_min_local z_local = arr[:, 1] / 1000.0 + z_shift params.ogee_profile = list(zip(x_local.tolist(), z_local.tolist(), strict=False)) # ----- 부속 구조물 존재성 검출 (Phase A) ----- # # 구체 검출 로직은 optional_detector 모듈에 위임. 본 파서는 구조물 특유의 # 컴포넌트 명세(레이어 토큰·텍스트 키워드·default)만 선언. _COMPONENT_SPECS = None # lazy init (import 시 호출) @classmethod def _get_component_specs(cls): if cls._COMPONENT_SPECS is None: from optional_detector import ComponentSpec cls._COMPONENT_SPECS = [ ComponentSpec( name="service_bridge", layer_tokens=("bridge", "공도교", "공도", "관리도로", "service road"), text_keywords=("공도교", "service bridge", "관리교", "관리도로"), default=False, ), ComponentSpec( name="hoist_housings", layer_tokens=("hoist", "권양", "winch", "gantry"), text_keywords=("권양기", "hoist", "winch", "hoisting"), default=True, preserve_default_on_no_signal=True, # 별도 레이어 없이 그리는 경우 많음 ), ComponentSpec( name="downstream_apron", layer_tokens=("apron", "에이프런", "stilling", "물받이", "감세"), text_keywords=("에이프런", "apron", "물받이", "감세공", "stilling basin"), default=True, preserve_default_on_no_signal=True, # 시각 맥락용 ), ] return cls._COMPONENT_SPECS def _detect_flow_direction(self, msp, search_radius_mm: float = 10000.0, shaft_min_len_mm: float = 2000.0) -> tuple | None: """도면의 "FLOW" 텍스트와 인접 화살표(LINE 클러스터)에서 유수 방향 추출. 알고리즘: 1. TEXT/MTEXT에서 "FLOW", "흐름", "유수" 키워드 탐지 2. 각 텍스트 위치에서 반경 10m 내 LINE 수집 - 길이 ≥ 2m: shaft 후보 (가장 긴 것 선택) - 길이 < 2m: arrowhead 후보 3. shaft의 두 끝점 중 **arrowhead 점들이 더 밀집한 쪽이 tip** (arrowhead가 없으면 텍스트에서 더 먼 쪽을 tip으로 폴백) 4. 방향 = (tip - tail) 정규화 5. 여러 FLOW가 있으면 평균 Returns: (dx, dy) 단위벡터 or None (검출 실패) """ flow_dirs: list[tuple[float, float]] = [] # TEXT 엔티티 먼저 모아두고, LINE은 msp 한 번 더 순회 (메모리 효율) flow_texts = [] for e in msp: try: et = e.dxftype() if et not in ("TEXT", "MTEXT"): continue txt = e.dxf.text if et == "TEXT" else (e.text or "") except Exception: continue if not txt: continue tu = txt.strip().upper() if "FLOW" not in tu and "흐름" not in txt and "유수" not in txt: continue try: pos = e.dxf.insert flow_texts.append((float(pos.x), float(pos.y))) except Exception: continue if not flow_texts: return None # LINE 수집 (한 번만) all_lines: list[tuple[float, float, float, float, float]] = [] # (sx,sy,ex,ey,len) for e in msp: try: if e.dxftype() != "LINE": continue s = e.dxf.start en = e.dxf.end dx = en.x - s.x dy = en.y - s.y length = math.sqrt(dx * dx + dy * dy) all_lines.append((float(s.x), float(s.y), float(en.x), float(en.y), length)) except Exception: continue for (tx, ty) in flow_texts: shaft_cands = [] arrow_pts = [] for (sx, sy, ex, ey, L) in all_lines: mx = (sx + ex) * 0.5 my = (sy + ey) * 0.5 if math.hypot(mx - tx, my - ty) > search_radius_mm: continue if shaft_min_len_mm <= L: shaft_cands.append((L, sx, sy, ex, ey)) else: arrow_pts.append((sx, sy)) arrow_pts.append((ex, ey)) if not shaft_cands: continue shaft_cands.sort(key=lambda t: -t[0]) _, sx, sy, ex, ey = shaft_cands[0] s_end = (sx, sy); e_end = (ex, ey) # 어느 끝이 tip인가: arrowhead 점들이 더 밀집한 쪽 s_cnt = 0; e_cnt = 0 for (ax, ay) in arrow_pts: if math.hypot(ax - sx, ay - sy) < math.hypot(ax - ex, ay - ey): s_cnt += 1 else: e_cnt += 1 if s_cnt == 0 and e_cnt == 0: # arrowhead 검출 실패 → 텍스트에서 먼 쪽을 tip으로 가정 d_s = math.hypot(sx - tx, sy - ty) d_e = math.hypot(ex - tx, ey - ty) if d_s > d_e: tip, tail = s_end, e_end else: tip, tail = e_end, s_end elif s_cnt > e_cnt: tip, tail = s_end, e_end else: tip, tail = e_end, s_end dx = tip[0] - tail[0] dy = tip[1] - tail[1] L = math.hypot(dx, dy) if L < 1e-3: continue flow_dirs.append((dx / L, dy / L)) if not flow_dirs: return None # 평균 (여러 FLOW가 있으면 방향 일치성 확인 후 평균) avg_dx = sum(d[0] for d in flow_dirs) / len(flow_dirs) avg_dy = sum(d[1] for d in flow_dirs) / len(flow_dirs) L = math.hypot(avg_dx, avg_dy) if L < 0.3: # 여러 FLOW가 서로 반대 방향을 가리키면 벡터 합이 작아짐 → 신뢰도 낮음 return None return (avg_dx / L, avg_dy / L) def _detect_optional_components(self, msp, params: GateParams): """Phase A: 부속 구조물 존재 여부를 도면에서 검출해 has_* 플래그 갱신.""" from optional_detector import detect_components, summary_line reports = detect_components(msp, self._get_component_specs()) params.has_service_bridge = reports["service_bridge"].present params.has_hoist_housings = reports["hoist_housings"].present params.has_downstream_apron = reports["downstream_apron"].present params.raw_text_annotations.append((summary_line(reports), 0.0, 0.0)) # ----- Phase B' 평면 폴리곤 추출 ----- @staticmethod def _polygon_area(pts: list) -> float: """shoelace 면적 (부호 없음).""" if len(pts) < 3: return 0.0 a = 0.0 n = len(pts) for i in range(n): x1, y1 = pts[i][0], pts[i][1] x2, y2 = pts[(i + 1) % n][0], pts[(i + 1) % n][1] a += x1 * y2 - x2 * y1 return abs(a) * 0.5 @staticmethod def _is_closed(pts: list, tol: float = 1.0) -> bool: """첫점~끝점 거리가 tol 이하거나 동일점이면 폐합.""" if len(pts) < 3: return False dx = pts[0][0] - pts[-1][0] dy = pts[0][1] - pts[-1][1] return (dx * dx + dy * dy) ** 0.5 <= tol def _extract_plan_polygons(self, msp, params: GateParams, y_mid: float, ymin: float, ymax: float): """plan 영역에서 외곽/교각 폴리곤 추출 (3단계 폴백). 추출 성공 시 self._pier_origin_mm = (ox, oy)에 실제 사용된 origin을 기록 → bridge bbox가 같은 로컬 프레임에 정합.""" self._pier_origin_mm = None # 성공 시 각 경로에서 설정 # === 1차: 폐합 LWPOLYLINE === closed_polys: list[tuple[list, float]] = [] for e in msp: if e.dxf.layer != "CS-CONC-Spillway": continue if e.dxftype() != "LWPOLYLINE": continue try: pts = [(p[0], p[1]) for p in e.get_points()] except Exception: continue if not pts: continue mid_y = sum(p[1] for p in pts) / len(pts) if mid_y <= y_mid: continue is_closed_dxf = False with contextlib.suppress(Exception): is_closed_dxf = bool(e.closed) is_closed_geom = self._is_closed(pts, tol=5.0) if not (is_closed_dxf or is_closed_geom): continue area = self._polygon_area(pts) if area < 100.0: continue closed_polys.append((pts, area)) method_used = "none" expected_n_piers = params.n_gates + 1 if closed_polys: # 폐합 폴리곤 경로 (기존 로직) closed_polys.sort(key=lambda t: -t[1]) outline_pts, outline_area = closed_polys[0] arr_outline = np.array(outline_pts) x_min_mm = arr_outline[:, 0].min() y_min_mm = arr_outline[:, 1].min() self._pier_origin_mm = (x_min_mm, y_min_mm) xmin_m = x_min_mm / 1000.0 ymin_m = y_min_mm / 1000.0 params.plan_outline_polygon = [ ((x / 1000.0) - xmin_m, (y / 1000.0) - ymin_m) for x, y in outline_pts ] pier_min_area = outline_area * 0.015 pier_max_area = outline_area * 0.45 pier_polys_m = [] for pts, area in closed_polys[1:]: if not (pier_min_area <= area <= pier_max_area): continue pier_polys_m.append([ ((x / 1000.0) - xmin_m, (y / 1000.0) - ymin_m) for x, y in pts ]) params.pier_plan_polygons = pier_polys_m method_used = "closed_polylines" # 2차: polygon_reconstructor로 face enumeration (폐합 실패 또는 pier 부족) if len(params.pier_plan_polygons) != expected_n_piers: ok_recon = self._extract_piers_from_line_soup(msp, params, y_mid) if ok_recon: method_used = "face_enumeration" # 3차: 선 스윕 + 격자 정렬 (수평/수직 선 클러스터 → pier 경계 매칭) if len(params.pier_plan_polygons) != expected_n_piers: ok_sweep = self._extract_piers_from_vertical_clusters(msp, params, y_mid) if ok_sweep: method_used = "vertical_clusters" # 4차: bridge 실제 bbox 추출 — pier 추출 방법이 사용한 origin과 동일 프레임 if params.has_service_bridge and self._pier_origin_mm is not None: bridge_bbox_mm = self._extract_bridge_bbox_mm(msp) if bridge_bbox_mm is not None: ox, oy = self._pier_origin_mm bbox = ( (bridge_bbox_mm[0] - ox) / 1000.0, (bridge_bbox_mm[1] - oy) / 1000.0, (bridge_bbox_mm[2] - ox) / 1000.0, (bridge_bbox_mm[3] - oy) / 1000.0, ) params.bridge_plan_bbox = bbox # 사용자 편집 가능한 초기값으로 UI param 노출 (sanity 통과 가정 하) params.bridge_x_start = float(bbox[0]) params.bridge_y_start = float(bbox[1]) params.bridge_x_end = float(bbox[2]) params.bridge_y_end = float(bbox[3]) params.raw_text_annotations.append(( f"[plan_poly] outline_pts={len(params.plan_outline_polygon)}, " f"piers={len(params.pier_plan_polygons)}/{expected_n_piers}, " f"bridge_bbox={params.bridge_plan_bbox}, " f"method={method_used}", 0.0, 0.0 )) def _compute_plan_origin_mm(self, params: GateParams, msp) -> tuple | None: """pier_plan_polygons 또는 plan_outline_polygon의 DXF mm origin을 역산. 각 추출 방법이 이미 (0,0)-기준 로컬 좌표로 저장했으므로, DXF mm 좌표에서 해당 points의 실제 min X/Y를 찾아 origin으로 사용. 여러 폴백 경로를 시도. """ if params.pier_plan_polygons: # pier 폴리곤의 로컬 좌표에서 최소값은 (0,0)에 가까우므로, DXF 상의 # 실제 해당 point 위치를 찾기 위해 폴리곤의 mm 기준 bbox를 별도 계산 # → parser가 로컬 저장 시 사용한 origin과 동일해야 함. # 각 추출 메서드가 origin을 다르게 쓸 수 있으므로, plan 영역 CS-CONC-Spillway # 전체 geometry에서 pier 폴리곤 실제 위치를 역추정: # 간단히: plan 영역 CS-CONC-Spillway geometry의 bbox min을 origin으로 return self._plan_bbox_origin_mm(msp) if params.plan_outline_polygon: return self._plan_bbox_origin_mm(msp) return None def _plan_bbox_origin_mm(self, msp) -> tuple | None: """plan 영역 CS-CONC-Spillway 전체 geometry의 DXF mm bbox min 반환.""" try: from view_detector import detect_view_regions # noqa: F401 (protective availability check) except ImportError: return None for fn in getattr(self, "_cached_paths", []) or []: pass # 현재 context에서 path 접근 불가 — msp로 대체 # msp 기반 bbox: plan 영역 bounds를 view_detector 없이 추정하기 어려움. # 대신 pier_plan_polygons을 역산: 여러 추출 경로가 모두 "plan 영역 geometry # 최소점"을 origin으로 쓰므로, 여기서도 동일하게 계산. xs: list = [] ys: list = [] for e in msp: if e.dxf.layer != "CS-CONC-Spillway": continue try: if e.dxftype() == "LINE": xs.extend([e.dxf.start.x, e.dxf.end.x]) ys.extend([e.dxf.start.y, e.dxf.end.y]) elif e.dxftype() == "LWPOLYLINE": for p in e.get_points(): xs.append(p[0]) ys.append(p[1]) except Exception: continue if not xs: return None # plan 영역만: Y 중간값 이상만 남김 (기존 y_mid 휴리스틱과 동일) y_sorted = sorted(ys) y_mid = y_sorted[len(y_sorted) // 2] xs_plan = [x for x, y in zip(xs, ys, strict=False) if y > y_mid] ys_plan = [y for y in ys if y > y_mid] if not xs_plan: return None return (min(xs_plan), min(ys_plan)) def _extract_bridge_bbox_mm(self, msp) -> tuple | None: """Bridge 관련 레이어(CS-CONC-Bridge / 공도교 / 관리도로 등)의 DXF mm bbox (x_min, y_min, x_max, y_max) 반환. 없으면 None. 주의: 관리도로_수정은 보조 레이어라 제외(주 레이어만 사용). """ tokens = ("bridge", "공도교", "공도", "service road") xs: list = [] ys: list = [] for e in msp: try: layer = e.dxf.layer except Exception: continue lname = layer.lower() if not any(t.lower() in lname for t in tokens): continue try: if e.dxftype() == "LINE": xs.extend([e.dxf.start.x, e.dxf.end.x]) ys.extend([e.dxf.start.y, e.dxf.end.y]) elif e.dxftype() == "LWPOLYLINE": for p in e.get_points(): xs.append(p[0]) ys.append(p[1]) except Exception: continue if not xs: return None return (min(xs), min(ys), max(xs), max(ys)) def _extract_piers_from_line_soup(self, msp, params: GateParams, y_mid: float) -> bool: """plan 영역 CS-CONC-Spillway의 개방선(LINE + LWPOLYLINE 인접쌍)에서 **polygon_reconstructor로 폐합 영역(face)을 복원**해 outline/pier 분류. 알고리즘: 1. 선분 수집 (plan bounds 내부만) 2. polygon_reconstructor.reconstruct_polygons로 모든 face 복원 3. 면적 내림차순: 가장 큰 것은 bbox outer face → outline (또는 해당 face 제외하고 그 다음 것이 실제 구조물 외곽) 4. 외곽 면적의 1.5%~45% 범위 면을 pier 후보 반환: pier가 n_gates+1 개 검출되면 True, 아니면 False. """ # 1) 선분 수집 (원본 DXF mm 단위, plan 영역만) try: from polygon_reconstructor import reconstruct_polygons except ImportError: return False segs: list[tuple[tuple[float, float], tuple[float, float]]] = [] # plan_bounds_mm는 호출 컨텍스트에서 y_mid 계산에 반영됐으므로 # 여기서는 y > y_mid 필터만 일관 적용 for e in msp: if e.dxf.layer != "CS-CONC-Spillway": continue et = e.dxftype() if et == "LINE": try: s, en = e.dxf.start, e.dxf.end if (s.y + en.y) / 2 > y_mid: segs.append(((s.x, s.y), (en.x, en.y))) except Exception: pass elif et == "LWPOLYLINE": try: pts = [(p[0], p[1]) for p in e.get_points()] except Exception: continue if not pts or sum(p[1] for p in pts) / len(pts) <= y_mid: continue segs.extend(pairwise(pts)) # closed flag이면 마지막→첫 번째도 추가 try: if bool(getattr(e, "closed", False)): segs.append((pts[-1], pts[0])) except Exception: pass if not segs: return False # 2) Face 복원 (DXF mm 단위, tol=5mm — CAD 수치 오차 흡수) faces = reconstruct_polygons(segs, tol=5.0, min_area=1000.0, max_faces=3000) if not faces: # 최소한 bbox outline은 세팅 all_x = [p[0] for seg in segs for p in (seg[0], seg[1])] all_y = [p[1] for seg in segs for p in (seg[0], seg[1])] xmin_mm, xmax_mm = min(all_x), max(all_x) ymin_mm, ymax_mm = min(all_y), max(all_y) w = (xmax_mm - xmin_mm) / 1000.0 h = (ymax_mm - ymin_mm) / 1000.0 params.plan_outline_polygon = [ (0.0, 0.0), (w, 0.0), (w, h), (0.0, h) ] return False # 3) 가장 큰 face = 외곽, 그 다음부터 pier 후보 # (planar face enumeration에서 outer face가 항상 포함되진 않으나, # 포함돼도 본 필터에서 정상 처리됨 — 최대 면적을 outline으로 사용) outline_pts, outline_area = faces[0] # 좌표 원점 정규화 (plan 영역 bbox 원점을 0,0으로) arr_x = [p[0] for p in outline_pts] arr_y = [p[1] for p in outline_pts] x_min_mm = min(arr_x); y_min_mm = min(arr_y) self._pier_origin_mm = (x_min_mm, y_min_mm) params.plan_outline_polygon = [ ((x - x_min_mm) / 1000.0, (y - y_min_mm) / 1000.0) for x, y in outline_pts ] # Pier 후보: 외곽 면적의 1.5%~45% + 예상 pier 치수 범위(폭·길이 각 50~200%)에 부합 pier_min_area = outline_area * 0.015 pier_max_area = outline_area * 0.45 # pier_length는 보통 여수로 전체 길이와 비슷하므로 총 길이 사용 expected_pw = params.pier_width expected_pl = max(params.pier_length, params.total_length * 0.5) w_lo, w_hi = expected_pw * 0.5, expected_pw * 2.0 l_lo, l_hi = expected_pl * 0.5, expected_pl * 2.0 pier_polys: list[list[tuple]] = [] for pts, area in faces[1:]: if not (pier_min_area <= area <= pier_max_area): continue xs = [p[0] for p in pts] ys = [p[1] for p in pts] w_mm = max(xs) - min(xs) l_mm = max(ys) - min(ys) w_m = w_mm / 1000.0; l_m = l_mm / 1000.0 # 폭/길이 어느 쪽이 span 축인지 자동 판단 — 더 긴 쪽을 length로 매핑 if l_m < w_m: w_m, l_m = l_m, w_m if not (w_lo <= w_m <= w_hi): continue if not (l_lo <= l_m <= l_hi): continue pier_polys.append([ ((x - x_min_mm) / 1000.0, (y - y_min_mm) / 1000.0) for x, y in pts ]) params.pier_plan_polygons = pier_polys # 성공 기준: pier 수가 예상(n_gates+1)과 일치 expected = params.n_gates + 1 return len(pier_polys) == expected # ----- Phase B'' 선 스윕 + 격자 정렬 ----- def _extract_piers_from_vertical_clusters(self, msp, params: GateParams, y_mid: float) -> bool: """개방선으로 그려진 도면에서 수직 클러스터 gap 패턴으로 gate/pier 식별. 전략: parametric 수치(gate_width, pier_width)에 의존하지 않고 **gap 분포로 gate opening을 먼저 찾은 뒤**, 클러스터를 pier 영역으로 그룹핑해 bbox 폴리곤을 생성. 실측 DXF 흔한 파턴에 맞춰짐. 단계: 1) plan 영역 수직 세그먼트 수집 2) X 1D 클러스터링 → 각 클러스터 (x_avg, y_min, y_max, total_len) 3) 길이 필터 (pier_length × 0.3 이상 = 의미 있는 structural 수직선) 4) 인접 클러스터 gap 계산 → 상위 n_gates 개가 gate opening 5) gate opening 사이/양끝 영역을 pier 영역으로 그룹화 6) 각 pier 영역의 leftmost·rightmost 클러스터가 pier 경계, Y 범위는 그 영역 클러스터들의 교집합 7) 로컬 좌표 m로 변환해 pier_plan_polygons 저장 반환: 정확히 n_gates+1 개 pier 생성되면 True. """ n_expected = params.n_gates + 1 # 1) 수직 세그먼트 수집 verticals: list[tuple[float, float, float]] = [] # (x_avg, y1, y2) for e in msp: if e.dxf.layer != "CS-CONC-Spillway": continue et = e.dxftype() pairs: list[tuple[tuple, tuple]] = [] if et == "LINE": try: s, en = e.dxf.start, e.dxf.end pairs.append(((s.x, s.y), (en.x, en.y))) except Exception: continue elif et == "LWPOLYLINE": try: pts = [(p[0], p[1]) for p in e.get_points()] except Exception: continue pairs.extend(pairwise(pts)) for p1, p2 in pairs: dx = p2[0] - p1[0]; dy = p2[1] - p1[1] if abs(dy) < 100.0: continue if abs(dy) < 3.0 * abs(dx): continue if (p1[1] + p2[1]) / 2 <= y_mid: continue verticals.append(((p1[0] + p2[0]) / 2, min(p1[1], p2[1]), max(p1[1], p2[1]))) if not verticals: return False # 2) X 클러스터링 (greedy 1D, tol 500mm) verticals.sort(key=lambda v: v[0]) cluster_tol_mm = 500.0 clusters_raw: list[list] = [[verticals[0]]] for seg in verticals[1:]: if seg[0] - clusters_raw[-1][-1][0] < cluster_tol_mm: clusters_raw[-1].append(seg) else: clusters_raw.append([seg]) # 3) 특성 계산 + 길이 필터 length_threshold = max(params.pier_length * 0.3 * 1000.0, 3000.0) strong: list[tuple[float, float, float, float]] = [] # (x, y_min, y_max, total_len) for cl in clusters_raw: xs = [s[0] for s in cl] y_min = min(s[1] for s in cl) y_max = max(s[2] for s in cl) total_len = sum(s[2] - s[1] for s in cl) if total_len >= length_threshold: strong.append((sum(xs) / len(xs), y_min, y_max, total_len)) if len(strong) < 2 * n_expected: # 기대하는 수직선 수(pier당 2)가 안 나오면 실패 return False # 4) gap 분포 분석 → 상위 n_gates gap이 gate opening strong.sort(key=lambda c: c[0]) gaps = [(i, strong[i + 1][0] - strong[i][0]) for i in range(len(strong) - 1)] if not gaps: return False # 상위 n_gates gap 인덱스 sorted_gaps = sorted(gaps, key=lambda g: -g[1]) if len(sorted_gaps) < params.n_gates: return False gate_gap_indices = sorted([sorted_gaps[i][0] for i in range(params.n_gates)]) # 5) pier 영역 그룹화 pier_regions: list[list[tuple]] = [] prev = 0 for gi in gate_gap_indices: pier_regions.append(strong[prev:gi + 1]) # gi 포함 = pier 오른쪽 edge prev = gi + 1 pier_regions.append(strong[prev:]) if len(pier_regions) != n_expected: return False # 5a) 각 pier region이 최소 2개 클러스터 (left + right edge) 확보 for region in pier_regions: if len(region) < 2: return False # 6) pier 폴리곤 구성 (leftmost~rightmost X, Y 범위는 region 전체) pier_polys_mm: list[list[tuple]] = [] for region in pier_regions: left_x = region[0][0] right_x = region[-1][0] # Y 범위: region 클러스터들의 교집합 (공통으로 존재하는 수직 범위) y_min = max(c[1] for c in region) y_max = min(c[2] for c in region) # 교집합이 짧으면 합집합으로 폴백 (정보 보존) if y_max - y_min < params.pier_length * 1000.0 * 0.3: y_min = min(c[1] for c in region) y_max = max(c[2] for c in region) pier_polys_mm.append([ (left_x, y_min), (right_x, y_min), (right_x, y_max), (left_x, y_max), ]) # 7) 로컬 좌표 m 변환 (plan_outline_polygon이 있으면 그 origin 재사용, 없으면 pier bbox) all_x = [p[0] for poly in pier_polys_mm for p in poly] all_y = [p[1] for poly in pier_polys_mm for p in poly] ox_mm = min(all_x) oy_mm = min(all_y) self._pier_origin_mm = (ox_mm, oy_mm) if not params.plan_outline_polygon: # pier bbox를 outline으로 사용 (단순 사각형) w_m = (max(all_x) - ox_mm) / 1000.0 h_m = (max(all_y) - oy_mm) / 1000.0 # 약간의 여유 margin margin = 0.5 params.plan_outline_polygon = [ (-margin, -margin), (w_m + margin, -margin), (w_m + margin, h_m + margin), (-margin, h_m + margin), ] params.pier_plan_polygons = [ [((x - ox_mm) / 1000.0, (y - oy_mm) / 1000.0) for x, y in poly] for poly in pier_polys_mm ] return True # ----- 텍스트 주석 스캔 ----- # 핵심 키워드 → 파라미터 매핑 _ELEVATION_PATTERNS: ClassVar[list[tuple[str, str]]] = [ (r"Gate Sill\s*EL\.?\s*(\d+\.?\d*)", "el_gate_sill"), (r"Weir Crest\s*EL\.?\s*(\d+\.?\d*)", "el_weir_crest"), (r"Gate Top\s*EL\.?\s*(\d+\.?\d*)", "el_gate_top"), (r"Trunnion(?: Pin)?\s*EL\.?\s*(\d+\.?\d*)", "el_trunnion_pin"), (r"Stoplog Sill\s*EL\.?\s*(\d+\.?\d*)", "el_stoplog_sill"), (r"M\.?W\.?L\.?\s*EL\.?\s*(\d+\.?\d*)", "el_mwl"), (r"N\.?H\.?W\.?L\.?\s*EL\.?\s*(\d+\.?\d*)", "el_nhwl"), (r"F\.?W\.?L\.?\s*EL\.?\s*(\d+\.?\d*)", "el_nhwl"), # F.W.L ≈ N.H.W.L (r"L\.?W\.?L\.?\s*EL\.?\s*(\d+\.?\d*)", "el_lwl"), ] _GATE_SPEC_PATTERN = re.compile( r"W\s*(\d+\.?\d*)\s*m?\s*[xX×]\s*H\s*(\d+\.?\d*)\s*m?\s*[xX×]\s*(\d+)\s*(?:문|門|bay)", re.IGNORECASE, ) def _scan_text_annotations(self, msp, params: GateParams): """TEXT, MTEXT 주석을 스캔하여 사양/표고 파싱.""" for e in msp: et = e.dxftype() if et not in ("TEXT", "MTEXT"): continue try: txt = e.dxf.text if et == "TEXT" else (e.text or "") txt = txt.strip() except Exception: continue if not txt: continue # 원본 주석 보관 try: pos = e.dxf.insert params.raw_text_annotations.append((txt, pos.x, pos.y)) except Exception: params.raw_text_annotations.append((txt, 0.0, 0.0)) # 수문 사양: "W15.0m x H7.0m x 3문" m = self._GATE_SPEC_PATTERN.search(txt) if m: params.gate_width = float(m.group(1)) params.gate_height = float(m.group(2)) params.n_gates = int(m.group(3)) # 표고 패턴 for pattern, field_name in self._ELEVATION_PATTERNS: mm = re.search(pattern, txt, re.IGNORECASE) if mm: try: val = float(mm.group(1)) if hasattr(params, field_name): setattr(params, field_name, val) except Exception: pass # ----- 수문 위치 추출 (치수선 기반) ----- def _extract_gate_positions_from_dims(self, msp, params: GateParams): """DIMENSION 엔티티에서 'Clear Span'을 찾아 수문 중심 X 좌표 보정.""" clear_span_dims = [] for e in msp: if e.dxftype() != "DIMENSION": continue try: text = e.dxf.get("text", "") meas = e.dxf.get("actual_measurement", None) if meas is None or meas < 1000 or meas > 100000: continue # "Clear Span <>" 패턴 또는 15000 근처의 수평 치수 is_clear_span = "Clear Span" in text or abs(meas - 15000) < 500 dp1 = e.dxf.defpoint dp2 = e.dxf.defpoint2 if e.dxf.hasattr("defpoint2") else dp1 dx = abs(dp2.x - dp1.x) dy = abs(dp2.y - dp1.y) # 수평 치수 (dx > dy) if dx > dy and is_clear_span: # dp1과 dp2의 중점 X (그리고 평면영역 Y > 45000 인지 확인) mid_x = (dp1.x + dp2.x) / 2 mid_y = (dp1.y + dp2.y) / 2 if mid_y > 40000: # 평면영역 clear_span_dims.append(mid_x / 1000.0) # mm→m except Exception: continue if clear_span_dims: clear_span_dims.sort() # 중복 제거 (동일 X 근처) unique = [] for x in clear_span_dims: if not unique or abs(x - unique[-1]) > params.gate_width * 0.5: unique.append(x) if len(unique) >= 2: # 최소 2개 이상 확인되면 신뢰 # 로컬 좌표계로 변환 x_min = min(unique) params.gate_centers_x = [x - x_min + params.gate_width / 2 for x in unique] params.n_gates = len(unique) # ----- 누락 파라미터 추론 ----- def _infer_missing_params(self, params: GateParams): """파싱되지 않은 파라미터를 합리적 기본값/추론값으로 채움.""" # 수문 높이: Gate Top - Gate Sill if abs(params.gate_height - 7.0) < 0.01: # 기본값 상태면 diff = params.el_gate_top - params.el_gate_sill if 4.0 < diff < 15.0: params.gate_height = diff # pier_count = n_gates + 1 params.pier_count = params.n_gates + 1 # 교각 폭 추론: gate_centers_x가 있으면 인접 수문 간격에서 계산 if len(params.gate_centers_x) >= 2: # 인접 수문 중심 간격 = gate_width + pier_width spacings = [params.gate_centers_x[i+1] - params.gate_centers_x[i] for i in range(len(params.gate_centers_x) - 1)] avg_spacing = sum(spacings) / len(spacings) inferred_pw = avg_spacing - params.gate_width if 1.0 <= inferred_pw <= 8.0: params.pier_width = inferred_pw else: params.pier_width = 3.0 else: total_gate_w = params.n_gates * params.gate_width if params.total_span > total_gate_w: inferred = (params.total_span - total_gate_w) / (params.n_gates + 1) params.pier_width = inferred if 1.5 <= inferred <= 3.5 else 3.0 else: params.pier_width = 3.0 # gate_centers_x가 비어있으면 균등 배치 if not params.gate_centers_x: pw = params.pier_width gw = params.gate_width params.gate_centers_x = [ pw + gw * 0.5 + i * (gw + pw) for i in range(params.n_gates) ] # 구조물 유효 폭 (수문+교각만, wing wall 제외) pw = params.pier_width gw = params.gate_width effective_span = params.n_gates * gw + (params.n_gates + 1) * pw # gate_centers_x를 로컬 좌표계(0 ~ effective_span)로 재정렬 if params.gate_centers_x: offset = params.gate_centers_x[0] - (pw + gw / 2) params.gate_centers_x = [x - offset for x in params.gate_centers_x] params.total_span = effective_span # ogee_profile이 비어있으면 표준 ogee 곡선으로 생성 if not params.ogee_profile: params.ogee_profile = self._default_ogee_profile(params) def _default_ogee_profile(self, params: GateParams) -> list: """표준 ogee 여수로 단면을 표고 기반으로 생성. 간략화된 crest → 경사면 → apron 프로파일. 점 목록 [(x_along_flow, z_elevation)] """ crest_el = params.el_weir_crest sill_el = params.el_gate_sill upstream_bed = params.el_upstream_bed downstream_bed = params.el_downstream # 상류 수직 → 크레스트 정상 → 하류 곡선 → 수평 apron profile = [ (0.0, upstream_bed), # 상류 바닥 (0.0, crest_el - 2.0), # 상류 옹벽 상단 (수면 아래) (1.0, crest_el), # 크레스트 정점 (3.0, crest_el - 1.5), # 하류 곡선 시작 (7.0, sill_el + 2.0), # 하류 경사면 중간 (12.0, sill_el), # 게이트 sill 레벨 (20.0, sill_el), # 평탄 apron (25.0, downstream_bed), # 하류 바닥 ] return profile # --------------------------------------------------------------------------- # 편의 함수 # --------------------------------------------------------------------------- def parse_gate_dxf(plan_dxf: str, section_dxf: str | None = None) -> GateParams: """간편 호출 인터페이스.""" parser = GateParser() return parser.parse(plan_dxf, section_dxf) if __name__ == "__main__": # 샘플 테스트 import sys if len(sys.argv) >= 3: params = parse_gate_dxf(sys.argv[1], sys.argv[2]) elif len(sys.argv) == 2: params = parse_gate_dxf(sys.argv[1]) else: # Gate_Sample 기본 경로 base = Path("Gate_Sample") f1 = base / "12995740-M40-001 여수로 수문 설치도(1/2).dxf" f2 = base / "12995740-M40-002 여수로 수문 설치도(2/2).dxf" params = parse_gate_dxf(str(f1), str(f2)) print(params.summary()) print() print(f"ogee profile: {len(params.ogee_profile)} points") if params.ogee_profile: for x, z in params.ogee_profile[:10]: print(f" ({x:.2f}m, EL.{z:.2f})")