refactor: improve db ops visibility and split runtime helpers

This commit is contained in:
hyunho
2026-04-01 16:41:52 +09:00
parent 1e82572e15
commit 57d9f630bc
11 changed files with 947 additions and 496 deletions

View File

@@ -0,0 +1,497 @@
from __future__ import annotations
from datetime import date, datetime
from decimal import Decimal
from fastapi import HTTPException
from .db import get_conn
# Registry of the "headline" tables surfaced on the admin DB-status screen.
# Each entry describes one physical table: schema-qualified name, a
# human-readable label, a coarse domain bucket, the timestamp column used to
# compute the table's "last event" time, the UI views that read it, and a
# short description. Tables NOT listed here still appear in the snapshot,
# but with fallback metadata (see fetch_db_status_snapshot).
DB_STATUS_TABLES = [
    {
        "table_ref": "public.members",
        "label": "구성원 마스터",
        "domain": "organization",
        "timestamp_column": "updated_at",
        "related_views": ["조직 현황", "자리배치도"],
        "description": "조직/구성원 화면의 기준이 되는 현재 인원 마스터",
    },
    {
        "table_ref": "public.member_versions",
        "label": "구성원 이력",
        "domain": "history",
        "timestamp_column": "created_at",
        "related_views": ["조직 현황", "이력 비교"],
        "description": "as-of 조회와 변경 이력을 위한 시점 버전",
    },
    {
        "table_ref": "public.seat_maps",
        "label": "자리배치도 도면",
        "domain": "seatmap",
        "timestamp_column": "updated_at",
        "related_views": ["자리배치도"],
        "description": "오피스별 도면 메타데이터와 활성 상태",
    },
    {
        "table_ref": "public.seat_positions",
        "label": "현재 좌석 배치",
        "domain": "seatmap",
        "timestamp_column": "updated_at",
        "related_views": ["자리배치도"],
        "description": "현재 인원의 실제 배치 좌표/슬롯 연결",
    },
    {
        "table_ref": "public.seat_assignment_versions",
        "label": "좌석 배치 이력",
        "domain": "history",
        "timestamp_column": "created_at",
        "related_views": ["자리배치도", "이력 비교"],
        "description": "자리 이동 이력과 시점 조회용 배치 버전",
    },
    {
        "table_ref": "public.integration_import_batches",
        "label": "원본 업로드 배치",
        "domain": "integration",
        "timestamp_column": "imported_at",
        "related_views": ["프로젝트별 분석", "팀/개인별 분석", "조직 현황"],
        "description": "원본 파일 적재 단위와 최근 import 기록",
    },
    {
        "table_ref": "public.integration_projects",
        "label": "통합 프로젝트 표준화",
        "domain": "integration",
        "timestamp_column": "updated_at",
        "related_views": ["프로젝트별 분석", "팀/개인별 분석"],
        "description": "프로젝트 코드/이름/카테고리 정규화 결과",
    },
    {
        "table_ref": "public.integration_work_logs",
        "label": "근무 로그 표준화",
        "domain": "integration",
        "timestamp_column": "updated_at",
        "related_views": ["팀/개인별 분석"],
        "description": "MH workbook 기준 일자별 근무 로그 본체",
    },
    {
        "table_ref": "public.integration_work_log_segments",
        "label": "근무 로그 세그먼트",
        "domain": "integration",
        "timestamp_column": "created_at",
        "related_views": ["팀/개인별 분석"],
        "description": "근무 로그를 프로젝트/활동 기준으로 분해한 상세 세그먼트",
    },
    {
        "table_ref": "public.integration_vouchers",
        "label": "전표 표준화",
        "domain": "integration",
        "timestamp_column": "created_at",
        "related_views": ["프로젝트별 분석"],
        "description": "payment CSV 기준 프로젝트별 수입/지출 전표",
    },
    {
        "table_ref": "public.integration_binary_sources",
        "label": "바이너리 원본 보관",
        "domain": "integration",
        "timestamp_column": "imported_at",
        "related_views": ["사업관리대장"],
        "description": "엑셀/바이너리 원본을 DB에 보관하는 저장소",
    },
    {
        "table_ref": "auth.users",
        "label": "인증 사용자",
        "domain": "auth",
        "timestamp_column": "updated_at",
        "related_views": ["로그인", "권한"],
        "description": "로그인 계정, role, 활성 상태",
    },
    {
        "table_ref": "auth.sessions",
        "label": "인증 세션",
        "domain": "auth",
        "timestamp_column": "created_at",
        "related_views": ["로그인", "권한"],
        "description": "현재/과거 로그인 세션과 만료 상태",
    },
    {
        "table_ref": "auth.login_audit_logs",
        "label": "로그인 감사 로그",
        "domain": "auth",
        "timestamp_column": "created_at",
        "related_views": ["로그인", "권한"],
        "description": "로그인 성공/실패 기록",
    },
]
# Fast lookup of the registry above, keyed by schema-qualified table name.
DB_STATUS_TABLE_META = {str(item["table_ref"]): item for item in DB_STATUS_TABLES}
# Operational classification per table, shown as "group" in the snapshot:
#   "유지"      ("keep")      — core operational tables
#   "주의"      ("caution")   — correction/aux tables; review before changes
#   "원본·추적" ("raw/trace") — raw import rows kept for provenance
# Tables missing from this map default to "주의" in fetch_db_status_snapshot.
DB_STATUS_TABLE_GROUPS = {
    "public.members": "유지",
    "public.member_versions": "유지",
    "public.seat_maps": "유지",
    "public.seat_positions": "유지",
    "public.seat_slots": "유지",
    "public.seat_assignment_versions": "유지",
    "public.history_revisions": "유지",
    "public.integration_import_batches": "유지",
    "public.integration_projects": "유지",
    "public.integration_work_logs": "유지",
    "public.integration_work_log_segments": "유지",
    "public.integration_vouchers": "유지",
    "public.integration_binary_sources": "유지",
    "auth.users": "유지",
    "auth.sessions": "유지",
    "auth.login_audit_logs": "유지",
    "public.member_overrides": "주의",
    "public.member_retirements": "주의",
    "public.member_aliases": "주의",
    "public.integration_project_aliases": "주의",
    "public.integration_project_category_mappings": "주의",
    "public.integration_project_pm_assignments": "주의",
    "public.integration_raw_organization_rows": "원본·추적",
    "public.integration_raw_mh_rows": "원본·추적",
    "public.integration_raw_mh_pm_rows": "원본·추적",
    "public.integration_raw_payment_rows": "원본·추적",
}
# Product-facing grouping of tables (tab data / auth / history / raw ingest /
# corrections). Returned as "product_summary" in the status payload.
DB_STATUS_PRODUCT_GROUPS = {
    "탭 데이터": [
        "public.members",
        "public.seat_maps",
        "public.seat_slots",
        "public.seat_positions",
        "public.integration_projects",
        "public.integration_work_logs",
        "public.integration_work_log_segments",
        "public.integration_vouchers",
        "public.integration_binary_sources",
    ],
    "로그인·권한": [
        "auth.users",
        "auth.sessions",
        "auth.login_audit_logs",
    ],
    "히스토리": [
        "public.history_revisions",
        "public.member_versions",
        "public.seat_assignment_versions",
    ],
    "로우데이터·적재": [
        "public.integration_import_batches",
        "public.integration_raw_organization_rows",
        "public.integration_raw_mh_rows",
        "public.integration_raw_mh_pm_rows",
        "public.integration_raw_payment_rows",
    ],
    "보정·보조": [
        "public.member_overrides",
        "public.member_retirements",
        "public.member_aliases",
        "public.integration_project_aliases",
        "public.integration_project_category_mappings",
        "public.integration_project_pm_assignments",
    ],
}
# UI-screen → backing-tables mapping, plus a prose description of each
# screen's write flow. Returned verbatim as "screen_map" in the snapshot.
DB_STATUS_SCREEN_MAP = [
    {
        "screen": "조직 현황",
        "tables": [
            "public.members",
            "public.member_overrides",
            "public.member_retirements",
            "public.member_aliases",
            "public.member_versions",
            "public.history_revisions",
        ],
        "write_flow": "원본 조직 데이터 import 후 members 계열을 갱신하고, 수정/이력 기능은 revision 기반으로 누적합니다.",
    },
    {
        "screen": "자리배치도",
        "tables": [
            "public.seat_maps",
            "public.seat_slots",
            "public.seat_positions",
            "public.seat_assignment_versions",
            "public.history_revisions",
            "public.members",
        ],
        "write_flow": "고정 오피스 도면과 현재 좌석 배치를 읽고, 저장 시 현재 배치와 배치 이력을 함께 기록합니다.",
    },
    {
        "screen": "프로젝트별 분석",
        "tables": [
            "public.integration_import_batches",
            "public.integration_projects",
            "public.integration_vouchers",
            "public.integration_project_aliases",
            "public.integration_project_category_mappings",
            "public.integration_project_pm_assignments",
        ],
        "write_flow": "payment 원본 import 결과와 프로젝트 보정 테이블을 조합해 프로젝트 집계를 만듭니다.",
    },
    {
        "screen": "팀/개인별 분석",
        "tables": [
            "public.integration_import_batches",
            "public.integration_projects",
            "public.integration_work_logs",
            "public.integration_work_log_segments",
            "public.integration_raw_mh_rows",
            "public.integration_raw_mh_pm_rows",
        ],
        "write_flow": "MH 원본 row를 적재한 뒤 표준화 로그와 세그먼트로 분해해 화면 집계에 사용합니다.",
    },
    {
        "screen": "사업관리대장",
        "tables": [
            "public.integration_binary_sources",
        ],
        "write_flow": "현재는 기본 바이너리 원본 보관 상태만 DB에 유지하며, 상세 계산 규칙은 별도 기준 정렬이 필요합니다.",
    },
    {
        "screen": "로그인 / 권한",
        "tables": [
            "auth.users",
            "auth.sessions",
            "auth.login_audit_logs",
        ],
        "write_flow": "사용자 계정, 세션, 로그인 감사 로그를 auth 스키마에서 분리 운영합니다.",
    },
]
def make_json_safe(value: object) -> object:
    """Recursively convert *value* into JSON-serializable primitives.

    datetimes/dates become ISO-8601 strings, Decimals become floats, raw
    bytes are summarized as a size placeholder (blob contents are never
    shipped to the UI), and containers are converted element by element.
    Anything else is returned unchanged.
    """
    # datetime must be tested before date: datetime is a subclass of date.
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, date):
        return value.isoformat()
    if isinstance(value, Decimal):
        return float(value)
    if isinstance(value, bytes):
        # Report only the payload size; the actual blob stays in the DB.
        return f"<{len(value)} bytes>"
    if isinstance(value, dict):
        return {str(key): make_json_safe(val) for key, val in value.items()}
    if isinstance(value, (list, tuple)):
        # Fix: tuples previously fell through unconverted, so an unsafe
        # value nested in a tuple (e.g. a datetime) would still break
        # json.dumps. Tuples are normalized to lists, matching how the
        # json module serializes them anyway.
        return [make_json_safe(item) for item in value]
    return value
