Files
s-canvas/_unused/REF_BY_SEOK/logger.py
HYUNJUNGLEE b9342f6726 Import S-CANVAS source + iter=1~7 lint cleanup
S-CANVAS (Saman Corp.) — DXF + DEM + AI 기반 3D 조감도 생성 엔진.
~24k LOC Python (scanvas_maker.py 7072 LOC GUI + 구조물 파서/빌더 다수).

이 커밋은 7-iter cleanup이 적용된 상태로 import:
- F821 8 + B023 6: 비동기 lambda + except/loop 변수 캡처 NameError
  (Py3.13에서 reproduce 확인된 진짜 버그)
- RUF012 4 + RUF013 1: ClassVar / implicit Optional 명시화
- F811/B905/B904/F401/F841/W293/F541/UP/SIM/RUF/PLR 700+ cleanup/modernization

신규 파일:
- ruff.toml: target=py313, Korean unicode/저자 스타일/도메인 복잡도 무력화
- requirements-py313.txt: pyproj>=3.7, scipy>=1.14, numpy>=2.0.2 (Py3.13 wheel)
- .gitignore: gcp-key.json, 캐시, 백업, 생성 이미지 제외

검증: ruff 0 errors, py_compile 0 errors, import 33/33 OK on Py3.13.13.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 10:29:08 +09:00

138 lines
4.7 KiB
Python

"""로거 - SQLite DB + structlog 기반 작업 이력 추적."""
from __future__ import annotations
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
import structlog
from sqlalchemy import Column, DateTime, Float, Integer, String, Text, create_engine
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker
# ──────────────────────────── ORM 모델 ────────────────────────────
class Base(DeclarativeBase):
pass
class JobRecord(Base):
"""조감도 생성 작업 1건의 이력 레코드."""
__tablename__ = "jobs"
id = Column(Integer, primary_key=True, autoincrement=True)
dxf_path = Column(String(512), nullable=False)
dxf_hash = Column(String(32))
timestamp = Column(DateTime, default=datetime.utcnow)
seed = Column(Integer)
prompt_version = Column(String(32))
prompt_hash = Column(String(32))
status = Column(String(16), default="pending") # pending / running / done / failed
output_path = Column(String(512))
quality_score = Column(Float)
error_message = Column(Text)
latency_ms = Column(Float)
# ──────────────────────────── DB 세션 ────────────────────────────
_engine = None
_SessionFactory = None
def init_db(db_path: str | Path = "cad_aerial_gen.db"):
global _engine, _SessionFactory
_engine = create_engine(f"sqlite:///{db_path}", echo=False)
Base.metadata.create_all(_engine)
_SessionFactory = sessionmaker(bind=_engine)
def get_db_session() -> Session:
if _SessionFactory is None:
init_db()
return _SessionFactory()
# ──────────────────────────── structlog 설정 ────────────────────────────
def setup_logging(log_file: Optional[Path] = None, level: str = "INFO"):
"""콘솔 + 파일 동시 로깅을 설정한다."""
handlers = [logging.StreamHandler(sys.stdout)]
if log_file:
log_file.parent.mkdir(parents=True, exist_ok=True)
handlers.append(logging.FileHandler(str(log_file), encoding="utf-8"))
logging.basicConfig(
format="%(message)s",
level=getattr(logging, level.upper(), logging.INFO),
handlers=handlers,
)
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
structlog.dev.ConsoleRenderer(),
],
wrapper_class=structlog.make_filtering_bound_logger(
getattr(logging, level.upper(), logging.INFO)
),
logger_factory=structlog.PrintLoggerFactory(),
)
def get_logger(name: str = "cad_aerial_gen"):
return structlog.get_logger(name)
# ──────────────────────────── 작업 이력 헬퍼 ────────────────────────────
class JobLogger:
"""작업 이력 CRUD 래퍼."""
def create_job(self, db: Session, dxf_path: str, dxf_hash: str = "") -> JobRecord:
record = JobRecord(dxf_path=dxf_path, dxf_hash=dxf_hash, status="pending")
db.add(record)
db.commit()
db.refresh(record)
return record
def start_job(self, db: Session, job_id: int, seed: int, prompt_version: str, prompt_hash: str):
record = db.query(JobRecord).filter_by(id=job_id).first()
if record:
record.status = "running"
record.seed = seed
record.prompt_version = prompt_version
record.prompt_hash = prompt_hash
db.commit()
def complete_job(
self,
db: Session,
job_id: int,
output_path: str,
quality_score: float,
latency_ms: float,
):
record = db.query(JobRecord).filter_by(id=job_id).first()
if record:
record.status = "done"
record.output_path = output_path
record.quality_score = quality_score
record.latency_ms = latency_ms
db.commit()
def fail_job(self, db: Session, job_id: int, error: str):
record = db.query(JobRecord).filter_by(id=job_id).first()
if record:
record.status = "failed"
record.error_message = error
db.commit()
def list_jobs(self, db: Session, limit: int = 50):
return db.query(JobRecord).order_by(JobRecord.id.desc()).limit(limit).all()