Import S-CANVAS source + iter=1~7 lint cleanup

S-CANVAS (Saman Corp.) — DXF + DEM + AI 기반 3D 조감도 생성 엔진.
~24k LOC Python (scanvas_maker.py 7072 LOC GUI + 구조물 파서/빌더 다수).

이 커밋은 7-iter cleanup이 적용된 상태로 import:
- F821 8 + B023 6: 비동기 lambda + except/loop 변수 캡처 NameError
  (Py3.13에서 reproduce 확인된 진짜 버그)
- RUF012 4 + RUF013 1: ClassVar / implicit Optional 명시화
- F811/B905/B904/F401/F841/W293/F541/UP/SIM/RUF/PLR 700+ cleanup/modernization

신규 파일:
- ruff.toml: target=py313, Korean unicode/저자 스타일/도메인 복잡도 무력화
- requirements-py313.txt: pyproj>=3.7, scipy>=1.14, numpy>=2.0.2 (Py3.13 wheel)
- .gitignore: gcp-key.json, 캐시, 백업, 생성 이미지 제외

검증: ruff 0 errors, py_compile 0 errors, import 33/33 OK on Py3.13.13.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-08 10:29:08 +09:00
parent 53d8b53c2f
commit b9342f6726
92 changed files with 3413501 additions and 0 deletions

136
harness/logger.py Normal file
View File

@@ -0,0 +1,136 @@
"""로거 - SQLite DB + structlog 기반 작업 이력 추적."""
from __future__ import annotations
import logging
import sys
from datetime import datetime
from pathlib import Path
import structlog
from sqlalchemy import Column, DateTime, Float, Integer, String, Text, create_engine
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker
# ──────────────────────────── ORM 모델 ────────────────────────────
class Base(DeclarativeBase):
pass
class JobRecord(Base):
"""조감도 생성 작업 1건의 이력 레코드."""
__tablename__ = "jobs"
id = Column(Integer, primary_key=True, autoincrement=True)
dxf_path = Column(String(512), nullable=False)
dxf_hash = Column(String(32))
timestamp = Column(DateTime, default=datetime.utcnow)
seed = Column(Integer)
prompt_version = Column(String(32))
prompt_hash = Column(String(32))
status = Column(String(16), default="pending") # pending / running / done / failed
output_path = Column(String(512))
quality_score = Column(Float)
error_message = Column(Text)
latency_ms = Column(Float)
# ──────────────────────────── DB 세션 ────────────────────────────
_engine = None
_SessionFactory = None
def init_db(db_path: str | Path = "cad_aerial_gen.db"):
global _engine, _SessionFactory # noqa: PLW0603 (module-level singleton init)
_engine = create_engine(f"sqlite:///{db_path}", echo=False)
Base.metadata.create_all(_engine)
_SessionFactory = sessionmaker(bind=_engine)
def get_db_session() -> Session:
if _SessionFactory is None:
init_db()
return _SessionFactory()
# ──────────────────────────── structlog 설정 ────────────────────────────
def setup_logging(log_file: Path | None = None, level: str = "INFO"):
"""콘솔 + 파일 동시 로깅을 설정한다."""
handlers = [logging.StreamHandler(sys.stdout)]
if log_file:
log_file.parent.mkdir(parents=True, exist_ok=True)
handlers.append(logging.FileHandler(str(log_file), encoding="utf-8"))
logging.basicConfig(
format="%(message)s",
level=getattr(logging, level.upper(), logging.INFO),
handlers=handlers,
)
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
structlog.dev.ConsoleRenderer(),
],
wrapper_class=structlog.make_filtering_bound_logger(
getattr(logging, level.upper(), logging.INFO)
),
logger_factory=structlog.PrintLoggerFactory(),
)
def get_logger(name: str = "cad_aerial_gen"):
return structlog.get_logger(name)
# ──────────────────────────── 작업 이력 헬퍼 ────────────────────────────
class JobLogger:
"""작업 이력 CRUD 래퍼."""
def create_job(self, db: Session, dxf_path: str, dxf_hash: str = "") -> JobRecord:
record = JobRecord(dxf_path=dxf_path, dxf_hash=dxf_hash, status="pending")
db.add(record)
db.commit()
db.refresh(record)
return record
def start_job(self, db: Session, job_id: int, seed: int, prompt_version: str, prompt_hash: str):
record = db.query(JobRecord).filter_by(id=job_id).first()
if record:
record.status = "running"
record.seed = seed
record.prompt_version = prompt_version
record.prompt_hash = prompt_hash
db.commit()
def complete_job(
self,
db: Session,
job_id: int,
output_path: str,
quality_score: float,
latency_ms: float,
):
record = db.query(JobRecord).filter_by(id=job_id).first()
if record:
record.status = "done"
record.output_path = output_path
record.quality_score = quality_score
record.latency_ms = latency_ms
db.commit()
def fail_job(self, db: Session, job_id: int, error: str):
record = db.query(JobRecord).filter_by(id=job_id).first()
if record:
record.status = "failed"
record.error_message = error
db.commit()
def list_jobs(self, db: Session, limit: int = 50):
return db.query(JobRecord).order_by(JobRecord.id.desc()).limit(limit).all()