def fetch_db_status_snapshot() -> dict[str, object]:
    """Build the full admin "DB status" payload in one connection.

    Enumerates every table in the ``public``/``auth`` schemas, annotates each
    with registry metadata (DB_STATUS_TABLE_META / DB_STATUS_TABLE_GROUPS),
    collects per-table row counts and a last-event timestamp, then appends
    import-batch, binary-source, member, and seat-map summaries.

    NOTE(review): rows are accessed dict-style (row["schemaname"]), so
    get_conn() presumably yields a dict-row cursor — confirm in .db.
    """
    table_items: list[dict[str, object]] = []
    with get_conn() as conn:
        with conn.cursor() as cur:
            # All physical tables in the two application schemas.
            cur.execute(
                """
                SELECT schemaname, tablename
                FROM pg_tables
                WHERE schemaname IN ('public', 'auth')
                ORDER BY schemaname, tablename
                """
            )
            all_tables = cur.fetchall()
            for row in all_tables:
                schema_name = str(row["schemaname"])
                table_name = str(row["tablename"])
                table_ref = f"{schema_name}.{table_name}"
                # Registry metadata; empty dict for unregistered tables.
                spec = DB_STATUS_TABLE_META.get(table_ref, {})
                # Double-check visibility via to_regclass (guards against
                # races where the table was dropped after the pg_tables read).
                cur.execute("SELECT to_regclass(%s) IS NOT NULL AS table_exists", (table_ref,))
                exists_row = cur.fetchone()
                exists = bool(exists_row["table_exists"]) if exists_row is not None else False
                row_count = 0
                last_event_at = None
                if exists:
                    timestamp_column = str(spec.get("timestamp_column") or "")
                    # Identifiers below come from pg_tables / the registry,
                    # not from user input, so f-string interpolation is safe
                    # from injection here.
                    query = f"SELECT COUNT(*)::bigint AS row_count"
                    if timestamp_column:
                        query += f", MAX({timestamp_column}) AS last_event_at"
                    else:
                        # Keep the result shape uniform for tables without a
                        # registered timestamp column.
                        query += ", NULL::timestamptz AS last_event_at"
                    query += f" FROM {schema_name}.{table_name}"
                    cur.execute(query)
                    metric_row = cur.fetchone() or {}
                    row_count = int(metric_row.get("row_count") or 0)
                    last_event_at = metric_row.get("last_event_at")
                table_items.append(
                    {
                        "table_ref": table_ref,
                        "schema": schema_name,
                        "table_name": table_name,
                        # Fallbacks for tables not in the registry.
                        "label": str(spec.get("label") or table_name),
                        "domain": str(spec.get("domain") or "other"),
                        "description": str(spec.get("description") or "세부 보조/원본/운영 테이블"),
                        "related_views": spec.get("related_views") or [],
                        # Unclassified tables default to the "caution" group.
                        "group": DB_STATUS_TABLE_GROUPS.get(table_ref, "주의"),
                        "exists": exists,
                        "row_count": row_count,
                        "last_event_at": last_event_at.isoformat() if last_event_at else None,
                    }
                )
            # Most-recent-first list of raw import batches.
            cur.execute(
                """
                SELECT source_key, source_name, row_count, source_path, imported_at
                FROM integration_import_batches
                ORDER BY imported_at DESC, id DESC
                """
            )
            import_batches = [
                {
                    "source_key": str(row["source_key"] or ""),
                    "source_name": str(row["source_name"] or ""),
                    "row_count": int(row["row_count"] or 0),
                    "source_path": str(row["source_path"] or ""),
                    "imported_at": row["imported_at"].isoformat() if row.get("imported_at") else None,
                }
                for row in cur.fetchall()
            ]
            binary_sources: list[dict[str, object]] = []
            # The binary-sources table may not exist on older deployments;
            # probe before querying it.
            cur.execute("SELECT to_regclass('public.integration_binary_sources') IS NOT NULL AS table_exists")
            binary_exists_row = cur.fetchone()
            binary_exists = bool(binary_exists_row["table_exists"]) if binary_exists_row is not None else False
            if binary_exists:
                # OCTET_LENGTH reports blob size without fetching content.
                cur.execute(
                    """
                    SELECT source_key, source_name, filename, mime_type, OCTET_LENGTH(content) AS byte_size,
                           content_sha256, imported_at
                    FROM integration_binary_sources
                    ORDER BY imported_at DESC, id DESC
                    """
                )
                binary_sources = [
                    {
                        "source_key": str(row["source_key"] or ""),
                        "source_name": str(row["source_name"] or ""),
                        "filename": str(row["filename"] or ""),
                        "mime_type": str(row["mime_type"] or ""),
                        "byte_size": int(row["byte_size"] or 0),
                        "content_sha256": str(row["content_sha256"] or ""),
                        "imported_at": row["imported_at"].isoformat() if row.get("imported_at") else None,
                    }
                    for row in cur.fetchall()
                ]
            # Active headcount: anyone whose (trimmed) work_status is not
            # '퇴직' ("retired") — matches the org-status screen's rule.
            cur.execute(
                """
                SELECT COUNT(*)::bigint AS total_members,
                       COUNT(*) FILTER (
                           WHERE COALESCE(BTRIM(work_status), '') <> '퇴직'
                       )::bigint AS active_members
                FROM members
                """
            )
            member_row = cur.fetchone() or {}
            cur.execute(
                """
                SELECT COUNT(*)::bigint AS active_seat_maps
                FROM seat_maps
                WHERE is_active = TRUE
                """
            )
            seat_map_row = cur.fetchone() or {}
            cur.execute(
                """
                SELECT COUNT(*)::bigint AS fixed_office_maps
                FROM seat_maps
                WHERE source_type = 'fixed_html'
                """
            )
            fixed_office_row = cur.fetchone() or {}
    overview = {
        "visible_tables": len(DB_STATUS_TABLES),
        "total_tables": len(table_items),
        "existing_tables": sum(1 for item in table_items if item["exists"]),
        "registered_members": int(member_row.get("total_members") or 0),
        "active_members": int(member_row.get("active_members") or 0),
        "active_seat_maps": int(seat_map_row.get("active_seat_maps") or 0),
        "fixed_office_maps": int(fixed_office_row.get("fixed_office_maps") or 0),
        "import_batches": len(import_batches),
        "binary_sources": len(binary_sources),
    }
    # Tables bucketed by operational group ("정리 후보" / "cleanup candidate"
    # is currently unassigned in DB_STATUS_TABLE_GROUPS, so it stays empty).
    group_summary = {
        "유지": [item["table_ref"] for item in table_items if item["group"] == "유지"],
        "주의": [item["table_ref"] for item in table_items if item["group"] == "주의"],
        "원본·추적": [item["table_ref"] for item in table_items if item["group"] == "원본·추적"],
        "정리 후보": [item["table_ref"] for item in table_items if item["group"] == "정리 후보"],
    }
    product_summary = {
        group_name: table_refs
        for group_name, table_refs in DB_STATUS_PRODUCT_GROUPS.items()
    }
    return {
        # utcnow() is naive; the "Z" suffix asserts UTC by convention.
        # NOTE(review): utcnow() is deprecated since 3.12 — consider
        # datetime.now(timezone.utc).
        "generated_at": datetime.utcnow().isoformat() + "Z",
        "overview": overview,
        "tables": table_items,
        "import_batches": import_batches,
        "binary_sources": binary_sources,
        "group_summary": group_summary,
        "product_summary": product_summary,
        "screen_map": DB_STATUS_SCREEN_MAP,
        "notes": [
            "members / seat_positions / seat_maps 는 현재 운영 상태를 나타냅니다.",
            "member_versions / seat_assignment_versions / history_revisions 는 시점 조회와 변경 이력을 위한 테이블입니다.",
            "integration_raw_* / integration_* 는 원본 적재와 표준화 결과를 분리해서 보관합니다.",
            "integration_binary_sources 는 사업관리대장 같은 바이너리 원본 보관용입니다.",
            "DB를 물리적으로 합치기보다, 화면/권한/이력/로우데이터 관점으로 묶어 보는 것이 현재 운영에 더 적합합니다.",
            "재직 인원은 조직현황과 동일하게 work_status 값이 '퇴직'이 아닌 구성원 기준입니다.",
        ],
    }
