S-CANVAS (Saman Corp.) — DXF + DEM + AI 기반 3D 조감도 생성 엔진. ~24k LOC Python (scanvas_maker.py 7072 LOC GUI + 구조물 파서/빌더 다수). 이 커밋은 7-iter cleanup이 적용된 상태로 import: - F821 8 + B023 6: 비동기 lambda + except/loop 변수 캡처 NameError (Py3.13에서 reproduce 확인된 진짜 버그) - RUF012 4 + RUF013 1: ClassVar / implicit Optional 명시화 - F811/B905/B904/F401/F841/W293/F541/UP/SIM/RUF/PLR 700+ cleanup/modernization 신규 파일: - ruff.toml: target=py313, Korean unicode/저자 스타일/도메인 복잡도 무력화 - requirements-py313.txt: pyproj>=3.7, scipy>=1.14, numpy>=2.0.2 (Py3.13 wheel) - .gitignore: gcp-key.json, 캐시, 백업, 생성 이미지 제외 검증: ruff 0 errors, py_compile 0 errors, import 33/33 OK on Py3.13.13. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1216 lines
52 KiB
Python
1216 lines
52 KiB
Python
"""여수로 수문 설치도 DXF 파서.
|
||
|
||
여수로(spillway) 수문 설치도(설계도)에서 수문 구조물의 핵심 파라미터를
|
||
자동 추출한다. 보통 1/2 도면(평면+정면), 2/2 도면(측면단면)의 쌍으로 제공됨.
|
||
|
||
추출되는 핵심 파라미터:
|
||
- 수문 개수(n_gates), 수문 폭(gate_width), 수문 높이(gate_height)
|
||
- 각 주요 표고(Gate Sill, Weir Crest, Gate Top, Trunnion Pin, 수위들)
|
||
- 평면도 외곽(ogee 단면이 extrude될 span 방향)
|
||
- 측면 단면 외곽(ogee profile polyline)
|
||
- 교각(pier) 위치 목록
|
||
|
||
사용법:
|
||
parser = GateParser()
|
||
params = parser.parse(plan_dxf_path, section_dxf_path)
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import contextlib
|
||
import math
|
||
import re
|
||
from dataclasses import dataclass, field
|
||
from itertools import pairwise
|
||
from pathlib import Path
|
||
from typing import ClassVar
|
||
|
||
import ezdxf
|
||
import numpy as np
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 데이터 클래스
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@dataclass
|
||
class GateParams:
|
||
"""여수로 수문 구조물 파라미터 (단위: m, 표고는 해발기준 m)."""
|
||
|
||
# 수문 본체
|
||
n_gates: int = 3
|
||
gate_width: float = 15.0 # 수문 폭 (clear span)
|
||
gate_height: float = 7.0 # 수문 높이 (sill→top)
|
||
|
||
# 교각 (pier)
|
||
pier_count: int = 4 # 보통 n_gates + 1
|
||
pier_width: float = 3.0 # 교각 폭 (유출방향 수직)
|
||
pier_length: float = 25.0 # 교각 길이 (유출방향)
|
||
|
||
# 주요 표고 (m, 해발)
|
||
el_gate_sill: float = 46.700
|
||
el_stoplog_sill: float = 46.000 # Stoplog sill (파서에서 추출; 현재 빌더 미사용)
|
||
el_weir_crest: float = 47.000
|
||
el_gate_top: float = 53.700
|
||
el_trunnion_pin: float = 50.200
|
||
el_mwl: float = 53.830 # M.W.L 최대수위
|
||
el_nhwl: float = 52.500 # N.H.W.L 상시만수위
|
||
el_lwl: float = 45.000 # L.W.L 최저수위
|
||
el_downstream: float = 44.000
|
||
el_upstream_bed: float = 41.500
|
||
el_bridge_top: float = 56.000 # 공도교 상면
|
||
|
||
# 구조 전체 (평면)
|
||
total_span: float = 75.0 # dam axis 방향 전체 폭
|
||
total_length: float = 25.0 # 유출방향 전체 길이
|
||
dam_axis_y: float = 0.0 # dam axis의 Y 좌표 (로컬)
|
||
|
||
# 단면 프로파일 (ogee) - 로컬 좌표계로 정규화된 점 목록 [(x, z), ...]
|
||
# x: 유출방향 (0=상류 시작), z: 표고 (해발)
|
||
ogee_profile: list = field(default_factory=list)
|
||
|
||
# 평면 외곽 (상류측/하류측 경계)
|
||
plan_outline_upstream: list = field(default_factory=list)
|
||
plan_outline_downstream: list = field(default_factory=list)
|
||
|
||
# 수문 게이트 중심선 (평면 X 좌표, 순서대로) - 로컬 기준
|
||
gate_centers_x: list = field(default_factory=list)
|
||
|
||
# 평면 방향 보정 (detail DXF 내에서 mesh의 +X축이 가리키는 각도)
|
||
# PCA로 plan outline 주축을 찾아 mesh 빌더 좌표계와 detail DXF 좌표계의
|
||
# 상대 회전을 표현. 기본 0.0 = detail의 +X가 span 방향 (수평 그려진 경우).
|
||
plan_frame_angle_deg: float = 0.0
|
||
|
||
# 부속 구조물 존재 여부 (도면 검출 결과로 갱신; 구조적 요소는 default False, 시각은 True)
|
||
has_service_bridge: bool = False # 공도교 (수문 상부 service bridge)
|
||
has_hoist_housings: bool = True # 여수로 개폐장치 — 래디얼 게이트엔 보통 필수 (파서가 hoist 관련 레이어/블록 검출 시 확정 유지, 없으면 False로 낮춤; 플래그 이름은 직렬화 호환 위해 보존)
|
||
has_downstream_apron: bool = True # 하류 에이프런 — 시각 맥락, 사용자 토글 가능
|
||
has_water_surface: bool = True # 상류 수면 표시 — 시각 맥락, 사용자 토글 가능
|
||
|
||
# Phase B' — 실제 도면 기하 (parametric 폴백의 우선 대체)
|
||
# plan 영역 CS-CONC 레이어의 폐합 폴리라인 (chamber-local 좌표, m 단위)
|
||
plan_outline_polygon: list = field(default_factory=list) # 외곽: [(x,y), ...]
|
||
pier_plan_polygons: list = field(default_factory=list) # 각 교각: [[(x,y), ...], ...]
|
||
# 공도교(service bridge)의 실제 plan bbox — (x0, y0, x1, y1) local m, None이면 폴백
|
||
bridge_plan_bbox: tuple | None = None
|
||
# 공도교 Y 방향 두께(도면 기반 실측). None이면 bbox Y 길이 사용.
|
||
bridge_deck_thickness_m: float = 1.2
|
||
|
||
# 사용자 직접 지정 공도교 위치 (UI 편집 가능; 4개 값 모두 유효하면 다른 경로 대신 사용)
|
||
# 기본 None/-1: 사용자 미입력으로 간주. 양수로 편집 시 override 적용.
|
||
bridge_x_start: float | None = None
|
||
bridge_x_end: float | None = None
|
||
bridge_y_start: float | None = None
|
||
bridge_y_end: float | None = None
|
||
|
||
# FLOW 화살표로 검출된 유수 방향 단위벡터 (DXF XY frame, dx²+dy²=1)
|
||
# None이면 PCA만 사용 (span 180° 모호성 존재). 검출 시 plan_frame_angle_deg가
|
||
# 전체 -180..180 범위로 정확 설정됨.
|
||
flow_direction_2d: tuple | None = None
|
||
|
||
# 메타데이터
|
||
source_files: list = field(default_factory=list)
|
||
raw_text_annotations: list = field(default_factory=list)
|
||
|
||
def summary(self) -> str:
|
||
"""요약 텍스트 반환."""
|
||
return (
|
||
f"Gate: {self.n_gates}문 × W{self.gate_width:.1f}m × H{self.gate_height:.1f}m\n"
|
||
f" Sill EL.{self.el_gate_sill:.2f} / Crest EL.{self.el_weir_crest:.2f} / "
|
||
f"Top EL.{self.el_gate_top:.2f} / Trunnion EL.{self.el_trunnion_pin:.2f}\n"
|
||
f" W.L: MWL {self.el_mwl:.2f} / NHWL {self.el_nhwl:.2f} / LWL {self.el_lwl:.2f}\n"
|
||
f" Total: {self.total_span:.1f}m × {self.total_length:.1f}m\n"
|
||
f" 부속: 공도교={'O' if self.has_service_bridge else 'X'}, "
|
||
f"개폐장치={'O' if self.has_hoist_housings else 'X'}, "
|
||
f"에이프런={'O' if self.has_downstream_apron else 'X'}, "
|
||
f"수면={'O' if self.has_water_surface else 'X'}\n"
|
||
f" 기하: plan_outline={len(self.plan_outline_polygon)}pts, "
|
||
f"piers={len(self.pier_plan_polygons)}\n"
|
||
f" Gate centers (X): {[f'{x:.1f}' for x in self.gate_centers_x]}"
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 파서
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class GateParser:
|
||
"""여수로 수문 설치도 파서."""
|
||
|
||
# 구조 레이어 (concrete 본체 geometry)
|
||
STRUCT_LAYERS: ClassVar[set[str]] = {
|
||
"CS-CONC-Spillway",
|
||
"CS-CONC-Bridge",
|
||
}
|
||
|
||
def parse(self, plan_dxf: str | Path, section_dxf: str | Path | None = None) -> GateParams:
|
||
"""평면/정면 도면과 측면단면 도면을 파싱.
|
||
|
||
Args:
|
||
plan_dxf: 1/2 도면 (평면 + 정면). 필수.
|
||
section_dxf: 2/2 도면 (측면 단면). 선택적, 있으면 ogee 프로파일 추출.
|
||
|
||
Returns:
|
||
GateParams
|
||
"""
|
||
params = GateParams()
|
||
params.source_files.append(str(plan_dxf))
|
||
|
||
# Plan + Elevation 도면 파싱
|
||
self._parse_plan_file(Path(plan_dxf), params)
|
||
|
||
# Section 도면 파싱 (있으면)
|
||
if section_dxf:
|
||
params.source_files.append(str(section_dxf))
|
||
self._parse_section_file(Path(section_dxf), params)
|
||
|
||
# 누락된 파라미터는 기본값 + 추론
|
||
self._infer_missing_params(params)
|
||
|
||
return params
|
||
|
||
# ----- Plan/Elevation 도면 파싱 -----
|
||
|
||
def _parse_plan_file(self, path: Path, params: GateParams):
|
||
"""1/2 도면에서 수문 개수, 크기, 평면 배치, 표고를 추출 (Phase D 통합).
|
||
|
||
view_detector로 평면도/입면도/측면도 영역을 분리한 뒤:
|
||
- 평면도 영역 내 geometry → 외곽·교각·게이트 위치
|
||
- 입면도 영역 내 geometry → 게이트 높이 등 (현재는 보조, 추후 확장)
|
||
view_detector가 영역을 못 찾으면 기존 y_mid 휴리스틱으로 폴백.
|
||
"""
|
||
doc = ezdxf.readfile(str(path))
|
||
msp = doc.modelspace()
|
||
|
||
# 0) 부속 구조물 존재성 검출 (공도교 등)
|
||
self._detect_optional_components(msp, params)
|
||
|
||
# 1) 텍스트 주석 스캔 → 수문 사양 + 표고 (전체 스캔, view 무관)
|
||
self._scan_text_annotations(msp, params)
|
||
|
||
# 2) 뷰 영역 식별 (Phase D): view_detector 우선, 실패 시 y_mid 폴백
|
||
plan_bounds_mm = None # (x0, y0, x1, y1) — 파서는 mm 좌표로 filter
|
||
try:
|
||
from view_detector import detect_view_regions
|
||
views = detect_view_regions(str(path))
|
||
plan_view = next((v for v in views if v.view_type == "plan"), None)
|
||
if plan_view is not None:
|
||
# view bounds는 m 단위, 파서는 mm 단위로 작업 → 1000배 변환
|
||
b = plan_view.bounds # (xmin_m, ymin_m, xmax_m, ymax_m)
|
||
plan_bounds_mm = (b[0] * 1000.0, b[1] * 1000.0,
|
||
b[2] * 1000.0, b[3] * 1000.0)
|
||
params.raw_text_annotations.append((
|
||
f"[view] plan detected: "
|
||
f"x=[{b[0]:.1f},{b[2]:.1f}]m y=[{b[1]:.1f},{b[3]:.1f}]m "
|
||
f"(W={plan_view.width:.1f}m, H={plan_view.height:.1f}m)",
|
||
0.0, 0.0
|
||
))
|
||
except Exception as _e:
|
||
pass
|
||
|
||
# plan bounds로 필터할 함수 정의
|
||
def _in_plan(x, y):
|
||
if plan_bounds_mm is not None:
|
||
x0, y0, x1, y1 = plan_bounds_mm
|
||
return x0 <= x <= x1 and y0 <= y <= y1
|
||
# 폴백: 뒤에서 계산할 y_mid 사용. 임시로 True 반환해 전체 처리.
|
||
return True
|
||
|
||
# 폴백용 y_mid 계산 (view_detector 실패 시)
|
||
y_values = []
|
||
for e in msp:
|
||
if e.dxf.layer != "CS-CONC-Spillway":
|
||
continue
|
||
with contextlib.suppress(Exception):
|
||
if e.dxftype() == "LWPOLYLINE":
|
||
y_values.extend(p[1] for p in e.get_points())
|
||
elif e.dxftype() == "LINE":
|
||
y_values.extend([e.dxf.start.y, e.dxf.end.y])
|
||
|
||
if not y_values:
|
||
return
|
||
|
||
ymin = min(y_values)
|
||
ymax = max(y_values)
|
||
y_mid = (ymin + ymax) / 2
|
||
|
||
# view_detector가 plan을 찾았으면 그걸로 교체; 아니면 y_mid 유지
|
||
if plan_bounds_mm is not None:
|
||
# plan y-range의 상한을 유지, 하한을 plan 영역 y0로
|
||
ymin = plan_bounds_mm[1]
|
||
ymax = plan_bounds_mm[3]
|
||
y_mid = ymin # 기존 로직에서 mid 이상만 plan으로 취급하므로 plan 하한을 mid로
|
||
|
||
# 3) 평면 영역 (Y > y_mid) 지오메트리 추출
|
||
plan_pts_all = []
|
||
for e in msp:
|
||
if e.dxf.layer not in self.STRUCT_LAYERS:
|
||
continue
|
||
try:
|
||
if e.dxftype() == "LWPOLYLINE":
|
||
pts = [(p[0], p[1]) for p in e.get_points()]
|
||
if pts and np.mean([p[1] for p in pts]) > y_mid:
|
||
plan_pts_all.extend(pts)
|
||
elif e.dxftype() == "LINE":
|
||
s, en = e.dxf.start, e.dxf.end
|
||
if (s.y + en.y) / 2 > y_mid:
|
||
plan_pts_all.extend([(s.x, s.y), (en.x, en.y)])
|
||
except Exception:
|
||
pass
|
||
|
||
if plan_pts_all:
|
||
arr = np.array(plan_pts_all)
|
||
params.total_span = float(arr[:, 0].max() - arr[:, 0].min()) / 1000.0 # mm→m
|
||
params.total_length = float(arr[:, 1].max() - arr[:, 1].min()) / 1000.0
|
||
|
||
# --- 평면 방향 결정 ---
|
||
# 1순위: FLOW 화살표 (사용자 지정 유수 방향 → 180° 모호성 해소)
|
||
# 2순위: plan geometry PCA 주축 (부호 모호성 존재, -90..+90 정규화)
|
||
flow_dir = self._detect_flow_direction(msp)
|
||
if flow_dir is not None:
|
||
params.flow_direction_2d = flow_dir
|
||
fx, fy = flow_dir
|
||
# 배치 파이프라인 `fit_meshes_to_quad`는 CW quad 기본값으로
|
||
# mesh Y를 먼저 반전(flip_y_for_cw_quad=True) 후 회전 적용.
|
||
# mesh +Y(빌더 downstream 컨벤션)가 world flow 방향과 일치하려면,
|
||
# 유도: (0,1) → Y-flip → (0,-1) → rotate(-span) → (-sin(span),-cos(span))
|
||
# 이 벡터가 flow=(fx,fy)와 같아야 함 → sin(span)=-fx, cos(span)=-fy
|
||
# → span = atan2(-fx, -fy)
|
||
span_angle_rad = math.atan2(-fx, -fy)
|
||
span_angle_deg = math.degrees(span_angle_rad)
|
||
while span_angle_deg > 180: span_angle_deg -= 360
|
||
while span_angle_deg <= -180: span_angle_deg += 360
|
||
params.plan_frame_angle_deg = span_angle_deg
|
||
params.raw_text_annotations.append((
|
||
f"[flow] detected flow_dir=({fx:+.3f},{fy:+.3f}) "
|
||
f"→ plan_frame_angle={span_angle_deg:+.1f}°",
|
||
0.0, 0.0
|
||
))
|
||
else:
|
||
try:
|
||
centered = arr - arr.mean(axis=0)
|
||
cov = np.cov(centered.T)
|
||
eigvals, eigvecs = np.linalg.eigh(cov)
|
||
idx = int(np.argmax(eigvals))
|
||
main_axis = eigvecs[:, idx]
|
||
span_angle_rad = math.atan2(float(main_axis[1]), float(main_axis[0]))
|
||
span_angle_deg = math.degrees(span_angle_rad)
|
||
while span_angle_deg > 90: span_angle_deg -= 180
|
||
while span_angle_deg < -90: span_angle_deg += 180
|
||
params.plan_frame_angle_deg = span_angle_deg
|
||
except Exception:
|
||
params.plan_frame_angle_deg = 0.0
|
||
|
||
# 수문 중심 추정: 평면 폭을 n_gates로 균등 분할
|
||
# 정확한 위치는 아래 4) 단계에서 치수선으로 보정
|
||
x_min = float(arr[:, 0].min()) / 1000.0
|
||
# 기본 추정
|
||
if not params.gate_centers_x:
|
||
gate_spacing = params.total_span / params.n_gates
|
||
params.gate_centers_x = [
|
||
x_min + gate_spacing * (i + 0.5) for i in range(params.n_gates)
|
||
]
|
||
|
||
# 4) DIMENSION 엔티티 → clear span 확인 및 수문 위치 보정
|
||
self._extract_gate_positions_from_dims(msp, params)
|
||
|
||
# 5) Phase B' — 평면 폴리곤/교각 폴리곤 직접 추출
|
||
self._extract_plan_polygons(msp, params, y_mid, ymin, ymax)
|
||
|
||
def _parse_section_file(self, path: Path, params: GateParams):
|
||
"""2/2 도면에서 측면 ogee 단면 프로파일 추출."""
|
||
doc = ezdxf.readfile(str(path))
|
||
msp = doc.modelspace()
|
||
|
||
# 텍스트 주석도 보조적으로 스캔 (표고 보강)
|
||
self._scan_text_annotations(msp, params)
|
||
|
||
# CS-CONC-Spillway 레이어에서 가장 긴 LWPOLYLINE = ogee 프로파일 추정
|
||
best_poly = None
|
||
best_len = 0
|
||
|
||
for e in msp:
|
||
if e.dxf.layer != "CS-CONC-Spillway":
|
||
continue
|
||
if e.dxftype() != "LWPOLYLINE":
|
||
continue
|
||
try:
|
||
pts = [(p[0], p[1]) for p in e.get_points()]
|
||
if len(pts) > best_len:
|
||
best_len = len(pts)
|
||
best_poly = pts
|
||
except Exception:
|
||
pass
|
||
|
||
if best_poly and len(best_poly) >= 5:
|
||
# mm → m 변환, 로컬 좌표 정규화
|
||
arr = np.array(best_poly)
|
||
# X 원점 = 상류측 시작, Z = 표고 (해발 m 변환 필요)
|
||
# 단면 도면의 Y는 보통 해발표고(m)에 mm 스케일
|
||
x_local = (arr[:, 0] - arr[:, 0].min()) / 1000.0
|
||
# 도면의 Y는 해발에 mm scale (예: 12978mm는 EL.12.978m가 아님 주의)
|
||
# 측면도에서 Y 범위 보고 해발 대응을 추정: Gate Sill/Weir Crest 위치 비교
|
||
# 간단화: 최저점 = el_upstream_bed, 스케일은 mm→m
|
||
z_min_local = arr[:, 1].min() / 1000.0
|
||
z_shift = params.el_upstream_bed - z_min_local
|
||
z_local = arr[:, 1] / 1000.0 + z_shift
|
||
|
||
params.ogee_profile = list(zip(x_local.tolist(), z_local.tolist(), strict=False))
|
||
|
||
# ----- 부속 구조물 존재성 검출 (Phase A) -----
|
||
#
|
||
# 구체 검출 로직은 optional_detector 모듈에 위임. 본 파서는 구조물 특유의
|
||
# 컴포넌트 명세(레이어 토큰·텍스트 키워드·default)만 선언.
|
||
|
||
_COMPONENT_SPECS = None # lazy init (import 시 호출)
|
||
|
||
@classmethod
|
||
def _get_component_specs(cls):
|
||
if cls._COMPONENT_SPECS is None:
|
||
from optional_detector import ComponentSpec
|
||
cls._COMPONENT_SPECS = [
|
||
ComponentSpec(
|
||
name="service_bridge",
|
||
layer_tokens=("bridge", "공도교", "공도", "관리도로", "service road"),
|
||
text_keywords=("공도교", "service bridge", "관리교", "관리도로"),
|
||
default=False,
|
||
),
|
||
ComponentSpec(
|
||
name="hoist_housings",
|
||
layer_tokens=("hoist", "권양", "winch", "gantry"),
|
||
text_keywords=("권양기", "hoist", "winch", "hoisting"),
|
||
default=True,
|
||
preserve_default_on_no_signal=True, # 별도 레이어 없이 그리는 경우 많음
|
||
),
|
||
ComponentSpec(
|
||
name="downstream_apron",
|
||
layer_tokens=("apron", "에이프런", "stilling", "물받이", "감세"),
|
||
text_keywords=("에이프런", "apron", "물받이", "감세공", "stilling basin"),
|
||
default=True,
|
||
preserve_default_on_no_signal=True, # 시각 맥락용
|
||
),
|
||
]
|
||
return cls._COMPONENT_SPECS
|
||
|
||
def _detect_flow_direction(self, msp,
|
||
search_radius_mm: float = 10000.0,
|
||
shaft_min_len_mm: float = 2000.0) -> tuple | None:
|
||
"""도면의 "FLOW" 텍스트와 인접 화살표(LINE 클러스터)에서 유수 방향 추출.
|
||
|
||
알고리즘:
|
||
1. TEXT/MTEXT에서 "FLOW", "흐름", "유수" 키워드 탐지
|
||
2. 각 텍스트 위치에서 반경 10m 내 LINE 수집
|
||
- 길이 ≥ 2m: shaft 후보 (가장 긴 것 선택)
|
||
- 길이 < 2m: arrowhead 후보
|
||
3. shaft의 두 끝점 중 **arrowhead 점들이 더 밀집한 쪽이 tip**
|
||
(arrowhead가 없으면 텍스트에서 더 먼 쪽을 tip으로 폴백)
|
||
4. 방향 = (tip - tail) 정규화
|
||
5. 여러 FLOW가 있으면 평균
|
||
|
||
Returns:
|
||
(dx, dy) 단위벡터 or None (검출 실패)
|
||
"""
|
||
flow_dirs: list[tuple[float, float]] = []
|
||
# TEXT 엔티티 먼저 모아두고, LINE은 msp 한 번 더 순회 (메모리 효율)
|
||
flow_texts = []
|
||
for e in msp:
|
||
try:
|
||
et = e.dxftype()
|
||
if et not in ("TEXT", "MTEXT"):
|
||
continue
|
||
txt = e.dxf.text if et == "TEXT" else (e.text or "")
|
||
except Exception:
|
||
continue
|
||
if not txt:
|
||
continue
|
||
tu = txt.strip().upper()
|
||
if "FLOW" not in tu and "흐름" not in txt and "유수" not in txt:
|
||
continue
|
||
try:
|
||
pos = e.dxf.insert
|
||
flow_texts.append((float(pos.x), float(pos.y)))
|
||
except Exception:
|
||
continue
|
||
|
||
if not flow_texts:
|
||
return None
|
||
|
||
# LINE 수집 (한 번만)
|
||
all_lines: list[tuple[float, float, float, float, float]] = [] # (sx,sy,ex,ey,len)
|
||
for e in msp:
|
||
try:
|
||
if e.dxftype() != "LINE":
|
||
continue
|
||
s = e.dxf.start
|
||
en = e.dxf.end
|
||
dx = en.x - s.x
|
||
dy = en.y - s.y
|
||
length = math.sqrt(dx * dx + dy * dy)
|
||
all_lines.append((float(s.x), float(s.y),
|
||
float(en.x), float(en.y), length))
|
||
except Exception:
|
||
continue
|
||
|
||
for (tx, ty) in flow_texts:
|
||
shaft_cands = []
|
||
arrow_pts = []
|
||
for (sx, sy, ex, ey, L) in all_lines:
|
||
mx = (sx + ex) * 0.5
|
||
my = (sy + ey) * 0.5
|
||
if math.hypot(mx - tx, my - ty) > search_radius_mm:
|
||
continue
|
||
if shaft_min_len_mm <= L:
|
||
shaft_cands.append((L, sx, sy, ex, ey))
|
||
else:
|
||
arrow_pts.append((sx, sy))
|
||
arrow_pts.append((ex, ey))
|
||
|
||
if not shaft_cands:
|
||
continue
|
||
shaft_cands.sort(key=lambda t: -t[0])
|
||
_, sx, sy, ex, ey = shaft_cands[0]
|
||
s_end = (sx, sy); e_end = (ex, ey)
|
||
|
||
# 어느 끝이 tip인가: arrowhead 점들이 더 밀집한 쪽
|
||
s_cnt = 0; e_cnt = 0
|
||
for (ax, ay) in arrow_pts:
|
||
if math.hypot(ax - sx, ay - sy) < math.hypot(ax - ex, ay - ey):
|
||
s_cnt += 1
|
||
else:
|
||
e_cnt += 1
|
||
|
||
if s_cnt == 0 and e_cnt == 0:
|
||
# arrowhead 검출 실패 → 텍스트에서 먼 쪽을 tip으로 가정
|
||
d_s = math.hypot(sx - tx, sy - ty)
|
||
d_e = math.hypot(ex - tx, ey - ty)
|
||
if d_s > d_e:
|
||
tip, tail = s_end, e_end
|
||
else:
|
||
tip, tail = e_end, s_end
|
||
elif s_cnt > e_cnt:
|
||
tip, tail = s_end, e_end
|
||
else:
|
||
tip, tail = e_end, s_end
|
||
|
||
dx = tip[0] - tail[0]
|
||
dy = tip[1] - tail[1]
|
||
L = math.hypot(dx, dy)
|
||
if L < 1e-3:
|
||
continue
|
||
flow_dirs.append((dx / L, dy / L))
|
||
|
||
if not flow_dirs:
|
||
return None
|
||
|
||
# 평균 (여러 FLOW가 있으면 방향 일치성 확인 후 평균)
|
||
avg_dx = sum(d[0] for d in flow_dirs) / len(flow_dirs)
|
||
avg_dy = sum(d[1] for d in flow_dirs) / len(flow_dirs)
|
||
L = math.hypot(avg_dx, avg_dy)
|
||
if L < 0.3:
|
||
# 여러 FLOW가 서로 반대 방향을 가리키면 벡터 합이 작아짐 → 신뢰도 낮음
|
||
return None
|
||
return (avg_dx / L, avg_dy / L)
|
||
|
||
def _detect_optional_components(self, msp, params: GateParams):
|
||
"""Phase A: 부속 구조물 존재 여부를 도면에서 검출해 has_* 플래그 갱신."""
|
||
from optional_detector import detect_components, summary_line
|
||
reports = detect_components(msp, self._get_component_specs())
|
||
params.has_service_bridge = reports["service_bridge"].present
|
||
params.has_hoist_housings = reports["hoist_housings"].present
|
||
params.has_downstream_apron = reports["downstream_apron"].present
|
||
params.raw_text_annotations.append((summary_line(reports), 0.0, 0.0))
|
||
|
||
# ----- Phase B' 평면 폴리곤 추출 -----
|
||
|
||
@staticmethod
|
||
def _polygon_area(pts: list) -> float:
|
||
"""shoelace 면적 (부호 없음)."""
|
||
if len(pts) < 3:
|
||
return 0.0
|
||
a = 0.0
|
||
n = len(pts)
|
||
for i in range(n):
|
||
x1, y1 = pts[i][0], pts[i][1]
|
||
x2, y2 = pts[(i + 1) % n][0], pts[(i + 1) % n][1]
|
||
a += x1 * y2 - x2 * y1
|
||
return abs(a) * 0.5
|
||
|
||
@staticmethod
|
||
def _is_closed(pts: list, tol: float = 1.0) -> bool:
|
||
"""첫점~끝점 거리가 tol 이하거나 동일점이면 폐합."""
|
||
if len(pts) < 3:
|
||
return False
|
||
dx = pts[0][0] - pts[-1][0]
|
||
dy = pts[0][1] - pts[-1][1]
|
||
return (dx * dx + dy * dy) ** 0.5 <= tol
|
||
|
||
def _extract_plan_polygons(self, msp, params: GateParams,
|
||
y_mid: float, ymin: float, ymax: float):
|
||
"""plan 영역에서 외곽/교각 폴리곤 추출 (3단계 폴백). 추출 성공 시
|
||
self._pier_origin_mm = (ox, oy)에 실제 사용된 origin을 기록 →
|
||
bridge bbox가 같은 로컬 프레임에 정합."""
|
||
self._pier_origin_mm = None # 성공 시 각 경로에서 설정
|
||
# === 1차: 폐합 LWPOLYLINE ===
|
||
closed_polys: list[tuple[list, float]] = []
|
||
for e in msp:
|
||
if e.dxf.layer != "CS-CONC-Spillway":
|
||
continue
|
||
if e.dxftype() != "LWPOLYLINE":
|
||
continue
|
||
try:
|
||
pts = [(p[0], p[1]) for p in e.get_points()]
|
||
except Exception:
|
||
continue
|
||
if not pts:
|
||
continue
|
||
mid_y = sum(p[1] for p in pts) / len(pts)
|
||
if mid_y <= y_mid:
|
||
continue
|
||
is_closed_dxf = False
|
||
with contextlib.suppress(Exception):
|
||
is_closed_dxf = bool(e.closed)
|
||
is_closed_geom = self._is_closed(pts, tol=5.0)
|
||
if not (is_closed_dxf or is_closed_geom):
|
||
continue
|
||
area = self._polygon_area(pts)
|
||
if area < 100.0:
|
||
continue
|
||
closed_polys.append((pts, area))
|
||
|
||
method_used = "none"
|
||
expected_n_piers = params.n_gates + 1
|
||
|
||
if closed_polys:
|
||
# 폐합 폴리곤 경로 (기존 로직)
|
||
closed_polys.sort(key=lambda t: -t[1])
|
||
outline_pts, outline_area = closed_polys[0]
|
||
arr_outline = np.array(outline_pts)
|
||
x_min_mm = arr_outline[:, 0].min()
|
||
y_min_mm = arr_outline[:, 1].min()
|
||
self._pier_origin_mm = (x_min_mm, y_min_mm)
|
||
xmin_m = x_min_mm / 1000.0
|
||
ymin_m = y_min_mm / 1000.0
|
||
params.plan_outline_polygon = [
|
||
((x / 1000.0) - xmin_m, (y / 1000.0) - ymin_m)
|
||
for x, y in outline_pts
|
||
]
|
||
pier_min_area = outline_area * 0.015
|
||
pier_max_area = outline_area * 0.45
|
||
pier_polys_m = []
|
||
for pts, area in closed_polys[1:]:
|
||
if not (pier_min_area <= area <= pier_max_area):
|
||
continue
|
||
pier_polys_m.append([
|
||
((x / 1000.0) - xmin_m, (y / 1000.0) - ymin_m)
|
||
for x, y in pts
|
||
])
|
||
params.pier_plan_polygons = pier_polys_m
|
||
method_used = "closed_polylines"
|
||
|
||
# 2차: polygon_reconstructor로 face enumeration (폐합 실패 또는 pier 부족)
|
||
if len(params.pier_plan_polygons) != expected_n_piers:
|
||
ok_recon = self._extract_piers_from_line_soup(msp, params, y_mid)
|
||
if ok_recon:
|
||
method_used = "face_enumeration"
|
||
|
||
# 3차: 선 스윕 + 격자 정렬 (수평/수직 선 클러스터 → pier 경계 매칭)
|
||
if len(params.pier_plan_polygons) != expected_n_piers:
|
||
ok_sweep = self._extract_piers_from_vertical_clusters(msp, params, y_mid)
|
||
if ok_sweep:
|
||
method_used = "vertical_clusters"
|
||
|
||
# 4차: bridge 실제 bbox 추출 — pier 추출 방법이 사용한 origin과 동일 프레임
|
||
if params.has_service_bridge and self._pier_origin_mm is not None:
|
||
bridge_bbox_mm = self._extract_bridge_bbox_mm(msp)
|
||
if bridge_bbox_mm is not None:
|
||
ox, oy = self._pier_origin_mm
|
||
bbox = (
|
||
(bridge_bbox_mm[0] - ox) / 1000.0,
|
||
(bridge_bbox_mm[1] - oy) / 1000.0,
|
||
(bridge_bbox_mm[2] - ox) / 1000.0,
|
||
(bridge_bbox_mm[3] - oy) / 1000.0,
|
||
)
|
||
params.bridge_plan_bbox = bbox
|
||
# 사용자 편집 가능한 초기값으로 UI param 노출 (sanity 통과 가정 하)
|
||
params.bridge_x_start = float(bbox[0])
|
||
params.bridge_y_start = float(bbox[1])
|
||
params.bridge_x_end = float(bbox[2])
|
||
params.bridge_y_end = float(bbox[3])
|
||
|
||
params.raw_text_annotations.append((
|
||
f"[plan_poly] outline_pts={len(params.plan_outline_polygon)}, "
|
||
f"piers={len(params.pier_plan_polygons)}/{expected_n_piers}, "
|
||
f"bridge_bbox={params.bridge_plan_bbox}, "
|
||
f"method={method_used}",
|
||
0.0, 0.0
|
||
))
|
||
|
||
def _compute_plan_origin_mm(self, params: GateParams, msp) -> tuple | None:
|
||
"""pier_plan_polygons 또는 plan_outline_polygon의 DXF mm origin을 역산.
|
||
|
||
각 추출 방법이 이미 (0,0)-기준 로컬 좌표로 저장했으므로, DXF mm 좌표에서
|
||
해당 points의 실제 min X/Y를 찾아 origin으로 사용. 여러 폴백 경로를 시도.
|
||
"""
|
||
if params.pier_plan_polygons:
|
||
# pier 폴리곤의 로컬 좌표에서 최소값은 (0,0)에 가까우므로, DXF 상의
|
||
# 실제 해당 point 위치를 찾기 위해 폴리곤의 mm 기준 bbox를 별도 계산
|
||
# → parser가 로컬 저장 시 사용한 origin과 동일해야 함.
|
||
# 각 추출 메서드가 origin을 다르게 쓸 수 있으므로, plan 영역 CS-CONC-Spillway
|
||
# 전체 geometry에서 pier 폴리곤 실제 위치를 역추정:
|
||
# 간단히: plan 영역 CS-CONC-Spillway geometry의 bbox min을 origin으로
|
||
return self._plan_bbox_origin_mm(msp)
|
||
if params.plan_outline_polygon:
|
||
return self._plan_bbox_origin_mm(msp)
|
||
return None
|
||
|
||
def _plan_bbox_origin_mm(self, msp) -> tuple | None:
|
||
"""plan 영역 CS-CONC-Spillway 전체 geometry의 DXF mm bbox min 반환."""
|
||
try:
|
||
from view_detector import detect_view_regions # noqa: F401 (protective availability check)
|
||
except ImportError:
|
||
return None
|
||
for fn in getattr(self, "_cached_paths", []) or []:
|
||
pass # 현재 context에서 path 접근 불가 — msp로 대체
|
||
# msp 기반 bbox: plan 영역 bounds를 view_detector 없이 추정하기 어려움.
|
||
# 대신 pier_plan_polygons을 역산: 여러 추출 경로가 모두 "plan 영역 geometry
|
||
# 최소점"을 origin으로 쓰므로, 여기서도 동일하게 계산.
|
||
xs: list = []
|
||
ys: list = []
|
||
for e in msp:
|
||
if e.dxf.layer != "CS-CONC-Spillway":
|
||
continue
|
||
try:
|
||
if e.dxftype() == "LINE":
|
||
xs.extend([e.dxf.start.x, e.dxf.end.x])
|
||
ys.extend([e.dxf.start.y, e.dxf.end.y])
|
||
elif e.dxftype() == "LWPOLYLINE":
|
||
for p in e.get_points():
|
||
xs.append(p[0])
|
||
ys.append(p[1])
|
||
except Exception:
|
||
continue
|
||
if not xs:
|
||
return None
|
||
# plan 영역만: Y 중간값 이상만 남김 (기존 y_mid 휴리스틱과 동일)
|
||
y_sorted = sorted(ys)
|
||
y_mid = y_sorted[len(y_sorted) // 2]
|
||
xs_plan = [x for x, y in zip(xs, ys, strict=False) if y > y_mid]
|
||
ys_plan = [y for y in ys if y > y_mid]
|
||
if not xs_plan:
|
||
return None
|
||
return (min(xs_plan), min(ys_plan))
|
||
|
||
def _extract_bridge_bbox_mm(self, msp) -> tuple | None:
|
||
"""Bridge 관련 레이어(CS-CONC-Bridge / 공도교 / 관리도로 등)의
|
||
DXF mm bbox (x_min, y_min, x_max, y_max) 반환. 없으면 None.
|
||
|
||
주의: 관리도로_수정은 보조 레이어라 제외(주 레이어만 사용).
|
||
"""
|
||
tokens = ("bridge", "공도교", "공도", "service road")
|
||
xs: list = []
|
||
ys: list = []
|
||
for e in msp:
|
||
try:
|
||
layer = e.dxf.layer
|
||
except Exception:
|
||
continue
|
||
lname = layer.lower()
|
||
if not any(t.lower() in lname for t in tokens):
|
||
continue
|
||
try:
|
||
if e.dxftype() == "LINE":
|
||
xs.extend([e.dxf.start.x, e.dxf.end.x])
|
||
ys.extend([e.dxf.start.y, e.dxf.end.y])
|
||
elif e.dxftype() == "LWPOLYLINE":
|
||
for p in e.get_points():
|
||
xs.append(p[0])
|
||
ys.append(p[1])
|
||
except Exception:
|
||
continue
|
||
if not xs:
|
||
return None
|
||
return (min(xs), min(ys), max(xs), max(ys))
|
||
|
||
def _extract_piers_from_line_soup(self, msp, params: GateParams,
|
||
y_mid: float) -> bool:
|
||
"""plan 영역 CS-CONC-Spillway의 개방선(LINE + LWPOLYLINE 인접쌍)에서
|
||
**polygon_reconstructor로 폐합 영역(face)을 복원**해 outline/pier 분류.
|
||
|
||
알고리즘:
|
||
1. 선분 수집 (plan bounds 내부만)
|
||
2. polygon_reconstructor.reconstruct_polygons로 모든 face 복원
|
||
3. 면적 내림차순: 가장 큰 것은 bbox outer face → outline (또는 해당
|
||
face 제외하고 그 다음 것이 실제 구조물 외곽)
|
||
4. 외곽 면적의 1.5%~45% 범위 면을 pier 후보
|
||
|
||
반환: pier가 n_gates+1 개 검출되면 True, 아니면 False.
|
||
"""
|
||
# 1) 선분 수집 (원본 DXF mm 단위, plan 영역만)
|
||
try:
|
||
from polygon_reconstructor import reconstruct_polygons
|
||
except ImportError:
|
||
return False
|
||
|
||
segs: list[tuple[tuple[float, float], tuple[float, float]]] = []
|
||
# plan_bounds_mm는 호출 컨텍스트에서 y_mid 계산에 반영됐으므로
|
||
# 여기서는 y > y_mid 필터만 일관 적용
|
||
for e in msp:
|
||
if e.dxf.layer != "CS-CONC-Spillway":
|
||
continue
|
||
et = e.dxftype()
|
||
if et == "LINE":
|
||
try:
|
||
s, en = e.dxf.start, e.dxf.end
|
||
if (s.y + en.y) / 2 > y_mid:
|
||
segs.append(((s.x, s.y), (en.x, en.y)))
|
||
except Exception:
|
||
pass
|
||
elif et == "LWPOLYLINE":
|
||
try:
|
||
pts = [(p[0], p[1]) for p in e.get_points()]
|
||
except Exception:
|
||
continue
|
||
if not pts or sum(p[1] for p in pts) / len(pts) <= y_mid:
|
||
continue
|
||
segs.extend(pairwise(pts))
|
||
# closed flag이면 마지막→첫 번째도 추가
|
||
try:
|
||
if bool(getattr(e, "closed", False)):
|
||
segs.append((pts[-1], pts[0]))
|
||
except Exception:
|
||
pass
|
||
|
||
if not segs:
|
||
return False
|
||
|
||
# 2) Face 복원 (DXF mm 단위, tol=5mm — CAD 수치 오차 흡수)
|
||
faces = reconstruct_polygons(segs, tol=5.0, min_area=1000.0, max_faces=3000)
|
||
|
||
if not faces:
|
||
# 최소한 bbox outline은 세팅
|
||
all_x = [p[0] for seg in segs for p in (seg[0], seg[1])]
|
||
all_y = [p[1] for seg in segs for p in (seg[0], seg[1])]
|
||
xmin_mm, xmax_mm = min(all_x), max(all_x)
|
||
ymin_mm, ymax_mm = min(all_y), max(all_y)
|
||
w = (xmax_mm - xmin_mm) / 1000.0
|
||
h = (ymax_mm - ymin_mm) / 1000.0
|
||
params.plan_outline_polygon = [
|
||
(0.0, 0.0), (w, 0.0), (w, h), (0.0, h)
|
||
]
|
||
return False
|
||
|
||
# 3) 가장 큰 face = 외곽, 그 다음부터 pier 후보
|
||
# (planar face enumeration에서 outer face가 항상 포함되진 않으나,
|
||
# 포함돼도 본 필터에서 정상 처리됨 — 최대 면적을 outline으로 사용)
|
||
outline_pts, outline_area = faces[0]
|
||
# 좌표 원점 정규화 (plan 영역 bbox 원점을 0,0으로)
|
||
arr_x = [p[0] for p in outline_pts]
|
||
arr_y = [p[1] for p in outline_pts]
|
||
x_min_mm = min(arr_x); y_min_mm = min(arr_y)
|
||
self._pier_origin_mm = (x_min_mm, y_min_mm)
|
||
params.plan_outline_polygon = [
|
||
((x - x_min_mm) / 1000.0, (y - y_min_mm) / 1000.0)
|
||
for x, y in outline_pts
|
||
]
|
||
|
||
# Pier 후보: 외곽 면적의 1.5%~45% + 예상 pier 치수 범위(폭·길이 각 50~200%)에 부합
|
||
pier_min_area = outline_area * 0.015
|
||
pier_max_area = outline_area * 0.45
|
||
# pier_length는 보통 여수로 전체 길이와 비슷하므로 총 길이 사용
|
||
expected_pw = params.pier_width
|
||
expected_pl = max(params.pier_length, params.total_length * 0.5)
|
||
w_lo, w_hi = expected_pw * 0.5, expected_pw * 2.0
|
||
l_lo, l_hi = expected_pl * 0.5, expected_pl * 2.0
|
||
|
||
pier_polys: list[list[tuple]] = []
|
||
for pts, area in faces[1:]:
|
||
if not (pier_min_area <= area <= pier_max_area):
|
||
continue
|
||
xs = [p[0] for p in pts]
|
||
ys = [p[1] for p in pts]
|
||
w_mm = max(xs) - min(xs)
|
||
l_mm = max(ys) - min(ys)
|
||
w_m = w_mm / 1000.0; l_m = l_mm / 1000.0
|
||
# 폭/길이 어느 쪽이 span 축인지 자동 판단 — 더 긴 쪽을 length로 매핑
|
||
if l_m < w_m:
|
||
w_m, l_m = l_m, w_m
|
||
if not (w_lo <= w_m <= w_hi):
|
||
continue
|
||
if not (l_lo <= l_m <= l_hi):
|
||
continue
|
||
pier_polys.append([
|
||
((x - x_min_mm) / 1000.0, (y - y_min_mm) / 1000.0)
|
||
for x, y in pts
|
||
])
|
||
|
||
params.pier_plan_polygons = pier_polys
|
||
|
||
# 성공 기준: pier 수가 예상(n_gates+1)과 일치
|
||
expected = params.n_gates + 1
|
||
return len(pier_polys) == expected
|
||
|
||
# ----- Phase B'' 선 스윕 + 격자 정렬 -----
|
||
|
||
def _extract_piers_from_vertical_clusters(self, msp, params: GateParams,
|
||
y_mid: float) -> bool:
|
||
"""개방선으로 그려진 도면에서 수직 클러스터 gap 패턴으로 gate/pier 식별.
|
||
|
||
전략: parametric 수치(gate_width, pier_width)에 의존하지 않고
|
||
**gap 분포로 gate opening을 먼저 찾은 뒤**, 클러스터를 pier 영역으로
|
||
그룹핑해 bbox 폴리곤을 생성. 실측 DXF 흔한 파턴에 맞춰짐.
|
||
|
||
단계:
|
||
1) plan 영역 수직 세그먼트 수집
|
||
2) X 1D 클러스터링 → 각 클러스터 (x_avg, y_min, y_max, total_len)
|
||
3) 길이 필터 (pier_length × 0.3 이상 = 의미 있는 structural 수직선)
|
||
4) 인접 클러스터 gap 계산 → 상위 n_gates 개가 gate opening
|
||
5) gate opening 사이/양끝 영역을 pier 영역으로 그룹화
|
||
6) 각 pier 영역의 leftmost·rightmost 클러스터가 pier 경계, Y 범위는
|
||
그 영역 클러스터들의 교집합
|
||
7) 로컬 좌표 m로 변환해 pier_plan_polygons 저장
|
||
|
||
반환: 정확히 n_gates+1 개 pier 생성되면 True.
|
||
"""
|
||
n_expected = params.n_gates + 1
|
||
|
||
# 1) 수직 세그먼트 수집
|
||
verticals: list[tuple[float, float, float]] = [] # (x_avg, y1, y2)
|
||
for e in msp:
|
||
if e.dxf.layer != "CS-CONC-Spillway":
|
||
continue
|
||
et = e.dxftype()
|
||
pairs: list[tuple[tuple, tuple]] = []
|
||
if et == "LINE":
|
||
try:
|
||
s, en = e.dxf.start, e.dxf.end
|
||
pairs.append(((s.x, s.y), (en.x, en.y)))
|
||
except Exception:
|
||
continue
|
||
elif et == "LWPOLYLINE":
|
||
try:
|
||
pts = [(p[0], p[1]) for p in e.get_points()]
|
||
except Exception:
|
||
continue
|
||
pairs.extend(pairwise(pts))
|
||
for p1, p2 in pairs:
|
||
dx = p2[0] - p1[0]; dy = p2[1] - p1[1]
|
||
if abs(dy) < 100.0:
|
||
continue
|
||
if abs(dy) < 3.0 * abs(dx):
|
||
continue
|
||
if (p1[1] + p2[1]) / 2 <= y_mid:
|
||
continue
|
||
verticals.append(((p1[0] + p2[0]) / 2,
|
||
min(p1[1], p2[1]), max(p1[1], p2[1])))
|
||
|
||
if not verticals:
|
||
return False
|
||
|
||
# 2) X 클러스터링 (greedy 1D, tol 500mm)
|
||
verticals.sort(key=lambda v: v[0])
|
||
cluster_tol_mm = 500.0
|
||
clusters_raw: list[list] = [[verticals[0]]]
|
||
for seg in verticals[1:]:
|
||
if seg[0] - clusters_raw[-1][-1][0] < cluster_tol_mm:
|
||
clusters_raw[-1].append(seg)
|
||
else:
|
||
clusters_raw.append([seg])
|
||
|
||
# 3) 특성 계산 + 길이 필터
|
||
length_threshold = max(params.pier_length * 0.3 * 1000.0, 3000.0)
|
||
strong: list[tuple[float, float, float, float]] = [] # (x, y_min, y_max, total_len)
|
||
for cl in clusters_raw:
|
||
xs = [s[0] for s in cl]
|
||
y_min = min(s[1] for s in cl)
|
||
y_max = max(s[2] for s in cl)
|
||
total_len = sum(s[2] - s[1] for s in cl)
|
||
if total_len >= length_threshold:
|
||
strong.append((sum(xs) / len(xs), y_min, y_max, total_len))
|
||
|
||
if len(strong) < 2 * n_expected:
|
||
# 기대하는 수직선 수(pier당 2)가 안 나오면 실패
|
||
return False
|
||
|
||
# 4) gap 분포 분석 → 상위 n_gates gap이 gate opening
|
||
strong.sort(key=lambda c: c[0])
|
||
gaps = [(i, strong[i + 1][0] - strong[i][0]) for i in range(len(strong) - 1)]
|
||
if not gaps:
|
||
return False
|
||
# 상위 n_gates gap 인덱스
|
||
sorted_gaps = sorted(gaps, key=lambda g: -g[1])
|
||
if len(sorted_gaps) < params.n_gates:
|
||
return False
|
||
gate_gap_indices = sorted([sorted_gaps[i][0] for i in range(params.n_gates)])
|
||
|
||
# 5) pier 영역 그룹화
|
||
pier_regions: list[list[tuple]] = []
|
||
prev = 0
|
||
for gi in gate_gap_indices:
|
||
pier_regions.append(strong[prev:gi + 1]) # gi 포함 = pier 오른쪽 edge
|
||
prev = gi + 1
|
||
pier_regions.append(strong[prev:])
|
||
|
||
if len(pier_regions) != n_expected:
|
||
return False
|
||
|
||
# 5a) 각 pier region이 최소 2개 클러스터 (left + right edge) 확보
|
||
for region in pier_regions:
|
||
if len(region) < 2:
|
||
return False
|
||
|
||
# 6) pier 폴리곤 구성 (leftmost~rightmost X, Y 범위는 region 전체)
|
||
pier_polys_mm: list[list[tuple]] = []
|
||
for region in pier_regions:
|
||
left_x = region[0][0]
|
||
right_x = region[-1][0]
|
||
# Y 범위: region 클러스터들의 교집합 (공통으로 존재하는 수직 범위)
|
||
y_min = max(c[1] for c in region)
|
||
y_max = min(c[2] for c in region)
|
||
# 교집합이 짧으면 합집합으로 폴백 (정보 보존)
|
||
if y_max - y_min < params.pier_length * 1000.0 * 0.3:
|
||
y_min = min(c[1] for c in region)
|
||
y_max = max(c[2] for c in region)
|
||
pier_polys_mm.append([
|
||
(left_x, y_min), (right_x, y_min),
|
||
(right_x, y_max), (left_x, y_max),
|
||
])
|
||
|
||
# 7) 로컬 좌표 m 변환 (plan_outline_polygon이 있으면 그 origin 재사용, 없으면 pier bbox)
|
||
all_x = [p[0] for poly in pier_polys_mm for p in poly]
|
||
all_y = [p[1] for poly in pier_polys_mm for p in poly]
|
||
ox_mm = min(all_x)
|
||
oy_mm = min(all_y)
|
||
self._pier_origin_mm = (ox_mm, oy_mm)
|
||
|
||
if not params.plan_outline_polygon:
|
||
# pier bbox를 outline으로 사용 (단순 사각형)
|
||
w_m = (max(all_x) - ox_mm) / 1000.0
|
||
h_m = (max(all_y) - oy_mm) / 1000.0
|
||
# 약간의 여유 margin
|
||
margin = 0.5
|
||
params.plan_outline_polygon = [
|
||
(-margin, -margin), (w_m + margin, -margin),
|
||
(w_m + margin, h_m + margin), (-margin, h_m + margin),
|
||
]
|
||
|
||
params.pier_plan_polygons = [
|
||
[((x - ox_mm) / 1000.0, (y - oy_mm) / 1000.0) for x, y in poly]
|
||
for poly in pier_polys_mm
|
||
]
|
||
return True
|
||
|
||
# ----- 텍스트 주석 스캔 -----
|
||
|
||
# 핵심 키워드 → 파라미터 매핑
|
||
_ELEVATION_PATTERNS: ClassVar[list[tuple[str, str]]] = [
|
||
(r"Gate Sill\s*EL\.?\s*(\d+\.?\d*)", "el_gate_sill"),
|
||
(r"Weir Crest\s*EL\.?\s*(\d+\.?\d*)", "el_weir_crest"),
|
||
(r"Gate Top\s*EL\.?\s*(\d+\.?\d*)", "el_gate_top"),
|
||
(r"Trunnion(?: Pin)?\s*EL\.?\s*(\d+\.?\d*)", "el_trunnion_pin"),
|
||
(r"Stoplog Sill\s*EL\.?\s*(\d+\.?\d*)", "el_stoplog_sill"),
|
||
(r"M\.?W\.?L\.?\s*EL\.?\s*(\d+\.?\d*)", "el_mwl"),
|
||
(r"N\.?H\.?W\.?L\.?\s*EL\.?\s*(\d+\.?\d*)", "el_nhwl"),
|
||
(r"F\.?W\.?L\.?\s*EL\.?\s*(\d+\.?\d*)", "el_nhwl"), # F.W.L ≈ N.H.W.L
|
||
(r"L\.?W\.?L\.?\s*EL\.?\s*(\d+\.?\d*)", "el_lwl"),
|
||
]
|
||
|
||
_GATE_SPEC_PATTERN = re.compile(
|
||
r"W\s*(\d+\.?\d*)\s*m?\s*[xX×]\s*H\s*(\d+\.?\d*)\s*m?\s*[xX×]\s*(\d+)\s*(?:문|門|bay)",
|
||
re.IGNORECASE,
|
||
)
|
||
|
||
def _scan_text_annotations(self, msp, params: GateParams):
|
||
"""TEXT, MTEXT 주석을 스캔하여 사양/표고 파싱."""
|
||
for e in msp:
|
||
et = e.dxftype()
|
||
if et not in ("TEXT", "MTEXT"):
|
||
continue
|
||
try:
|
||
txt = e.dxf.text if et == "TEXT" else (e.text or "")
|
||
txt = txt.strip()
|
||
except Exception:
|
||
continue
|
||
|
||
if not txt:
|
||
continue
|
||
|
||
# 원본 주석 보관
|
||
try:
|
||
pos = e.dxf.insert
|
||
params.raw_text_annotations.append((txt, pos.x, pos.y))
|
||
except Exception:
|
||
params.raw_text_annotations.append((txt, 0.0, 0.0))
|
||
|
||
# 수문 사양: "W15.0m x H7.0m x 3문"
|
||
m = self._GATE_SPEC_PATTERN.search(txt)
|
||
if m:
|
||
params.gate_width = float(m.group(1))
|
||
params.gate_height = float(m.group(2))
|
||
params.n_gates = int(m.group(3))
|
||
|
||
# 표고 패턴
|
||
for pattern, field_name in self._ELEVATION_PATTERNS:
|
||
mm = re.search(pattern, txt, re.IGNORECASE)
|
||
if mm:
|
||
try:
|
||
val = float(mm.group(1))
|
||
if hasattr(params, field_name):
|
||
setattr(params, field_name, val)
|
||
except Exception:
|
||
pass
|
||
|
||
# ----- 수문 위치 추출 (치수선 기반) -----
|
||
|
||
def _extract_gate_positions_from_dims(self, msp, params: GateParams):
|
||
"""DIMENSION 엔티티에서 'Clear Span'을 찾아 수문 중심 X 좌표 보정."""
|
||
clear_span_dims = []
|
||
for e in msp:
|
||
if e.dxftype() != "DIMENSION":
|
||
continue
|
||
try:
|
||
text = e.dxf.get("text", "")
|
||
meas = e.dxf.get("actual_measurement", None)
|
||
if meas is None or meas < 1000 or meas > 100000:
|
||
continue
|
||
|
||
# "Clear Span <>" 패턴 또는 15000 근처의 수평 치수
|
||
is_clear_span = "Clear Span" in text or abs(meas - 15000) < 500
|
||
|
||
dp1 = e.dxf.defpoint
|
||
dp2 = e.dxf.defpoint2 if e.dxf.hasattr("defpoint2") else dp1
|
||
dx = abs(dp2.x - dp1.x)
|
||
dy = abs(dp2.y - dp1.y)
|
||
|
||
# 수평 치수 (dx > dy)
|
||
if dx > dy and is_clear_span:
|
||
# dp1과 dp2의 중점 X (그리고 평면영역 Y > 45000 인지 확인)
|
||
mid_x = (dp1.x + dp2.x) / 2
|
||
mid_y = (dp1.y + dp2.y) / 2
|
||
if mid_y > 40000: # 평면영역
|
||
clear_span_dims.append(mid_x / 1000.0) # mm→m
|
||
except Exception:
|
||
continue
|
||
|
||
if clear_span_dims:
|
||
clear_span_dims.sort()
|
||
# 중복 제거 (동일 X 근처)
|
||
unique = []
|
||
for x in clear_span_dims:
|
||
if not unique or abs(x - unique[-1]) > params.gate_width * 0.5:
|
||
unique.append(x)
|
||
if len(unique) >= 2: # 최소 2개 이상 확인되면 신뢰
|
||
# 로컬 좌표계로 변환
|
||
x_min = min(unique)
|
||
params.gate_centers_x = [x - x_min + params.gate_width / 2 for x in unique]
|
||
params.n_gates = len(unique)
|
||
|
||
# ----- 누락 파라미터 추론 -----
|
||
|
||
def _infer_missing_params(self, params: GateParams):
|
||
"""파싱되지 않은 파라미터를 합리적 기본값/추론값으로 채움."""
|
||
# 수문 높이: Gate Top - Gate Sill
|
||
if abs(params.gate_height - 7.0) < 0.01: # 기본값 상태면
|
||
diff = params.el_gate_top - params.el_gate_sill
|
||
if 4.0 < diff < 15.0:
|
||
params.gate_height = diff
|
||
|
||
# pier_count = n_gates + 1
|
||
params.pier_count = params.n_gates + 1
|
||
|
||
# 교각 폭 추론: gate_centers_x가 있으면 인접 수문 간격에서 계산
|
||
if len(params.gate_centers_x) >= 2:
|
||
# 인접 수문 중심 간격 = gate_width + pier_width
|
||
spacings = [params.gate_centers_x[i+1] - params.gate_centers_x[i]
|
||
for i in range(len(params.gate_centers_x) - 1)]
|
||
avg_spacing = sum(spacings) / len(spacings)
|
||
inferred_pw = avg_spacing - params.gate_width
|
||
if 1.0 <= inferred_pw <= 8.0:
|
||
params.pier_width = inferred_pw
|
||
else:
|
||
params.pier_width = 3.0
|
||
else:
|
||
total_gate_w = params.n_gates * params.gate_width
|
||
if params.total_span > total_gate_w:
|
||
inferred = (params.total_span - total_gate_w) / (params.n_gates + 1)
|
||
params.pier_width = inferred if 1.5 <= inferred <= 3.5 else 3.0
|
||
else:
|
||
params.pier_width = 3.0
|
||
|
||
# gate_centers_x가 비어있으면 균등 배치
|
||
if not params.gate_centers_x:
|
||
pw = params.pier_width
|
||
gw = params.gate_width
|
||
params.gate_centers_x = [
|
||
pw + gw * 0.5 + i * (gw + pw) for i in range(params.n_gates)
|
||
]
|
||
|
||
# 구조물 유효 폭 (수문+교각만, wing wall 제외)
|
||
pw = params.pier_width
|
||
gw = params.gate_width
|
||
effective_span = params.n_gates * gw + (params.n_gates + 1) * pw
|
||
# gate_centers_x를 로컬 좌표계(0 ~ effective_span)로 재정렬
|
||
if params.gate_centers_x:
|
||
offset = params.gate_centers_x[0] - (pw + gw / 2)
|
||
params.gate_centers_x = [x - offset for x in params.gate_centers_x]
|
||
params.total_span = effective_span
|
||
|
||
# ogee_profile이 비어있으면 표준 ogee 곡선으로 생성
|
||
if not params.ogee_profile:
|
||
params.ogee_profile = self._default_ogee_profile(params)
|
||
|
||
def _default_ogee_profile(self, params: GateParams) -> list:
|
||
"""표준 ogee 여수로 단면을 표고 기반으로 생성.
|
||
|
||
간략화된 crest → 경사면 → apron 프로파일.
|
||
점 목록 [(x_along_flow, z_elevation)]
|
||
"""
|
||
crest_el = params.el_weir_crest
|
||
sill_el = params.el_gate_sill
|
||
upstream_bed = params.el_upstream_bed
|
||
downstream_bed = params.el_downstream
|
||
|
||
# 상류 수직 → 크레스트 정상 → 하류 곡선 → 수평 apron
|
||
profile = [
|
||
(0.0, upstream_bed), # 상류 바닥
|
||
(0.0, crest_el - 2.0), # 상류 옹벽 상단 (수면 아래)
|
||
(1.0, crest_el), # 크레스트 정점
|
||
(3.0, crest_el - 1.5), # 하류 곡선 시작
|
||
(7.0, sill_el + 2.0), # 하류 경사면 중간
|
||
(12.0, sill_el), # 게이트 sill 레벨
|
||
(20.0, sill_el), # 평탄 apron
|
||
(25.0, downstream_bed), # 하류 바닥
|
||
]
|
||
return profile
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 편의 함수
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def parse_gate_dxf(plan_dxf: str, section_dxf: str | None = None) -> GateParams:
|
||
"""간편 호출 인터페이스."""
|
||
parser = GateParser()
|
||
return parser.parse(plan_dxf, section_dxf)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 샘플 테스트
|
||
import sys
|
||
if len(sys.argv) >= 3:
|
||
params = parse_gate_dxf(sys.argv[1], sys.argv[2])
|
||
elif len(sys.argv) == 2:
|
||
params = parse_gate_dxf(sys.argv[1])
|
||
else:
|
||
# Gate_Sample 기본 경로
|
||
base = Path("Gate_Sample")
|
||
f1 = base / "12995740-M40-001 여수로 수문 설치도(1/2).dxf"
|
||
f2 = base / "12995740-M40-002 여수로 수문 설치도(2/2).dxf"
|
||
params = parse_gate_dxf(str(f1), str(f2))
|
||
|
||
print(params.summary())
|
||
print()
|
||
print(f"ogee profile: {len(params.ogee_profile)} points")
|
||
if params.ogee_profile:
|
||
for x, z in params.ogee_profile[:10]:
|
||
print(f" ({x:.2f}m, EL.{z:.2f})")
|