S-CANVAS (Saman Corp.) — DXF + DEM + AI 기반 3D 조감도 생성 엔진. ~24k LOC Python (scanvas_maker.py 7072 LOC GUI + 구조물 파서/빌더 다수). 이 커밋은 7-iter cleanup이 적용된 상태로 import: - F821 8 + B023 6: 비동기 lambda + except/loop 변수 캡처 NameError (Py3.13에서 reproduce 확인된 진짜 버그) - RUF012 4 + RUF013 1: ClassVar / implicit Optional 명시화 - F811/B905/B904/F401/F841/W293/F541/UP/SIM/RUF/PLR 700+ cleanup/modernization 신규 파일: - ruff.toml: target=py313, Korean unicode/저자 스타일/도메인 복잡도 무력화 - requirements-py313.txt: pyproj>=3.7, scipy>=1.14, numpy>=2.0.2 (Py3.13 wheel) - .gitignore: gcp-key.json, 캐시, 백업, 생성 이미지 제외 검증: ruff 0 errors, py_compile 0 errors, import 33/33 OK on Py3.13.13. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
680 lines
28 KiB
Python
680 lines
28 KiB
Python
"""제수변실 (Valve Chamber) + 도수관로 DXF 파서.
|
||
|
||
구조 특성:
|
||
- 콘크리트 실(chamber) 본체
|
||
- 내부 밸브 다수 (게이트/버터플라이/체크 등)
|
||
- 도수관 (intake main pipe, 주 입수관)
|
||
- 송수관 (transmission pipes, 여러 계통)
|
||
- 상단 슬라이드 뚜껑/맨홀
|
||
- 외부 관로 연장
|
||
|
||
사용법:
|
||
parser = ValveChamberParser()
|
||
params = parser.parse(["valve_chamber.dxf"])
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import re
|
||
from dataclasses import dataclass, field
|
||
|
||
import ezdxf
|
||
import numpy as np
|
||
|
||
from view_detector import detect_view_regions
|
||
from dxf_geometry import extract_structural_geometry
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 데이터 클래스
|
||
# ---------------------------------------------------------------------------
|
||
|
||
VALVE_TYPES = {
|
||
"BUTTERFLY": "버터플라이",
|
||
"GATE": "게이트",
|
||
"CHECK": "체크",
|
||
"BALL": "볼",
|
||
"UNKNOWN": "일반",
|
||
}
|
||
|
||
|
||
@dataclass
|
||
class Valve:
|
||
"""개별 밸브."""
|
||
index: int = 0
|
||
name: str = "" # "M-302" 등
|
||
valve_type: str = "GATE" # BUTTERFLY / GATE / CHECK
|
||
center_x: float = 0.0 # 실 로컬 X (m)
|
||
center_y: float = 0.0 # 실 로컬 Y (m)
|
||
elevation: float = 0.0 # EL (m)
|
||
diameter: float = 0.4 # 관경 (m)
|
||
label: str = "" # "M-302 주밸브"
|
||
|
||
|
||
@dataclass
|
||
class Pipe:
|
||
"""개별 관로 (도수관/송수관)."""
|
||
name: str = "" # "M-301 도수관"
|
||
diameter: float = 0.8 # 관경 (m)
|
||
start: tuple = (0.0, 0.0, 0.0) # 월드 좌표
|
||
end: tuple = (0.0, 0.0, 0.0)
|
||
elevation: float = 0.0
|
||
|
||
|
||
@dataclass
|
||
class ValveChamberParams:
|
||
"""제수변실 파라미터."""
|
||
|
||
# 실 본체
|
||
chamber_width: float = 27.0 # X 방향 가로 (실 폭)
|
||
chamber_depth: float = 9.0 # Y 방향 세로 (실 깊이)
|
||
chamber_wall_thickness: float = 0.6
|
||
bottom_el: float = 21.0
|
||
top_el: float = 28.5 # 상판 EL
|
||
|
||
# 내부 바닥 여러 EL (단형 바닥)
|
||
floor_elevations: list = field(default_factory=list)
|
||
|
||
# 밸브
|
||
valves: list = field(default_factory=list)
|
||
|
||
# 관로 (도수/송수)
|
||
pipes: list = field(default_factory=list)
|
||
|
||
# 주 도수관 (chamber 관통)
|
||
main_conduit_diameter: float = 1.0
|
||
main_conduit_direction: str = "X" # "X" (가로 관통) | "Y"
|
||
main_conduit_el: float = 22.0
|
||
|
||
# 상단 슬라이드 뚜껑 / 맨홀
|
||
has_hatch: bool = True
|
||
hatch_count: int = 1
|
||
hatch_size: float = 1.0
|
||
|
||
# 외부 관로 연장 (방향별 분리 — 도면 실측에 맞춤)
|
||
external_pipe_length: float = 5.0 # legacy 단일값 (fallback·하위호환)
|
||
upstream_pipe_length: float = 3.0 # 좌측 도수관 상류(분기점 왼쪽) 길이
|
||
downstream_pipe_length: float = 4.0 # 우측 송수관 하류 길이 (실측 ≈ 3.6m)
|
||
|
||
# 상류 도수관 분기 (Y-branch): 도수관 1개 → 상단·하단 2개로 분기 후 chamber 진입
|
||
has_inlet_branch: bool = True # 분기 구조 여부 (UI 토글)
|
||
branch_spread_m: float = 4.4 # 상단·하단 관 중심 Y 간격 (실측)
|
||
branch_angle_deg: float = 35.0 # 분기 합류각 (deg, 단관→분기)
|
||
branch_trunk_length: float = 3.0 # 분기점 이전의 단일 도수관 길이
|
||
|
||
# 출입 계단
|
||
has_entry_stairs: bool = True
|
||
|
||
# 지반 / 환경
|
||
ground_level: float | None = None # 실 바닥보다 높은 지반
|
||
|
||
# 소스
|
||
source_files: list = field(default_factory=list)
|
||
raw_annotations: list = field(default_factory=list)
|
||
|
||
def summary(self) -> str:
|
||
return (
|
||
f"Valve Chamber: {self.chamber_width:.1f} × {self.chamber_depth:.1f}m, "
|
||
f"EL.{self.bottom_el:.1f}~{self.top_el:.1f} (H={self.top_el - self.bottom_el:.1f}m)\n"
|
||
f" 밸브 {len(self.valves)}개, 관로 {len(self.pipes)}개, "
|
||
f"뚜껑 {'O' if self.has_hatch else 'X'}"
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 파서
|
||
# ---------------------------------------------------------------------------
|
||
|
||
EL_PATTERN = re.compile(r"EL\.?\s*[=:]?\s*(\d+\.?\d*)", re.IGNORECASE)
|
||
|
||
VALVE_TEXT_PATTERN = re.compile(
|
||
r"(M-\d{3})[\s.]*(?:(.*?밸브)|(.*?관))",
|
||
re.IGNORECASE,
|
||
)
|
||
|
||
# 밸브 타입 키워드
|
||
VALVE_TYPE_KEYWORDS = {
|
||
"BUTTERFLY": ["버터플라이", "butterfly"],
|
||
"GATE": ["게이트", "gate"],
|
||
"CHECK": ["체크", "check"],
|
||
"BALL": ["볼", "ball"],
|
||
}
|
||
|
||
|
||
def _clean_mtext(raw: str) -> str:
|
||
"""MTEXT 포맷 코드(\\C4;, \\f...;, \\H...;)를 제거하고 \\P를 줄바꿈으로 변환."""
|
||
if not raw:
|
||
return ""
|
||
text = raw
|
||
# 색상 \\C#;, 폰트 \\f...;, 높이 \\H...;, 너비 \\W...; 등 일반 포맷 코드 제거
|
||
text = re.sub(r"\\C\d+;?", "", text)
|
||
text = re.sub(r"\\f[^;]*;", "", text)
|
||
text = re.sub(r"\\H[\d.]+x?;?", "", text)
|
||
text = re.sub(r"\\W[\d.]+;?", "", text)
|
||
text = re.sub(r"\\Q[\d.\-]+;?", "", text)
|
||
text = re.sub(r"\\T[\d.]+;?", "", text)
|
||
text = re.sub(r"\\A\d+;?", "", text)
|
||
# 줄바꿈
|
||
text = text.replace("\\P", "\n")
|
||
# 그룹 괄호 정리
|
||
text = text.strip()
|
||
if text.startswith("{"):
|
||
text = text[1:]
|
||
if text.endswith("}"):
|
||
text = text[:-1]
|
||
return text.strip()
|
||
|
||
|
||
def _mleader_endpoint(ml) -> tuple[float, float] | None:
|
||
"""MLEADER에서 첫 leader line의 끝점(화살표 위치, DXF raw 좌표) 반환."""
|
||
try:
|
||
ctx = ml.context
|
||
for leader in (getattr(ctx, "leaders", None) or []):
|
||
for line in (getattr(leader, "lines", None) or []):
|
||
vs = getattr(line, "vertices", None)
|
||
if vs:
|
||
return (vs[0].x, vs[0].y)
|
||
except Exception:
|
||
return None
|
||
return None
|
||
|
||
|
||
def _mleader_text(ml) -> str:
|
||
try:
|
||
ctx = ml.context
|
||
if ctx and getattr(ctx, "mtext", None):
|
||
return _clean_mtext(ctx.mtext.default_content or "")
|
||
except Exception:
|
||
return ""
|
||
return ""
|
||
|
||
|
||
class ValveChamberParser:
|
||
"""제수변실 파서."""
|
||
|
||
def parse(self, dxf_paths: list[str]) -> ValveChamberParams:
|
||
params = ValveChamberParams()
|
||
params.source_files = list(dxf_paths)
|
||
|
||
for path in dxf_paths:
|
||
try:
|
||
self._parse_single(path, params)
|
||
except Exception as e:
|
||
print(f" 파싱 오류: {e}")
|
||
|
||
self._finalize(params)
|
||
return params
|
||
|
||
def _parse_single(self, path: str, params: ValveChamberParams):
|
||
doc = ezdxf.readfile(path)
|
||
msp = doc.modelspace()
|
||
|
||
geom = extract_structural_geometry(path)
|
||
scale = geom.unit_scale
|
||
|
||
views = detect_view_regions(path)
|
||
|
||
# EL 텍스트 수집
|
||
el_texts = self._collect_el(msp, scale)
|
||
if el_texts:
|
||
els = [v for _, _, v in el_texts]
|
||
params.top_el = max(params.top_el, *els)
|
||
params.bottom_el = min(params.bottom_el, *els)
|
||
# 중간 EL들 추출 (바닥 단)
|
||
mid_els = sorted(set(
|
||
round(v, 1) for _, _, v in el_texts
|
||
if params.bottom_el < v < params.top_el
|
||
))
|
||
params.floor_elevations = mid_els[:5] # 상위 5개
|
||
params.raw_annotations.extend(
|
||
[(f"EL.{v:.2f}", x, y) for x, y, v in el_texts]
|
||
)
|
||
|
||
# 실 본체 크기: 평면도 영역
|
||
plan_view = None
|
||
for v in views:
|
||
if v.view_type == "plan":
|
||
plan_view = v
|
||
break
|
||
|
||
# 1순위: 평면도 영역 그대로 사용 (기본값 27×9는 폴백, 실제 도면 우선)
|
||
if plan_view:
|
||
params.chamber_width = plan_view.width
|
||
params.chamber_depth = plan_view.height
|
||
else:
|
||
# 2순위: CS-CONC-밸브실 레이어 bbox
|
||
chamber_pts = []
|
||
for e in msp:
|
||
if e.dxf.layer != "CS-CONC-밸브실":
|
||
continue
|
||
try:
|
||
if e.dxftype() == "LWPOLYLINE":
|
||
chamber_pts.extend((p[0] * scale, p[1] * scale)
|
||
for p in e.get_points())
|
||
elif e.dxftype() == "LINE":
|
||
chamber_pts.append((e.dxf.start.x * scale, e.dxf.start.y * scale))
|
||
chamber_pts.append((e.dxf.end.x * scale, e.dxf.end.y * scale))
|
||
except Exception:
|
||
continue
|
||
if chamber_pts:
|
||
arr = np.array(chamber_pts)
|
||
params.chamber_width = arr[:, 0].max() - arr[:, 0].min()
|
||
params.chamber_depth = arr[:, 1].max() - arr[:, 1].min()
|
||
|
||
# 밸브/관로 추출:
|
||
# 1) MLEADER (안내선) 끝점에서 정확한 chamber-local 좌표 확보
|
||
# 2) MLEADER에 없는 항목은 TEXT 기반 폴백 (M-301 도수관 등)
|
||
ml_valves, ml_pipes = self._extract_from_mleaders(msp, plan_view, scale, params)
|
||
txt_valves, txt_pipes = self._extract_valves_and_pipes(msp, scale)
|
||
|
||
# MLEADER 결과가 우선. 같은 M-NNN이 양쪽에 있으면 MLEADER 사용.
|
||
ml_names = {v.name for v in ml_valves} | {p.name for p in ml_pipes}
|
||
merged_valves = list(ml_valves)
|
||
merged_valves.extend(v for v in txt_valves if v.name and v.name not in ml_names)
|
||
merged_pipes = list(ml_pipes)
|
||
merged_pipes.extend(p for p in txt_pipes if p.name and p.name not in ml_names)
|
||
|
||
# dedupe by name — 같은 M-NNN이 여러 엔티티(label column TEXT + in-drawing MTEXT)
|
||
# 에서 잡혀 중복 생성되는 케이스 방지. 직경이 명시된 항목 우선.
|
||
def _dedupe(items, default_dia: float):
|
||
best: dict[str, object] = {}
|
||
for it in items:
|
||
key = it.name
|
||
if not key:
|
||
continue
|
||
if key not in best:
|
||
best[key] = it
|
||
else:
|
||
cur = best[key]
|
||
cur_default = abs(getattr(cur, "diameter", 0) - default_dia) < 1e-6
|
||
new_default = abs(getattr(it, "diameter", 0) - default_dia) < 1e-6
|
||
if cur_default and not new_default:
|
||
best[key] = it
|
||
return list(best.values())
|
||
|
||
merged_valves = _dedupe(merged_valves, default_dia=0.4)
|
||
merged_pipes = _dedupe(merged_pipes, default_dia=0.8)
|
||
|
||
if merged_valves:
|
||
params.valves = merged_valves
|
||
if merged_pipes:
|
||
params.pipes = merged_pipes
|
||
for p in merged_pipes:
|
||
if "M-301" in p.name or "도수관" in p.name:
|
||
params.main_conduit_diameter = max(p.diameter,
|
||
params.main_conduit_diameter)
|
||
params.main_conduit_el = p.elevation or params.main_conduit_el
|
||
|
||
# 슬라이드 뚜껑 INSERT 확인
|
||
for e in msp.query("INSERT"):
|
||
if "슬라이드" in getattr(e.dxf, "name", ""):
|
||
params.has_hatch = True
|
||
params.hatch_count = max(params.hatch_count, 1)
|
||
|
||
def _collect_el(self, msp, scale: float) -> list:
|
||
out = []
|
||
for e in msp:
|
||
if e.dxftype() not in ("TEXT", "MTEXT"):
|
||
continue
|
||
try:
|
||
txt = e.dxf.text if e.dxftype() == "TEXT" else (e.text or "")
|
||
m = EL_PATTERN.search(txt)
|
||
if m:
|
||
pos = e.dxf.insert
|
||
out.append((pos.x * scale, pos.y * scale, float(m.group(1))))
|
||
except Exception:
|
||
continue
|
||
return out
|
||
|
||
def _extract_from_mleaders(self, msp, plan_view, scale: float,
|
||
params: ValveChamberParams
|
||
) -> tuple[list[Valve], list[Pipe]]:
|
||
"""MLEADER(안내선)로 밸브/관로의 정확한 chamber-local 위치 추출.
|
||
|
||
- leader 끝점이 plan_view bounds 안이면 chamber-local 좌표로 변환
|
||
- 텍스트에서 M-NNN, "밸브"/"도수관"/"송수관"/"D=숫자" 추출
|
||
- destination-only pipe (예: "{천상정수장 D1,200}")도 출력 관로로 인식
|
||
"""
|
||
valves: list[Valve] = []
|
||
pipes: list[Pipe] = []
|
||
|
||
if plan_view:
|
||
cx = (plan_view.bounds[0] + plan_view.bounds[2]) / 2.0
|
||
cy = (plan_view.bounds[1] + plan_view.bounds[3]) / 2.0
|
||
x0, y0, x1, y1 = plan_view.bounds
|
||
else:
|
||
cx = cy = 0.0
|
||
x0 = y0 = -1e18; x1 = y1 = 1e18
|
||
|
||
for ml in msp.query("MULTILEADER MLEADER"):
|
||
txt = _mleader_text(ml)
|
||
end = _mleader_endpoint(ml)
|
||
if not txt or end is None:
|
||
continue
|
||
ex_m = end[0] * scale
|
||
ey_m = end[1] * scale
|
||
# plan view 밖이면 (예: side view, M-306 등) 건너뜀 — chamber 평면 배치엔 안 씀
|
||
if not (x0 <= ex_m <= x1 and y0 <= ey_m <= y1):
|
||
continue
|
||
local_x = ex_m - cx
|
||
local_y = ey_m - cy
|
||
|
||
# 텍스트 분류
|
||
m_match = re.search(r"(M-\d{3})", txt)
|
||
name = m_match.group(1) if m_match else ""
|
||
txt_clean = txt.replace("\n", " ").strip()
|
||
|
||
is_valve = "밸브" in txt_clean
|
||
is_pipe_kw = any(kw in txt_clean for kw in ("도수관", "송수관", "강재도관"))
|
||
# destination + D=숫자 → 출력 송수관
|
||
is_dest_pipe = (
|
||
("정수장" in txt_clean or "계통" in txt_clean or "유지용수" in txt_clean)
|
||
and re.search(r"D[\s,.]*\d", txt_clean) is not None
|
||
and not is_valve
|
||
)
|
||
|
||
# 직경 파싱 (D1,200 / D80 / Φ800 / D=800 / %%c800)
|
||
# D 다음에 optional 공백·=·콜론, 그 뒤 숫자(쉼표/마침표 천 단위 구분자 허용)
|
||
dia_m = re.search(r"(?:Φ|%%[cC]|\bD[\s=:]*)(\d[\d,.]*)", txt_clean)
|
||
diameter = None
|
||
if dia_m:
|
||
raw = dia_m.group(1).replace(",", "").rstrip(".")
|
||
try:
|
||
val = float(raw)
|
||
# mm 단위 가정 (val>=10), 그 이하는 m로 가정
|
||
diameter = val / 1000.0 if val >= 10 else val
|
||
except ValueError:
|
||
pass
|
||
|
||
if is_valve:
|
||
# 타입 추정: 부밸브/주밸브에서 직접 못 가르므로 GATE 기본
|
||
vtype = "GATE"
|
||
for vt, kws in VALVE_TYPE_KEYWORDS.items():
|
||
if any(kw in txt_clean for kw in kws):
|
||
vtype = vt
|
||
break
|
||
v = Valve(
|
||
index=len(valves),
|
||
name=name or f"V?{len(valves)+1}",
|
||
valve_type=vtype,
|
||
center_x=local_x,
|
||
center_y=local_y,
|
||
elevation=params.bottom_el + 1.5,
|
||
diameter=diameter or 0.5,
|
||
label=txt_clean[:60],
|
||
)
|
||
valves.append(v)
|
||
elif is_pipe_kw or is_dest_pipe:
|
||
# 방향: 끝점이 chamber 중심 기준 어느 쪽에 가까운지로 판단
|
||
# 좌측 가까우면 -X 외부, 우측이면 +X 외부, 위/아래는 ±Y
|
||
half_w = params.chamber_width / 2.0
|
||
half_d = params.chamber_depth / 2.0
|
||
rx = abs(local_x) / max(half_w, 1e-6)
|
||
ry = abs(local_y) / max(half_d, 1e-6)
|
||
z = params.bottom_el + 1.5
|
||
if rx >= ry:
|
||
# X 방향: 좌측=상류 도수관(has_inlet_branch 시 _finalize가 분기 생성),
|
||
# 우측=하류 송수관 → 실측 길이 분리
|
||
sx = 1.0 if local_x >= 0 else -1.0
|
||
ext = (params.downstream_pipe_length if sx > 0
|
||
else params.upstream_pipe_length)
|
||
start = (local_x, local_y, z)
|
||
end_pt = (sx * (half_w + ext), local_y, z)
|
||
else:
|
||
sy = 1.0 if local_y >= 0 else -1.0
|
||
ext = max(3.0, params.external_pipe_length)
|
||
start = (local_x, local_y, z)
|
||
end_pt = (local_x, sy * (half_d + ext), z)
|
||
p = Pipe(
|
||
name=name or txt_clean[:30],
|
||
diameter=diameter or 0.8,
|
||
start=start,
|
||
end=end_pt,
|
||
elevation=z,
|
||
)
|
||
pipes.append(p)
|
||
return valves, pipes
|
||
|
||
def _extract_valves_and_pipes(self, msp, scale: float,
|
||
proximity_radius_m: float = 5.0
|
||
) -> tuple[list[Valve], list[Pipe]]:
|
||
"""TEXT/MTEXT에서 M-번호 밸브/관로 추출 (proximity grouping).
|
||
|
||
도면에 따라 라벨이 분리될 수 있는 4가지 케이스 모두 처리:
|
||
1) 단일 TEXT: "M-302 송수관 주밸브(1)" — 한 엔티티에 모든 정보
|
||
2) MTEXT 멀티라인: "{도수관\\PM-301}" — 같은 엔티티 두 줄 (\\P 분리)
|
||
3) 인접 TEXT 분리: "M-301" + "도수관"이 인접 좌표에 별개 엔티티
|
||
4) 라벨 + 외부 직경 텍스트: "M-302 주밸브" + "Φ800"이 근처
|
||
|
||
알고리즘:
|
||
- 모든 TEXT/MTEXT 정리(MTEXT 포맷 코드 strip, \\P→\\n)
|
||
- 각 엔티티의 TEXT를 (clean_txt, x, y)로 수집
|
||
- M-NNN을 포함하는 엔티티마다 proximity_radius_m 내 다른 엔티티 텍스트를
|
||
병합해 full_label 구성 (단, 다른 M-NNN 엔티티는 제외)
|
||
- full_label에서 분류·직경 추출
|
||
"""
|
||
valves: list[Valve] = []
|
||
pipes: list[Pipe] = []
|
||
|
||
# 1) 모든 텍스트 엔티티 수집 (정리된 텍스트 + 위치)
|
||
items: list[tuple[str, float, float]] = []
|
||
for e in msp:
|
||
if e.dxftype() not in ("TEXT", "MTEXT"):
|
||
continue
|
||
try:
|
||
raw = e.dxf.text if e.dxftype() == "TEXT" else (e.text or "")
|
||
txt = _clean_mtext(raw) if e.dxftype() == "MTEXT" else raw.strip()
|
||
if not txt:
|
||
continue
|
||
pos = e.dxf.insert
|
||
items.append((txt, pos.x * scale, pos.y * scale))
|
||
except Exception:
|
||
continue
|
||
|
||
# 2) M-NNN을 가진 인덱스 찾기 (re.search로 any-position 매칭, 케이스 1+2 커버)
|
||
m_indices: list[tuple[int, str, float, float, str]] = [] # (idx, name, x, y, txt)
|
||
for i, (txt, x, y) in enumerate(items):
|
||
m_match = re.search(r"\b(M-\d{3})\b", txt)
|
||
if m_match:
|
||
m_indices.append((i, m_match.group(1), x, y, txt))
|
||
|
||
# 3) 각 M-NNN을 분류. 우선 own_txt만으로 시도 → 모호하면 proximity로 보강.
|
||
m_idx_set = {idx for idx, *_ in m_indices}
|
||
|
||
def _classify(text: str) -> tuple[bool, bool, bool]:
|
||
is_p = any(kw in text for kw in
|
||
("도수관", "송수관", "강재도관", "pipe", "Pipe"))
|
||
is_v = any(kw in text for kw in ("밸브", "valve", "Valve"))
|
||
is_dest = (
|
||
("정수장" in text or "계통" in text or "유지용수" in text)
|
||
and re.search(r"D[\s,.=]*\d", text) is not None
|
||
and not is_v
|
||
)
|
||
return is_p, is_v, is_dest
|
||
|
||
def _parse_dia(text: str) -> float | None:
|
||
m = re.search(r"(?:Φ|%%[cC]|\bD[\s=:]*)(\d[\d,.]*)", text)
|
||
if not m:
|
||
return None
|
||
raw_d = m.group(1).replace(",", "").rstrip(".")
|
||
try:
|
||
val = float(raw_d)
|
||
return val / 1000.0 if val >= 10 else val
|
||
except ValueError:
|
||
return None
|
||
|
||
for j, (idx, name, lx, ly, own_txt) in enumerate(m_indices):
|
||
# 1차: own_txt만으로 분류
|
||
is_pipe_kw, is_valve, is_dest_pipe = _classify(own_txt)
|
||
diameter = _parse_dia(own_txt)
|
||
full_txt = own_txt
|
||
|
||
# 2차: own에 분류·직경 정보가 부족할 때만 proximity로 보강
|
||
need_class = not (is_pipe_kw or is_valve or is_dest_pipe)
|
||
need_dia = diameter is None
|
||
if need_class or need_dia:
|
||
near_parts: list[str] = []
|
||
r2 = proximity_radius_m * proximity_radius_m
|
||
for k, (txt2, x2, y2) in enumerate(items):
|
||
if k == idx or k in m_idx_set:
|
||
continue
|
||
d2 = (x2 - lx) ** 2 + (y2 - ly) ** 2
|
||
if d2 <= r2:
|
||
near_parts.append(txt2)
|
||
if near_parts:
|
||
near_txt = " ".join(near_parts).replace("\n", " ").strip()
|
||
if need_class:
|
||
np_pipe, np_valve, np_dest = _classify(near_txt)
|
||
is_pipe_kw = is_pipe_kw or np_pipe
|
||
is_valve = is_valve or np_valve
|
||
is_dest_pipe = is_dest_pipe or np_dest
|
||
if need_dia:
|
||
diameter = _parse_dia(near_txt)
|
||
full_txt = (own_txt + " | " + near_txt).strip()
|
||
|
||
if is_valve:
|
||
vtype = "GATE"
|
||
for vt, kws in VALVE_TYPE_KEYWORDS.items():
|
||
if any(kw in full_txt for kw in kws):
|
||
vtype = vt
|
||
break
|
||
valves.append(Valve(
|
||
index=j,
|
||
name=name,
|
||
valve_type=vtype,
|
||
center_x=0,
|
||
center_y=0,
|
||
elevation=0,
|
||
diameter=diameter or (0.5 if vtype == "BUTTERFLY" else 0.4),
|
||
label=full_txt[:60],
|
||
))
|
||
elif is_pipe_kw or is_dest_pipe:
|
||
pipe = Pipe(name=name)
|
||
pipe.diameter = diameter if diameter is not None else 0.8
|
||
pipes.append(pipe)
|
||
# 밸브도 파이프도 아니면 스킵 (펌프/사다리/덮개 등 비처리 항목)
|
||
|
||
return valves, pipes
|
||
|
||
def _finalize(self, params: ValveChamberParams):
|
||
"""파라미터 정리. MLEADER에서 정확한 위치를 못 얻은 항목만 폴백 배치.
|
||
|
||
상류 도수관(M-301)은 `has_inlet_branch=True`일 때 **분기 구조**로 교체:
|
||
단일 trunk → Y-branch(경사 2개) → 상단·하단 평행 관(chamber 진입).
|
||
도면(신설 제수변실.dxf) 실측: branch_spread ≈ 4.4m, 우측 하류 관로 ≈ 3.6m.
|
||
"""
|
||
# 위치(center_x,y) 또는 (start,end)가 (0,0,0)인 것만 자동 배치
|
||
if params.valves:
|
||
unset = [v for v in params.valves if v.center_x == 0 and v.center_y == 0]
|
||
n = len(unset)
|
||
for i, v in enumerate(unset):
|
||
t = (i + 0.5) / max(n, 1) - 0.5
|
||
v.center_x = t * params.chamber_width * 0.7
|
||
v.center_y = 0.0
|
||
v.elevation = params.bottom_el + 2.0
|
||
|
||
# --- 상류 도수관 분기 생성 / 단일 폴백 ---
|
||
if params.pipes:
|
||
main_idx = [i for i, p in enumerate(params.pipes)
|
||
if "도수관" in p.name or p.name == "M-301"]
|
||
main_pipes = [params.pipes[i] for i in main_idx]
|
||
orig_main = main_pipes[0] if main_pipes else None
|
||
# 기존 도수관 항목 제거 (분기든 단일이든 새로 생성)
|
||
if main_idx:
|
||
params.pipes = [p for j, p in enumerate(params.pipes) if j not in main_idx]
|
||
|
||
dia = orig_main.diameter if orig_main else params.main_conduit_diameter
|
||
z = (orig_main.elevation if orig_main and orig_main.elevation
|
||
else params.main_conduit_el or (params.bottom_el + 1.0))
|
||
half_w = params.chamber_width / 2.0
|
||
|
||
if params.has_inlet_branch and orig_main is not None:
|
||
import math as _m
|
||
spread_half = params.branch_spread_m / 2.0
|
||
angle = _m.radians(max(10.0, min(70.0, params.branch_angle_deg)))
|
||
# 분기 경사 길이: spread_half = L_branch * sin(angle) → L_branch = spread_half/sin
|
||
# X 진행: L_branch * cos(angle)
|
||
L_branch = spread_half / max(_m.sin(angle), 0.1)
|
||
dx_branch = L_branch * _m.cos(angle)
|
||
# chamber 좌측벽 X = -half_w
|
||
# 상단·하단 평행 관 길이 = upstream_pipe_length (분기점 이후 → chamber 벽)
|
||
x_branch_out = -half_w - 0.0 # 상단·하단이 chamber 벽에 도달하는 X
|
||
x_branch_in = x_branch_out - params.upstream_pipe_length # 분기가 합쳐지는 지점(X 방향 안쪽 끝)
|
||
x_merge = x_branch_in - dx_branch # Y축 0으로 모이는 분기점(상류측)
|
||
|
||
# 1) 상단 평행관 (chamber 좌측벽 → 분기 상단 끝)
|
||
params.pipes.append(Pipe(
|
||
name="M-301 상단",
|
||
diameter=dia,
|
||
start=(x_branch_in, spread_half, z),
|
||
end=(x_branch_out, spread_half, z),
|
||
elevation=z,
|
||
))
|
||
# 2) 하단 평행관
|
||
params.pipes.append(Pipe(
|
||
name="M-301 하단",
|
||
diameter=dia,
|
||
start=(x_branch_in, -spread_half, z),
|
||
end=(x_branch_out, -spread_half, z),
|
||
elevation=z,
|
||
))
|
||
# 3) 상단 경사 (분기점 → 상단 끝)
|
||
params.pipes.append(Pipe(
|
||
name="M-301 분기상경사",
|
||
diameter=dia,
|
||
start=(x_merge, 0.0, z),
|
||
end=(x_branch_in, spread_half, z),
|
||
elevation=z,
|
||
))
|
||
# 4) 하단 경사
|
||
params.pipes.append(Pipe(
|
||
name="M-301 분기하경사",
|
||
diameter=dia,
|
||
start=(x_merge, 0.0, z),
|
||
end=(x_branch_in, -spread_half, z),
|
||
elevation=z,
|
||
))
|
||
# 5) 단일 trunk (분기점 → 상류로 trunk_length)
|
||
params.pipes.append(Pipe(
|
||
name="M-301 도수관",
|
||
diameter=dia,
|
||
start=(x_merge - max(0.5, params.branch_trunk_length), 0.0, z),
|
||
end=(x_merge, 0.0, z),
|
||
elevation=z,
|
||
))
|
||
elif orig_main is not None and orig_main.start != (0.0, 0.0, 0.0):
|
||
# 분기 비활성 + 원본 pipe가 명시적 경로를 가졌을 때만 유지.
|
||
# (start/end가 origin인 seed는 폐기 — 도면에 없는 관로가 만들어지지 않게)
|
||
params.pipes.append(orig_main)
|
||
|
||
# 나머지 origin-only pipe는 방향별 폴백
|
||
for p in params.pipes:
|
||
if not (p.start == (0.0, 0.0, 0.0) and p.end == (0.0, 0.0, 0.0)):
|
||
continue
|
||
half_d = params.chamber_depth / 2 + params.external_pipe_length
|
||
z2 = params.bottom_el + 1.5
|
||
p.start = (0, -half_d, z2)
|
||
p.end = (0, half_d, z2)
|
||
p.elevation = z2
|
||
|
||
|
||
def parse_valve_chamber(paths: list[str]) -> ValveChamberParams:
|
||
return ValveChamberParser().parse(paths)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
import sys
|
||
|
||
paths = sys.argv[1:] if len(sys.argv) > 1 else [
|
||
"SAMPLE_CAD/12996710-M43-002 신설 제수변실 설비 배치도.dxf",
|
||
]
|
||
p = parse_valve_chamber(paths)
|
||
print(p.summary())
|
||
print("\n밸브:")
|
||
for v in p.valves:
|
||
print(f" {v.name} [{VALVE_TYPES.get(v.valve_type, '?')}] {v.label}")
|
||
print("\n관로:")
|
||
for pipe in p.pipes:
|
||
print(f" {pipe.name} Φ{pipe.diameter*1000:.0f}mm")
|
||
print(f"\n바닥 EL: {p.floor_elevations}")
|