def _quote_sql_ident(name: str) -> str:
    # One-line purpose: render *name* as a double-quoted SQL identifier,
    # escaping embedded quotes, so mixed-case/special table names work.
    return '"' + name.replace('"', '""') + '"'


def fetch_db_table_preview(schema_name: str, table_name: str, limit: int = 50) -> dict[str, object]:
    """Return metadata, column list, and up to *limit* sample rows for one table.

    Only tables that actually exist in the ``public`` or ``auth`` schemas
    (verified against ``pg_tables``) can be previewed; anything else raises
    an HTTP 404. *limit* is clamped to the range 1..50; a non-numeric value
    falls back to the default of 50 instead of surfacing a 500.

    Raises:
        HTTPException: 404 for an unknown schema or table.
    """
    if schema_name not in {"public", "auth"}:
        raise HTTPException(status_code=404, detail="Unknown schema.")
    # Fix: int(limit) used to raise an uncaught ValueError/TypeError for
    # malformed input; degrade gracefully to the default instead.
    try:
        requested_limit = int(limit)
    except (TypeError, ValueError):
        requested_limit = 50
    safe_limit = max(1, min(requested_limit, 50))
    with get_conn() as conn:
        with conn.cursor() as cur:
            # Existence check doubles as the injection guard: only names
            # literally present in pg_tables proceed to the dynamic SQL.
            cur.execute(
                """
                SELECT tablename
                FROM pg_tables
                WHERE schemaname = %s
                  AND tablename = %s
                """,
                (schema_name, table_name),
            )
            exists_row = cur.fetchone()
            if exists_row is None:
                raise HTTPException(status_code=404, detail="Unknown table.")
            table_ref = f"{schema_name}.{table_name}"
            spec = DB_STATUS_TABLE_META.get(table_ref, {})
            cur.execute(
                """
                SELECT column_name, data_type
                FROM information_schema.columns
                WHERE table_schema = %s
                  AND table_name = %s
                ORDER BY ordinal_position
                """,
                (schema_name, table_name),
            )
            columns = [{"name": str(row["column_name"]), "type": str(row["data_type"])} for row in cur.fetchall()]
            # Fix: quote identifiers so legitimately existing tables with
            # mixed-case or special characters in their names don't break
            # the interpolated query.
            qualified_name = f"{_quote_sql_ident(schema_name)}.{_quote_sql_ident(table_name)}"
            cur.execute(f"SELECT COUNT(*)::bigint AS row_count FROM {qualified_name}")
            row_count = int((cur.fetchone() or {}).get("row_count") or 0)
            cur.execute(f"SELECT * FROM {qualified_name} LIMIT {safe_limit}")
            # make_json_safe converts datetimes/Decimals/bytes for the UI.
            rows = [make_json_safe(dict(row)) for row in cur.fetchall()]
    return {
        "table_ref": table_ref,
        "schema": schema_name,
        "table_name": table_name,
        "label": str(spec.get("label") or table_name),
        "domain": str(spec.get("domain") or "other"),
        "description": str(spec.get("description") or "세부 보조/원본/운영 테이블"),
        "related_views": spec.get("related_views") or [],
        "row_count": row_count,
        "limit": safe_limit,
        "columns": columns,
        "rows": rows,
    }

