Add source code, design assets, and CAD samples

This commit is contained in:
2026-05-07 20:30:34 +09:00
parent 720858c7ae
commit 184185c635
49 changed files with 3407636 additions and 0 deletions

500
structure_vlm_feedback.py Normal file
View File

@@ -0,0 +1,500 @@
"""구조물 상세도면 ↔ 3D 빌드 결과 간 VLM(Gemini Vision) 피드백 루프.
흐름:
1. 상세 DXF를 평면 PNG로 렌더(ezdxf + matplotlib)
2. 빌드된 3D 메시를 top-down PNG로 렌더(PyVista off-screen)
3. 두 이미지 + 현재 파라미터 JSON을 Gemini Vision에 전달
4. 구조화된 JSON diff 수신 (missing/incorrect/excess)
5. diff를 파라미터에 머지(사용자 승인 후)
기존 Gemini 인프라(google.genai + Vertex AI gcp-key.json)를 그대로 재사용하며,
별도 API/SDK/결제선 없음. 모델은 기본 gemini-2.5-flash(저비용) → 필요 시 2.5-pro.
"""
from __future__ import annotations
import base64
import json
import os
import re
from dataclasses import asdict, fields, is_dataclass
from pathlib import Path
from typing import Any, Callable, Optional
import numpy as np
# ---------------------------------------------------------------------------
# 렌더링
# ---------------------------------------------------------------------------
def render_dxf_to_png(dxf_paths: list[str] | str,
output_path: str,
size: int = 1400,
dpi: int = 140,
bg: str = "white",
fg: str = "black") -> str:
"""상세 DXF를 matplotlib으로 렌더링해 PNG 저장.
Args:
dxf_paths: 단일 경로 또는 경로 리스트(첫 파일만 렌더)
output_path: 저장 경로
size: 결과 이미지 한 변 픽셀 (정사각)
dpi: matplotlib dpi
bg/fg: 배경/선 색
Returns:
output_path
"""
import matplotlib
matplotlib.use("Agg", force=True)
import matplotlib.pyplot as plt
import ezdxf
from ezdxf.addons.drawing import Frontend, RenderContext
from ezdxf.addons.drawing.matplotlib import MatplotlibBackend
from ezdxf.addons.drawing.config import Configuration
if isinstance(dxf_paths, (list, tuple)):
dxf_path = dxf_paths[0]
else:
dxf_path = dxf_paths
doc = ezdxf.readfile(dxf_path)
msp = doc.modelspace()
fig_in = size / dpi
fig, ax = plt.subplots(figsize=(fig_in, fig_in), dpi=dpi)
fig.patch.set_facecolor(bg)
ax.set_facecolor(bg)
ax.set_aspect("equal")
ax.set_axis_off()
ctx = RenderContext(doc)
try:
cfg = Configuration()
except Exception:
cfg = None
backend = MatplotlibBackend(ax)
frontend = Frontend(ctx, backend, config=cfg) if cfg else Frontend(ctx, backend)
frontend.draw_layout(msp, finalize=True)
fig.savefig(output_path, dpi=dpi, bbox_inches="tight",
facecolor=bg, pad_inches=0.1)
plt.close(fig)
return output_path
def render_meshes_topdown(meshes: list[tuple],
output_path: str,
size: int = 1400,
bg: str = "white") -> str:
"""빌드된 메시 리스트를 top-down(평면) 뷰로 렌더.
Args:
meshes: [(pv.PolyData, color, opacity), ...]
output_path: 저장 경로
size: 정사각 픽셀
bg: 배경색
Returns:
output_path
"""
import pyvista as pv
p = pv.Plotter(off_screen=True, window_size=(size, size))
p.set_background(bg)
for item in meshes:
try:
if len(item) >= 3:
mesh, color, opacity = item[0], item[1], item[2]
else:
mesh, color, opacity = item[0], item[1], 1.0
p.add_mesh(mesh, color=color, opacity=opacity,
show_edges=True, edge_color="#888888",
line_width=0.5, smooth_shading=False)
except Exception:
continue
p.enable_parallel_projection()
p.view_xy() # +Z에서 -Z 방향 내려다봄
p.camera.zoom(1.0)
try:
p.screenshot(output_path, transparent_background=False)
finally:
p.close()
return output_path
# ---------------------------------------------------------------------------
# 파라미터 직렬화
# ---------------------------------------------------------------------------
def params_to_dict(params: Any) -> dict:
"""dataclass / dict 객체를 JSON-직렬화 가능한 dict로 변환."""
if params is None:
return {}
if is_dataclass(params):
d = asdict(params)
elif isinstance(params, dict):
d = dict(params)
else:
# 일반 객체 속성 덤프
d = {k: v for k, v in vars(params).items() if not k.startswith("_")}
return _json_safe(d)
def _json_safe(obj):
if isinstance(obj, dict):
return {k: _json_safe(v) for k, v in obj.items()}
if isinstance(obj, (list, tuple)):
return [_json_safe(v) for v in obj]
if isinstance(obj, (np.floating, np.integer)):
return float(obj) if isinstance(obj, np.floating) else int(obj)
if isinstance(obj, np.ndarray):
return obj.tolist()
if isinstance(obj, (str, int, float, bool)) or obj is None:
return obj
return str(obj)
# ---------------------------------------------------------------------------
# Gemini 호출
# ---------------------------------------------------------------------------
_DIFF_SCHEMA_PROMPT = """당신은 기계설비 설계도면을 검토하는 엔지니어링 검증 도구입니다.
첨부된 이미지 2장:
image 1 = 원본 설계 도면 (DXF의 평면도 렌더)
image 2 = 현재 파서·빌더가 생성한 3D 모델의 top-down 뷰
현재 추출된 파라미터 JSON:
```json
{params_json}
```
구조물 유형: {structure_type}
두 이미지를 비교해 도면의 의도가 3D 모델에 얼마나 정확히 반영됐는지 평가하고,
차이(missing/incorrect/excess)를 다음 JSON 스키마로만 반환:
{{
"summary": "1-2문장 한국어 요약",
"match_score": 0.0~1.0 사이 실수 (도면 반영률, 1.0=완벽),
"param_updates": [
{{"path": "필드명 (예: chamber_width, bottom_el)",
"current": "현재 값",
"suggested": "도면에서 관찰한 값",
"reason": "한국어 근거 1문장"}}
],
"valves_missing": [
{{"name": "M-NNN 또는 설명", "x": float, "y": float,
"diameter_mm": int, "valve_type": "GATE|BUTTERFLY|CHECK|BALL",
"reason": "왜 누락으로 판단했는지"}}
],
"valves_incorrect": [
{{"name": "M-NNN", "field": "수정할 필드",
"current": "현재 값", "suggested": "제안 값", "reason": "..."}}
],
"pipes_missing": [
{{"name": "식별명", "diameter_mm": int,
"start": [x, y, z], "end": [x, y, z],
"reason": "..."}}
],
"pipes_incorrect": [
{{"name": "식별명", "field": "...", "current": "...", "suggested": "...", "reason": "..."}}
],
"excess_notes": ["모델에 있지만 도면에 없는 요소 설명"]
}}
주의:
- 확실하지 않은 항목은 제안하지 마세요(false positive 최소화).
- 좌표·직경은 chamber/구조물 로컬 좌표계 기준 meter 또는 mm를 파라미터 JSON 단위에 맞추세요.
- 단순 렌더 품질 차이(색·조명)는 무시하세요. 도면 의도만 비교.
- JSON 외 어떠한 텍스트도 반환하지 마세요.
"""
def _read_bytes(path: str) -> bytes:
with open(path, "rb") as f:
return f.read()
def request_structure_diff(client,
drawing_png_path: str,
render_png_path: str,
params_dict: dict,
structure_type: str = "valve_chamber",
model: str = "gemini-2.5-flash",
log_fn: Callable[[str], None] = print,
timeout_s: float = 60.0) -> dict:
"""Gemini Vision에 도면+렌더+파라미터 전달해 JSON diff 수신.
Args:
client: google.genai.Client 인스턴스 (caller가 인증 설정)
drawing_png_path: 원본 도면 PNG
render_png_path: 3D top-down PNG
params_dict: 현재 파라미터 (JSON-safe)
structure_type: 구조물 종류 (프롬프트 컨텍스트)
model: Gemini 모델명 (기본 2.5-flash)
log_fn: 로그 callback
timeout_s: 호출 타임아웃 (실제로는 SDK 설정에 따름)
Returns:
diff dict (스키마는 _DIFF_SCHEMA_PROMPT 참고)
Raises:
RuntimeError: 호출 실패 또는 JSON 파싱 실패
"""
try:
from google.genai import types as gtypes
except ImportError as e:
raise RuntimeError(f"google.genai SDK 필요: {e}")
params_json = json.dumps(params_dict, ensure_ascii=False, indent=2)
prompt = _DIFF_SCHEMA_PROMPT.format(
params_json=params_json,
structure_type=structure_type,
)
drawing_bytes = _read_bytes(drawing_png_path)
render_bytes = _read_bytes(render_png_path)
parts = [
gtypes.Part.from_bytes(data=drawing_bytes, mime_type="image/png"),
gtypes.Part.from_bytes(data=render_bytes, mime_type="image/png"),
gtypes.Part.from_text(text=prompt),
]
log_fn(f" [VLM] Gemini 호출: model={model}, drawing={len(drawing_bytes)//1024}KB, render={len(render_bytes)//1024}KB")
try:
resp = client.models.generate_content(
model=model,
contents=parts,
config=gtypes.GenerateContentConfig(
response_mime_type="application/json",
temperature=0.1,
),
)
except Exception as e:
raise RuntimeError(f"Gemini 호출 실패: {e}")
text = getattr(resp, "text", None) or ""
if not text:
# 일부 SDK 버전은 candidates[0].content.parts[0].text 사용
try:
text = resp.candidates[0].content.parts[0].text
except Exception:
text = ""
if not text:
raise RuntimeError("Gemini 응답이 비어있습니다.")
# 혹시 모를 코드블록 제거
text = re.sub(r"^```(?:json)?\s*", "", text.strip())
text = re.sub(r"\s*```$", "", text)
try:
diff = json.loads(text)
except json.JSONDecodeError as e:
# 부분 복구 시도 (첫 {부터 마지막 }까지)
m = re.search(r"\{.*\}", text, re.DOTALL)
if m:
try:
diff = json.loads(m.group(0))
except Exception:
raise RuntimeError(f"JSON 파싱 실패: {e}\n원문: {text[:300]}")
else:
raise RuntimeError(f"JSON 파싱 실패: {e}\n원문: {text[:300]}")
log_fn(f" [VLM] 응답 수신: match_score={diff.get('match_score', '?')} "
f"updates={len(diff.get('param_updates', []))} "
f"v_missing={len(diff.get('valves_missing', []))} "
f"p_missing={len(diff.get('pipes_missing', []))}")
return diff
# ---------------------------------------------------------------------------
# diff 적용
# ---------------------------------------------------------------------------
def apply_diff_to_params(params: Any,
diff: dict,
selections: Optional[dict] = None,
log_fn: Callable[[str], None] = print) -> dict:
"""diff를 params 객체에 in-place 적용.
Args:
params: dataclass 인스턴스 (ValveChamberParams 등)
diff: request_structure_diff 반환값
selections: {"param_updates": [bool, ...], "valves_missing": [bool, ...],
"pipes_missing": [bool, ...]} — 사용자 체크박스. None이면 모두 적용.
log_fn: 로그 callback
Returns:
{"applied": int, "skipped": int, "errors": [str, ...]}
"""
sel = selections or {}
applied = 0
errors: list[str] = []
# 1) 스칼라/벡터 파라미터 업데이트
for i, upd in enumerate(diff.get("param_updates", []) or []):
if sel.get("param_updates") is not None and not sel["param_updates"][i]:
continue
path = upd.get("path", "").strip()
suggested = upd.get("suggested")
if not path:
continue
try:
_set_by_path(params, path, suggested)
applied += 1
log_fn(f" [VLM apply] {path} = {suggested!r}")
except Exception as e:
errors.append(f"{path}: {e}")
# 2) Valve 추가
if hasattr(params, "valves") and isinstance(params.valves, list):
try:
from valve_chamber_parser import Valve
except ImportError:
Valve = None
if Valve is not None:
for i, v in enumerate(diff.get("valves_missing", []) or []):
if sel.get("valves_missing") is not None and not sel["valves_missing"][i]:
continue
try:
dia_mm = float(v.get("diameter_mm", 400))
params.valves.append(Valve(
index=len(params.valves),
name=v.get("name", f"V+{i+1}"),
valve_type=v.get("valve_type", "GATE"),
center_x=float(v.get("x", 0.0)),
center_y=float(v.get("y", 0.0)),
elevation=float(getattr(params, "bottom_el", 0.0)) + 1.5,
diameter=dia_mm / 1000.0,
label=(v.get("name", "") + " [VLM 추가]")[:60],
))
applied += 1
log_fn(f" [VLM apply] +valve {v.get('name')}")
except Exception as e:
errors.append(f"valve_missing[{i}]: {e}")
# 3) Pipe 추가
if hasattr(params, "pipes") and isinstance(params.pipes, list):
try:
from valve_chamber_parser import Pipe
except ImportError:
Pipe = None
if Pipe is not None:
for i, pp in enumerate(diff.get("pipes_missing", []) or []):
if sel.get("pipes_missing") is not None and not sel["pipes_missing"][i]:
continue
try:
dia_mm = float(pp.get("diameter_mm", 800))
start = tuple(pp.get("start", (0.0, 0.0, 0.0)))
end = tuple(pp.get("end", (0.0, 0.0, 0.0)))
params.pipes.append(Pipe(
name=pp.get("name", f"P+{i+1}") + " [VLM]",
diameter=dia_mm / 1000.0,
start=start,
end=end,
elevation=start[2] if len(start) > 2 else 0.0,
))
applied += 1
log_fn(f" [VLM apply] +pipe {pp.get('name')}")
except Exception as e:
errors.append(f"pipe_missing[{i}]: {e}")
return {"applied": applied, "errors": errors}
def _set_by_path(obj: Any, path: str, value: Any):
"""단순 속성 경로로 값 설정 (현재 PoC는 평면 필드만 지원)."""
# a.b.c[0] 형식은 최소화 — 평면 필드만
if "." in path or "[" in path:
# 차후 확장 포인트: 지금은 경고만 기록
raise ValueError(f"중첩 경로는 미지원: {path}")
if not hasattr(obj, path):
raise AttributeError(f"속성 없음: {path}")
current = getattr(obj, path)
# 타입 강제 변환 (숫자/문자열만)
if isinstance(current, bool):
new_val = bool(value)
elif isinstance(current, int):
new_val = int(float(value))
elif isinstance(current, float):
new_val = float(value)
elif isinstance(current, str):
new_val = str(value)
else:
new_val = value
setattr(obj, path, new_val)
# ---------------------------------------------------------------------------
# 클라이언트 생성 (scanvas_maker의 패턴 재사용)
# ---------------------------------------------------------------------------
def build_genai_client(project: Optional[str] = None,
location: str = "global",
use_vertex: bool = True,
api_key: Optional[str] = None,
log_fn: Callable[[str], None] = print):
"""Gemini 클라이언트 생성. Vertex AI 우선, 실패 시 API Key 폴백.
scanvas_maker의 AI 렌더링 경로와 동일 인증(gcp-key.json 또는 API Key)을 사용.
"""
try:
from google import genai
except ImportError as e:
raise RuntimeError(f"google-genai SDK 필요: pip install google-genai ({e})")
if use_vertex:
proj = project or os.environ.get("GCP_PROJECT_ID", "")
if proj:
try:
client = genai.Client(vertexai=True, project=proj, location=location)
log_fn(f" [VLM] Vertex AI client: project={proj}, location={location}")
return client
except Exception as e:
log_fn(f" [VLM] Vertex AI 실패 → API Key 폴백: {e}")
key = api_key or os.environ.get("GOOGLE_API_KEY") or os.environ.get("GEMINI_API_KEY", "")
if not key:
raise RuntimeError("Gemini 인증 정보 없음 (Vertex project 또는 API key 필요)")
client = genai.Client(api_key=key)
log_fn(" [VLM] API Key client")
return client
# ---------------------------------------------------------------------------
# 편의 함수: 전체 루프 1회 실행
# ---------------------------------------------------------------------------
def run_feedback_once(params: Any,
meshes: list,
dxf_paths: list[str],
client,
structure_type: str = "valve_chamber",
model: str = "gemini-2.5-flash",
work_dir: str | Path = "cache/vlm",
log_fn: Callable[[str], None] = print) -> dict:
"""1회 피드백 사이클 실행: 렌더 2장 + Gemini 호출. diff 반환만.
apply는 호출자가 사용자 승인 후 apply_diff_to_params 호출.
"""
work_dir = Path(work_dir)
work_dir.mkdir(parents=True, exist_ok=True)
drawing_png = str(work_dir / "drawing.png")
render_png = str(work_dir / "render_topdown.png")
log_fn(" [VLM] 도면 PNG 렌더링...")
render_dxf_to_png(dxf_paths, drawing_png)
log_fn(f" [VLM] 3D top-down 렌더링...")
render_meshes_topdown(meshes, render_png)
params_dict = params_to_dict(params)
diff = request_structure_diff(
client, drawing_png, render_png, params_dict,
structure_type=structure_type, model=model, log_fn=log_fn,
)
diff["_artifacts"] = {"drawing_png": drawing_png, "render_png": render_png}
return diff