"""로거 - SQLite DB + structlog 기반 작업 이력 추적.""" from __future__ import annotations import logging import sys from datetime import datetime from pathlib import Path import structlog from sqlalchemy import Column, DateTime, Float, Integer, String, Text, create_engine from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker # ──────────────────────────── ORM 모델 ──────────────────────────── class Base(DeclarativeBase): pass class JobRecord(Base): """조감도 생성 작업 1건의 이력 레코드.""" __tablename__ = "jobs" id = Column(Integer, primary_key=True, autoincrement=True) dxf_path = Column(String(512), nullable=False) dxf_hash = Column(String(32)) timestamp = Column(DateTime, default=datetime.utcnow) seed = Column(Integer) prompt_version = Column(String(32)) prompt_hash = Column(String(32)) status = Column(String(16), default="pending") # pending / running / done / failed output_path = Column(String(512)) quality_score = Column(Float) error_message = Column(Text) latency_ms = Column(Float) # ──────────────────────────── DB 세션 ──────────────────────────── _engine = None _SessionFactory = None def init_db(db_path: str | Path = "cad_aerial_gen.db"): global _engine, _SessionFactory # noqa: PLW0603 (module-level singleton init) _engine = create_engine(f"sqlite:///{db_path}", echo=False) Base.metadata.create_all(_engine) _SessionFactory = sessionmaker(bind=_engine) def get_db_session() -> Session: if _SessionFactory is None: init_db() return _SessionFactory() # ──────────────────────────── structlog 설정 ──────────────────────────── def setup_logging(log_file: Path | None = None, level: str = "INFO"): """콘솔 + 파일 동시 로깅을 설정한다.""" handlers = [logging.StreamHandler(sys.stdout)] if log_file: log_file.parent.mkdir(parents=True, exist_ok=True) handlers.append(logging.FileHandler(str(log_file), encoding="utf-8")) logging.basicConfig( format="%(message)s", level=getattr(logging, level.upper(), logging.INFO), handlers=handlers, ) structlog.configure( processors=[ structlog.contextvars.merge_contextvars, structlog.processors.add_log_level, structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"), structlog.dev.ConsoleRenderer(), ], wrapper_class=structlog.make_filtering_bound_logger( getattr(logging, level.upper(), logging.INFO) ), logger_factory=structlog.PrintLoggerFactory(), ) def get_logger(name: str = "cad_aerial_gen"): return structlog.get_logger(name) # ──────────────────────────── 작업 이력 헬퍼 ──────────────────────────── class JobLogger: """작업 이력 CRUD 래퍼.""" def create_job(self, db: Session, dxf_path: str, dxf_hash: str = "") -> JobRecord: record = JobRecord(dxf_path=dxf_path, dxf_hash=dxf_hash, status="pending") db.add(record) db.commit() db.refresh(record) return record def start_job(self, db: Session, job_id: int, seed: int, prompt_version: str, prompt_hash: str): record = db.query(JobRecord).filter_by(id=job_id).first() if record: record.status = "running" record.seed = seed record.prompt_version = prompt_version record.prompt_hash = prompt_hash db.commit() def complete_job( self, db: Session, job_id: int, output_path: str, quality_score: float, latency_ms: float, ): record = db.query(JobRecord).filter_by(id=job_id).first() if record: record.status = "done" record.output_path = output_path record.quality_score = quality_score record.latency_ms = latency_ms db.commit() def fail_job(self, db: Session, job_id: int, error: str): record = db.query(JobRecord).filter_by(id=job_id).first() if record: record.status = "failed" record.error_message = error db.commit() def list_jobs(self, db: Session, limit: int = 50): return db.query(JobRecord).order_by(JobRecord.id.desc()).limit(limit).all()