View File

@@ -0,0 +1,110 @@
from __future__ import annotations
import hashlib
import json
from pathlib import Path
from fastapi import HTTPException
from fastapi.responses import FileResponse, Response
# source_key under which the bundled business-ledger workbook is stored in
# (and served from) integration_binary_sources.
BUSINESS_LEDGER_DEFAULT_SOURCE_KEY = "business_ledger_default"
def sync_default_business_ledger_source(cur, incoming_files_dir: Path, served_dir: Path) -> None:
    """Upsert the bundled business-ledger workbook into integration_binary_sources.

    Locates the first existing candidate file (the promoted served copy is
    preferred over the raw incoming-files copies), reads it, and upserts it
    under BUSINESS_LEDGER_DEFAULT_SOURCE_KEY. A no-op when the target table
    does not exist or no candidate file is present. Called at startup;
    the caller owns the transaction/commit.

    NOTE(review): ``cur`` is assumed to be a dict-row DB cursor (rows are
    accessed as row["table_exists"]) — confirm against .db.get_conn.
    """
    # Older deployments may not have the table yet; probe before writing.
    cur.execute("SELECT to_regclass('public.integration_binary_sources') IS NOT NULL AS table_exists")
    row = cur.fetchone()
    table_exists = bool(row["table_exists"]) if row is not None else False
    if not table_exists:
        return
    business_dashboard_dir = incoming_files_dir / "사업관리대장"
    business_ledger_served_dir = served_dir / "ledger"
    # Ordered by preference: promoted served copy first, then the various
    # historical filename spellings under incoming-files.
    candidates = [
        business_ledger_served_dir / "사업관리대장-1.xlsx",
        business_dashboard_dir / "사업관리대장-1.xlsx",
        business_dashboard_dir / "사업관리 대장-1.xlsx",
        business_dashboard_dir / "사업관리대장.xlsx",
        business_dashboard_dir / "사업관리 대장.xlsx",
    ]
    source_path = next((candidate for candidate in candidates if candidate.exists()), None)
    if source_path is None:
        return
    content = source_path.read_bytes()
    content_sha256 = hashlib.sha256(content).hexdigest()
    meta_json = {
        "byte_size": len(content),
        "source_path": str(source_path),
        "synced_from": "startup",
    }
    # Conditional DO UPDATE: only rewrite the row (and bump imported_at)
    # when the content hash or any metadata actually changed, so repeated
    # startups with an unchanged file leave the row untouched.
    cur.execute(
        """
        INSERT INTO integration_binary_sources (
            source_key, source_name, filename, mime_type, content, content_sha256, meta_json, imported_at
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s::jsonb, NOW())
        ON CONFLICT (source_key) DO UPDATE
        SET source_name = EXCLUDED.source_name,
            filename = EXCLUDED.filename,
            mime_type = EXCLUDED.mime_type,
            content = EXCLUDED.content,
            content_sha256 = EXCLUDED.content_sha256,
            meta_json = EXCLUDED.meta_json,
            imported_at = NOW()
        WHERE integration_binary_sources.content_sha256 IS DISTINCT FROM EXCLUDED.content_sha256
           OR integration_binary_sources.filename IS DISTINCT FROM EXCLUDED.filename
           OR integration_binary_sources.mime_type IS DISTINCT FROM EXCLUDED.mime_type
           OR integration_binary_sources.meta_json IS DISTINCT FROM EXCLUDED.meta_json
        """,
        (
            BUSINESS_LEDGER_DEFAULT_SOURCE_KEY,
            "사업관리대장 기본 원본",
            source_path.name,
            # xlsx MIME type.
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            content,
            content_sha256,
            json.dumps(meta_json, ensure_ascii=False),
        ),
    )
def build_business_ledger_default_response(cur) -> Response:
    """Serve the stored default business-ledger workbook as an inline file.

    Fetches the newest row stored under BUSINESS_LEDGER_DEFAULT_SOURCE_KEY
    and returns its blob with no-cache headers; raises 404 when absent.
    """
    fetch_sql = """
        SELECT filename, mime_type, content
        FROM integration_binary_sources
        WHERE source_key = %s
        ORDER BY imported_at DESC
        LIMIT 1
    """
    cur.execute(fetch_sql, (BUSINESS_LEDGER_DEFAULT_SOURCE_KEY,))
    record = cur.fetchone()
    if not record:
        raise HTTPException(status_code=404, detail="Business ledger default source not found.")

    original_filename = str(record["filename"] or "사업관리대장-1.xlsx")
    xlsx_mime = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    media_type = str(record["mime_type"] or xlsx_mime)
    # Stable download name for the client; original name exposed separately.
    response_headers = {
        "Content-Disposition": 'inline; filename="business-ledger-default.xlsx"',
        "X-Source-Filename": "business-ledger-default.xlsx",
        "X-Original-Filename": original_filename,
        "Cache-Control": "no-store, no-cache, must-revalidate, max-age=0",
        "Pragma": "no-cache",
    }
    return Response(
        content=bytes(record["content"]),
        media_type=media_type,
        headers=response_headers,
    )
def build_ledger_index_response(ledger_index_path: Path) -> FileResponse:
    """Serve the promoted ledger entry file with client caching disabled.

    Raises HTTPException 404 when the file is missing on disk.
    """
    if not ledger_index_path.exists():
        raise HTTPException(status_code=404, detail="Business ledger integration file not found.")
    file_response = FileResponse(ledger_index_path)
    # Disable every layer of HTTP caching for the always-fresh ledger page.
    for header_name, header_value in (
        ("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"),
        ("Pragma", "no-cache"),
    ):
        file_response.headers[header_name] = header_value
    return file_response

View File

@@ -26,8 +26,14 @@ from fastapi.staticfiles import StaticFiles
from openpyxl import load_workbook
from pydantic import BaseModel, Field
from .admin_db_status import fetch_db_status_snapshot, fetch_db_table_preview
from .config import BASE_DIR, LEGACY_DIR, MOCK_LOGIN_ENABLED, UPLOAD_DIR
from .db import get_conn, init_db
from .ledger_runtime import (
build_business_ledger_default_response,
build_ledger_index_response,
sync_default_business_ledger_source,
)
app = FastAPI(title="MH Dashboard Organization API")
@@ -45,7 +51,6 @@ INCOMING_FILES_DIR = BASE_DIR / "incoming-files"
INCOMING_SERVED_DIR = INCOMING_FILES_DIR / "served"
INCOMING_REFERENCE_DIR = INCOMING_FILES_DIR / "reference"
DB_STATUS_SERVED_DIR = INCOMING_SERVED_DIR / "db-status"
BUSINESS_DASHBOARD_DIR = INCOMING_FILES_DIR / "사업관리대장"
BUSINESS_LEDGER_SERVED_DIR = INCOMING_SERVED_DIR / "ledger"
BUSINESS_LEDGER_INDEX_PATH = BUSINESS_LEDGER_SERVED_DIR / "index.html"
FIXED_OFFICE_SOURCE_KEY = "technical-development-center"
@@ -67,7 +72,6 @@ FIXED_OFFICE_CONFIGS = {
},
}
_fixed_office_cache: dict[str, dict[str, object]] = {}
BUSINESS_LEDGER_DEFAULT_SOURCE_KEY = "business_ledger_default"
AUTH_DEFAULT_PASSWORD = "1111"
AUTH_PASSWORD_ITERATIONS = 390000
AUTH_SESSION_HOURS = 12
@@ -89,449 +93,6 @@ MH_HEADER_ORDER = [
"사업 종류", "연장근무 프로젝트 코드", "연장근무 프로젝트명", "연장근무 서브코드", "연장근무 시간(실제)", "연장근무 시간(가공)"
]
DB_STATUS_TABLES = [
{
"table_ref": "public.members",
"label": "구성원 마스터",
"domain": "organization",
"timestamp_column": "updated_at",
"related_views": ["조직 현황", "자리배치도"],
"description": "조직/구성원 화면의 기준이 되는 현재 인원 마스터",
},
{
"table_ref": "public.member_versions",
"label": "구성원 이력",
"domain": "history",
"timestamp_column": "created_at",
"related_views": ["조직 현황", "이력 비교"],
"description": "as-of 조회와 변경 이력을 위한 시점 버전",
},
{
"table_ref": "public.seat_maps",
"label": "자리배치도 도면",
"domain": "seatmap",
"timestamp_column": "updated_at",
"related_views": ["자리배치도"],
"description": "오피스별 도면 메타데이터와 활성 상태",
},
{
"table_ref": "public.seat_positions",
"label": "현재 좌석 배치",
"domain": "seatmap",
"timestamp_column": "updated_at",
"related_views": ["자리배치도"],
"description": "현재 인원의 실제 배치 좌표/슬롯 연결",
},
{
"table_ref": "public.seat_assignment_versions",
"label": "좌석 배치 이력",
"domain": "history",
"timestamp_column": "created_at",
"related_views": ["자리배치도", "이력 비교"],
"description": "자리 이동 이력과 시점 조회용 배치 버전",
},
{
"table_ref": "public.integration_import_batches",
"label": "원본 업로드 배치",
"domain": "integration",
"timestamp_column": "imported_at",
"related_views": ["프로젝트별 분석", "팀/개인별 분석", "조직 현황"],
"description": "원본 파일 적재 단위와 최근 import 기록",
},
{
"table_ref": "public.integration_projects",
"label": "통합 프로젝트 표준화",
"domain": "integration",
"timestamp_column": "updated_at",
"related_views": ["프로젝트별 분석", "팀/개인별 분석"],
"description": "프로젝트 코드/이름/카테고리 정규화 결과",
},
{
"table_ref": "public.integration_work_logs",
"label": "근무 로그 표준화",
"domain": "integration",
"timestamp_column": "updated_at",
"related_views": ["팀/개인별 분석"],
"description": "MH workbook 기준 일자별 근무 로그 본체",
},
{
"table_ref": "public.integration_work_log_segments",
"label": "근무 로그 세그먼트",
"domain": "integration",
"timestamp_column": "created_at",
"related_views": ["팀/개인별 분석"],
"description": "근무 로그를 프로젝트/활동 기준으로 분해한 상세 세그먼트",
},
{
"table_ref": "public.integration_vouchers",
"label": "전표 표준화",
"domain": "integration",
"timestamp_column": "created_at",
"related_views": ["프로젝트별 분석"],
"description": "payment CSV 기준 프로젝트별 수입/지출 전표",
},
{
"table_ref": "public.integration_binary_sources",
"label": "바이너리 원본 보관",
"domain": "integration",
"timestamp_column": "imported_at",
"related_views": ["사업관리대장"],
"description": "엑셀/바이너리 원본을 DB에 보관하는 저장소",
},
{
"table_ref": "auth.users",
"label": "인증 사용자",
"domain": "auth",
"timestamp_column": "updated_at",
"related_views": ["로그인", "권한"],
"description": "로그인 계정, role, 활성 상태",
},
{
"table_ref": "auth.sessions",
"label": "인증 세션",
"domain": "auth",
"timestamp_column": "created_at",
"related_views": ["로그인", "권한"],
"description": "현재/과거 로그인 세션과 만료 상태",
},
{
"table_ref": "auth.login_audit_logs",
"label": "로그인 감사 로그",
"domain": "auth",
"timestamp_column": "created_at",
"related_views": ["로그인", "권한"],
"description": "로그인 성공/실패 기록",
},
]
DB_STATUS_TABLE_META = {str(item["table_ref"]): item for item in DB_STATUS_TABLES}
DB_STATUS_TABLE_GROUPS = {
"public.members": "유지",
"public.member_versions": "유지",
"public.seat_maps": "유지",
"public.seat_positions": "유지",
"public.seat_slots": "유지",
"public.seat_assignment_versions": "유지",
"public.history_revisions": "유지",
"public.integration_import_batches": "유지",
"public.integration_projects": "유지",
"public.integration_work_logs": "유지",
"public.integration_work_log_segments": "유지",
"public.integration_vouchers": "유지",
"public.integration_binary_sources": "유지",
"auth.users": "유지",
"auth.sessions": "유지",
"auth.login_audit_logs": "유지",
"public.member_overrides": "주의",
"public.member_retirements": "주의",
"public.member_aliases": "주의",
"public.integration_project_aliases": "주의",
"public.integration_project_category_mappings": "주의",
"public.integration_project_pm_assignments": "주의",
"public.integration_raw_organization_rows": "원본·추적",
"public.integration_raw_mh_rows": "원본·추적",
"public.integration_raw_mh_pm_rows": "원본·추적",
"public.integration_raw_payment_rows": "원본·추적",
}
def sync_default_business_ledger_source(cur) -> None:
cur.execute("SELECT to_regclass('public.integration_binary_sources') IS NOT NULL AS table_exists")
row = cur.fetchone()
table_exists = bool(row["table_exists"]) if row is not None else False
if not table_exists:
return
candidates = [
BUSINESS_LEDGER_SERVED_DIR / "사업관리대장-1.xlsx",
BUSINESS_DASHBOARD_DIR / "사업관리대장-1.xlsx",
BUSINESS_DASHBOARD_DIR / "사업관리 대장-1.xlsx",
BUSINESS_DASHBOARD_DIR / "사업관리대장.xlsx",
BUSINESS_DASHBOARD_DIR / "사업관리 대장.xlsx",
]
source_path = next((candidate for candidate in candidates if candidate.exists()), None)
if source_path is None:
return
content = source_path.read_bytes()
content_sha256 = hashlib.sha256(content).hexdigest()
meta_json = {
"byte_size": len(content),
"source_path": str(source_path),
"synced_from": "startup",
}
cur.execute(
"""
INSERT INTO integration_binary_sources (
source_key, source_name, filename, mime_type, content, content_sha256, meta_json, imported_at
)
VALUES (%s, %s, %s, %s, %s, %s, %s::jsonb, NOW())
ON CONFLICT (source_key) DO UPDATE
SET source_name = EXCLUDED.source_name,
filename = EXCLUDED.filename,
mime_type = EXCLUDED.mime_type,
content = EXCLUDED.content,
content_sha256 = EXCLUDED.content_sha256,
meta_json = EXCLUDED.meta_json,
imported_at = NOW()
WHERE integration_binary_sources.content_sha256 IS DISTINCT FROM EXCLUDED.content_sha256
OR integration_binary_sources.filename IS DISTINCT FROM EXCLUDED.filename
OR integration_binary_sources.mime_type IS DISTINCT FROM EXCLUDED.mime_type
OR integration_binary_sources.meta_json IS DISTINCT FROM EXCLUDED.meta_json
""",
(
BUSINESS_LEDGER_DEFAULT_SOURCE_KEY,
"사업관리대장 기본 원본",
source_path.name,
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
content,
content_sha256,
json.dumps(meta_json, ensure_ascii=False),
),
)
def make_json_safe(value: object) -> object:
if isinstance(value, datetime):
return value.isoformat()
if isinstance(value, date):
return value.isoformat()
if isinstance(value, Decimal):
return float(value)
if isinstance(value, bytes):
return f"<{len(value)} bytes>"
if isinstance(value, dict):
return {str(key): make_json_safe(val) for key, val in value.items()}
if isinstance(value, list):
return [make_json_safe(item) for item in value]
return value
def fetch_db_status_snapshot() -> dict[str, object]:
table_items: list[dict[str, object]] = []
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT schemaname, tablename
FROM pg_tables
WHERE schemaname IN ('public', 'auth')
ORDER BY schemaname, tablename
"""
)
all_tables = cur.fetchall()
for row in all_tables:
schema_name = str(row["schemaname"])
table_name = str(row["tablename"])
table_ref = f"{schema_name}.{table_name}"
spec = DB_STATUS_TABLE_META.get(table_ref, {})
cur.execute("SELECT to_regclass(%s) IS NOT NULL AS table_exists", (table_ref,))
exists_row = cur.fetchone()
exists = bool(exists_row["table_exists"]) if exists_row is not None else False
row_count = 0
last_event_at = None
if exists:
timestamp_column = str(spec.get("timestamp_column") or "")
query = f"SELECT COUNT(*)::bigint AS row_count"
if timestamp_column:
query += f", MAX({timestamp_column}) AS last_event_at"
else:
query += ", NULL::timestamptz AS last_event_at"
query += f" FROM {schema_name}.{table_name}"
cur.execute(query)
metric_row = cur.fetchone() or {}
row_count = int(metric_row.get("row_count") or 0)
last_event_at = metric_row.get("last_event_at")
table_items.append(
{
"table_ref": table_ref,
"schema": schema_name,
"table_name": table_name,
"label": str(spec.get("label") or table_name),
"domain": str(spec.get("domain") or "other"),
"description": str(spec.get("description") or "세부 보조/원본/운영 테이블"),
"related_views": spec.get("related_views") or [],
"group": DB_STATUS_TABLE_GROUPS.get(table_ref, "주의"),
"exists": exists,
"row_count": row_count,
"last_event_at": last_event_at.isoformat() if last_event_at else None,
}
)
cur.execute(
"""
SELECT source_key, source_name, row_count, source_path, imported_at
FROM integration_import_batches
ORDER BY imported_at DESC, id DESC
"""
)
import_batches = [
{
"source_key": str(row["source_key"] or ""),
"source_name": str(row["source_name"] or ""),
"row_count": int(row["row_count"] or 0),
"source_path": str(row["source_path"] or ""),
"imported_at": row["imported_at"].isoformat() if row.get("imported_at") else None,
}
for row in cur.fetchall()
]
binary_sources: list[dict[str, object]] = []
cur.execute("SELECT to_regclass('public.integration_binary_sources') IS NOT NULL AS table_exists")
binary_exists_row = cur.fetchone()
binary_exists = bool(binary_exists_row["table_exists"]) if binary_exists_row is not None else False
if binary_exists:
cur.execute(
"""
SELECT source_key, source_name, filename, mime_type, OCTET_LENGTH(content) AS byte_size,
content_sha256, imported_at
FROM integration_binary_sources
ORDER BY imported_at DESC, id DESC
"""
)
binary_sources = [
{
"source_key": str(row["source_key"] or ""),
"source_name": str(row["source_name"] or ""),
"filename": str(row["filename"] or ""),
"mime_type": str(row["mime_type"] or ""),
"byte_size": int(row["byte_size"] or 0),
"content_sha256": str(row["content_sha256"] or ""),
"imported_at": row["imported_at"].isoformat() if row.get("imported_at") else None,
}
for row in cur.fetchall()
]
cur.execute(
"""
SELECT COUNT(*)::bigint AS total_members,
COUNT(*) FILTER (
WHERE COALESCE(BTRIM(work_status), '') NOT IN ('퇴직', '휴직')
)::bigint AS active_members
FROM members
"""
)
member_row = cur.fetchone() or {}
cur.execute(
"""
SELECT COUNT(*)::bigint AS active_seat_maps
FROM seat_maps
WHERE is_active = TRUE
"""
)
seat_map_row = cur.fetchone() or {}
cur.execute(
"""
SELECT COUNT(*)::bigint AS fixed_office_maps
FROM seat_maps
WHERE source_type = 'fixed_html'
"""
)
fixed_office_row = cur.fetchone() or {}
overview = {
"visible_tables": len(DB_STATUS_TABLES),
"total_tables": len(table_items),
"existing_tables": sum(1 for item in table_items if item["exists"]),
"registered_members": int(member_row.get("total_members") or 0),
"active_members": int(member_row.get("active_members") or 0),
"active_seat_maps": int(seat_map_row.get("active_seat_maps") or 0),
"fixed_office_maps": int(fixed_office_row.get("fixed_office_maps") or 0),
"import_batches": len(import_batches),
"binary_sources": len(binary_sources),
}
group_summary = {
"유지": [item["table_ref"] for item in table_items if item["group"] == "유지"],
"주의": [item["table_ref"] for item in table_items if item["group"] == "주의"],
"원본·추적": [item["table_ref"] for item in table_items if item["group"] == "원본·추적"],
"정리 후보": [item["table_ref"] for item in table_items if item["group"] == "정리 후보"],
}
return {
"generated_at": datetime.utcnow().isoformat() + "Z",
"overview": overview,
"tables": table_items,
"import_batches": import_batches,
"binary_sources": binary_sources,
"group_summary": group_summary,
"notes": [
"members / seat_positions / seat_maps 는 현재 운영 상태를 나타냅니다.",
"member_versions / seat_assignment_versions / history_revisions 는 시점 조회와 변경 이력을 위한 테이블입니다.",
"integration_raw_* / integration_* 는 원본 적재와 표준화 결과를 분리해서 보관합니다.",
"integration_binary_sources 는 사업관리대장 같은 바이너리 원본 보관용입니다.",
"재직 인원은 work_status 값이 '퇴직' 또는 '휴직'이 아닌 구성원 기준입니다.",
],
}
def fetch_db_table_preview(schema_name: str, table_name: str, limit: int = 50) -> dict[str, object]:
if schema_name not in {"public", "auth"}:
raise HTTPException(status_code=404, detail="Unknown schema.")
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT tablename
FROM pg_tables
WHERE schemaname = %s
AND tablename = %s
""",
(schema_name, table_name),
)
exists_row = cur.fetchone()
if exists_row is None:
raise HTTPException(status_code=404, detail="Unknown table.")
table_ref = f"{schema_name}.{table_name}"
spec = DB_STATUS_TABLE_META.get(table_ref, {})
cur.execute(
"""
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = %s
AND table_name = %s
ORDER BY ordinal_position
""",
(schema_name, table_name),
)
columns = [{"name": str(row["column_name"]), "type": str(row["data_type"])} for row in cur.fetchall()]
cur.execute(f"SELECT COUNT(*)::bigint AS row_count FROM {schema_name}.{table_name}")
row_count = int((cur.fetchone() or {}).get("row_count") or 0)
safe_limit = max(1, min(int(limit), 50))
cur.execute(f"SELECT * FROM {schema_name}.{table_name} LIMIT {safe_limit}")
rows = [make_json_safe(dict(row)) for row in cur.fetchall()]
return {
"table_ref": table_ref,
"schema": schema_name,
"table_name": table_name,
"label": str(spec.get("label") or table_name),
"domain": str(spec.get("domain") or "other"),
"description": str(spec.get("description") or "세부 보조/원본/운영 테이블"),
"related_views": spec.get("related_views") or [],
"row_count": row_count,
"limit": safe_limit,
"columns": columns,
"rows": rows,
}
return {
"generated_at": datetime.utcnow().isoformat() + "Z",
"overview": overview,
"tables": table_items,
"import_batches": import_batches,
"binary_sources": binary_sources,
"notes": [
"members / seat_positions / seat_maps 는 현재 운영 상태를 나타냅니다.",
"member_versions / seat_assignment_versions / history_revisions 는 시점 조회와 변경 이력을 위한 테이블입니다.",
"integration_raw_* / integration_* 는 원본 적재와 표준화 결과를 분리해서 보관합니다.",
"integration_binary_sources 는 사업관리대장 같은 바이너리 원본 보관용입니다.",
"재직 인원은 work_status 값이 '퇴직' 또는 '휴직'이 아닌 구성원 기준입니다.",
],
}
app.mount(
"/integrations/ledger-assets",
StaticFiles(directory=str(BUSINESS_LEDGER_SERVED_DIR), check_dir=False),
@@ -4366,7 +3927,7 @@ def startup() -> None:
init_db()
with get_conn() as conn:
with conn.cursor() as cur:
sync_default_business_ledger_source(cur)
sync_default_business_ledger_source(cur, INCOMING_FILES_DIR, INCOMING_SERVED_DIR)
sync_auth_users_from_members(cur)
conn.commit()
@@ -4421,31 +3982,7 @@ def admin_db_status_view() -> FileResponse:
def integration_business_ledger_default() -> Response:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT filename, mime_type, content
FROM integration_binary_sources
WHERE source_key = %s
ORDER BY imported_at DESC
LIMIT 1
""",
(BUSINESS_LEDGER_DEFAULT_SOURCE_KEY,),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Business ledger default source not found.")
filename = str(row["filename"] or "사업관리대장-1.xlsx")
headers = {
"Content-Disposition": 'inline; filename="business-ledger-default.xlsx"',
"X-Source-Filename": "business-ledger-default.xlsx",
"Cache-Control": "no-store, no-cache, must-revalidate, max-age=0",
"Pragma": "no-cache",
}
return Response(
content=bytes(row["content"]),
media_type=str(row["mime_type"] or "application/octet-stream"),
headers=headers,
)
return build_business_ledger_default_response(cur)
@app.post("/api/auth/login")
@@ -5020,13 +4557,7 @@ def integration_payment() -> FileResponse:
def integration_ledger() -> FileResponse:
# #21 phase-1: runtime no longer decodes reference wrapper HTML. Serve the promoted
# ledger entry file from incoming-files/served/ledger only.
target = BUSINESS_LEDGER_INDEX_PATH
if not target.exists():
raise HTTPException(status_code=404, detail="Business ledger integration file not found.")
response = FileResponse(target)
response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
response.headers["Pragma"] = "no-cache"
return response
return build_ledger_index_response(BUSINESS_LEDGER_INDEX_PATH)
@app.get("/integrations/mh")