feat: add db status viewer and db cleanup baseline

This commit is contained in:
hyunho
2026-04-01 15:28:11 +09:00
parent e58e584a15
commit 1e82572e15
14 changed files with 2828 additions and 27 deletions

View File

@@ -281,6 +281,19 @@ CREATE TABLE IF NOT EXISTS integration_vouchers (
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Stores raw binary source files (e.g. Excel workbooks) directly in the DB
-- so imports stay reproducible without depending on the server filesystem.
CREATE TABLE IF NOT EXISTS integration_binary_sources (
id BIGSERIAL PRIMARY KEY,
source_key TEXT NOT NULL UNIQUE,  -- stable logical identifier for the source
source_name TEXT NOT NULL,
filename TEXT NOT NULL DEFAULT '',
mime_type TEXT NOT NULL DEFAULT 'application/octet-stream',
content BYTEA NOT NULL,  -- raw file bytes
content_sha256 TEXT NOT NULL DEFAULT '',  -- integrity / change-detection checksum
meta_json JSONB NOT NULL DEFAULT '{}'::jsonb,
imported_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS history_revisions (
id BIGSERIAL PRIMARY KEY,
scope TEXT NOT NULL DEFAULT 'organization',
@@ -329,18 +342,6 @@ CREATE TABLE IF NOT EXISTS seat_assignment_versions (
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS entity_change_events (
id BIGSERIAL PRIMARY KEY,
entity_type TEXT NOT NULL,
entity_id BIGINT NOT NULL,
action_type TEXT NOT NULL,
revision_no BIGINT NOT NULL,
changed_by_user_id BIGINT,
changed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
change_reason TEXT NOT NULL DEFAULT '',
patch_json JSONB NOT NULL DEFAULT '{}'::jsonb
);
CREATE SCHEMA IF NOT EXISTS auth;
CREATE TABLE IF NOT EXISTS auth.users (
@@ -534,6 +535,9 @@ ON integration_vouchers (project_code, project_name);
CREATE UNIQUE INDEX IF NOT EXISTS integration_project_category_mappings_key_idx
ON integration_project_category_mappings (source_key, normalized_project_key);
CREATE UNIQUE INDEX IF NOT EXISTS integration_binary_sources_source_key_idx
ON integration_binary_sources (source_key);
CREATE INDEX IF NOT EXISTS member_versions_member_time_idx
ON member_versions (member_id, valid_from, valid_to);
@@ -543,9 +547,6 @@ ON seat_assignment_versions (member_id, valid_from, valid_to);
CREATE INDEX IF NOT EXISTS history_revisions_scope_created_idx
ON history_revisions (scope, created_at DESC);
CREATE INDEX IF NOT EXISTS entity_change_events_entity_idx
ON entity_change_events (entity_type, entity_id, changed_at DESC);
DO $$
BEGIN
IF NOT EXISTS (
@@ -611,6 +612,9 @@ ALTER TABLE auth.login_audit_logs ADD COLUMN IF NOT EXISTS user_id BIGINT NULL R
ALTER TABLE auth.login_audit_logs ADD COLUMN IF NOT EXISTS failure_reason TEXT;
ALTER TABLE auth.login_audit_logs ADD COLUMN IF NOT EXISTS ip_address INET;
ALTER TABLE auth.login_audit_logs ADD COLUMN IF NOT EXISTS user_agent TEXT;
DROP INDEX IF EXISTS entity_change_events_entity_idx;
DROP TABLE IF EXISTS entity_change_events;
"""

View File

@@ -44,6 +44,7 @@ LEGACY_STATIC_DIR = LEGACY_DIR / "static"
# Directory layout for files exchanged with operators / served to browsers.
INCOMING_FILES_DIR = BASE_DIR / "incoming-files"
INCOMING_SERVED_DIR = INCOMING_FILES_DIR / "served"  # static files exposed over HTTP
INCOMING_REFERENCE_DIR = INCOMING_FILES_DIR / "reference"  # read-only reference inputs
DB_STATUS_SERVED_DIR = INCOMING_SERVED_DIR / "db-status"  # DB status dashboard assets
BUSINESS_DASHBOARD_DIR = INCOMING_FILES_DIR / "사업관리대장"  # business-ledger source drops
BUSINESS_LEDGER_SERVED_DIR = INCOMING_SERVED_DIR / "ledger"
BUSINESS_LEDGER_INDEX_PATH = BUSINESS_LEDGER_SERVED_DIR / "index.html"
@@ -88,7 +89,157 @@ MH_HEADER_ORDER = [
"사업 종류", "연장근무 프로젝트 코드", "연장근무 프로젝트명", "연장근무 서브코드", "연장근무 시간(실제)", "연장근무 시간(가공)"
]
# Curated registry of the tables surfaced on the admin DB status dashboard.
# Fields per entry:
#   table_ref        schema-qualified table name (used as the lookup key)
#   label            display name for the UI (Korean)
#   domain           coarse grouping bucket (organization/history/seatmap/integration/auth)
#   timestamp_column column whose MAX() is reported as the table's "last event" time
#   related_views    UI screens that read from this table
#   description      one-line purpose note (Korean)
DB_STATUS_TABLES = [
    {
        "table_ref": "public.members",
        "label": "구성원 마스터",
        "domain": "organization",
        "timestamp_column": "updated_at",
        "related_views": ["조직 현황", "자리배치도"],
        "description": "조직/구성원 화면의 기준이 되는 현재 인원 마스터",
    },
    {
        "table_ref": "public.member_versions",
        "label": "구성원 이력",
        "domain": "history",
        "timestamp_column": "created_at",
        "related_views": ["조직 현황", "이력 비교"],
        "description": "as-of 조회와 변경 이력을 위한 시점 버전",
    },
    {
        "table_ref": "public.seat_maps",
        "label": "자리배치도 도면",
        "domain": "seatmap",
        "timestamp_column": "updated_at",
        "related_views": ["자리배치도"],
        "description": "오피스별 도면 메타데이터와 활성 상태",
    },
    {
        "table_ref": "public.seat_positions",
        "label": "현재 좌석 배치",
        "domain": "seatmap",
        "timestamp_column": "updated_at",
        "related_views": ["자리배치도"],
        "description": "현재 인원의 실제 배치 좌표/슬롯 연결",
    },
    {
        "table_ref": "public.seat_assignment_versions",
        "label": "좌석 배치 이력",
        "domain": "history",
        "timestamp_column": "created_at",
        "related_views": ["자리배치도", "이력 비교"],
        "description": "자리 이동 이력과 시점 조회용 배치 버전",
    },
    {
        "table_ref": "public.integration_import_batches",
        "label": "원본 업로드 배치",
        "domain": "integration",
        "timestamp_column": "imported_at",
        "related_views": ["프로젝트별 분석", "팀/개인별 분석", "조직 현황"],
        "description": "원본 파일 적재 단위와 최근 import 기록",
    },
    {
        "table_ref": "public.integration_projects",
        "label": "통합 프로젝트 표준화",
        "domain": "integration",
        "timestamp_column": "updated_at",
        "related_views": ["프로젝트별 분석", "팀/개인별 분석"],
        "description": "프로젝트 코드/이름/카테고리 정규화 결과",
    },
    {
        "table_ref": "public.integration_work_logs",
        "label": "근무 로그 표준화",
        "domain": "integration",
        "timestamp_column": "updated_at",
        "related_views": ["팀/개인별 분석"],
        "description": "MH workbook 기준 일자별 근무 로그 본체",
    },
    {
        "table_ref": "public.integration_work_log_segments",
        "label": "근무 로그 세그먼트",
        "domain": "integration",
        "timestamp_column": "created_at",
        "related_views": ["팀/개인별 분석"],
        "description": "근무 로그를 프로젝트/활동 기준으로 분해한 상세 세그먼트",
    },
    {
        "table_ref": "public.integration_vouchers",
        "label": "전표 표준화",
        "domain": "integration",
        "timestamp_column": "created_at",
        "related_views": ["프로젝트별 분석"],
        "description": "payment CSV 기준 프로젝트별 수입/지출 전표",
    },
    {
        "table_ref": "public.integration_binary_sources",
        "label": "바이너리 원본 보관",
        "domain": "integration",
        "timestamp_column": "imported_at",
        "related_views": ["사업관리대장"],
        "description": "엑셀/바이너리 원본을 DB에 보관하는 저장소",
    },
    {
        "table_ref": "auth.users",
        "label": "인증 사용자",
        "domain": "auth",
        "timestamp_column": "updated_at",
        "related_views": ["로그인", "권한"],
        "description": "로그인 계정, role, 활성 상태",
    },
    {
        "table_ref": "auth.sessions",
        "label": "인증 세션",
        "domain": "auth",
        "timestamp_column": "created_at",
        "related_views": ["로그인", "권한"],
        "description": "현재/과거 로그인 세션과 만료 상태",
    },
    {
        "table_ref": "auth.login_audit_logs",
        "label": "로그인 감사 로그",
        "domain": "auth",
        "timestamp_column": "created_at",
        "related_views": ["로그인", "권한"],
        "description": "로그인 성공/실패 기록",
    },
]
# Fast lookup of a registered table spec by its schema-qualified name.
DB_STATUS_TABLE_META = {str(item["table_ref"]): item for item in DB_STATUS_TABLES}
# Cleanup-baseline triage bucket per table:
#   "유지"       keep — actively used by the application
#   "주의"       caution — auxiliary tables to review before changing/removing
#   "원본·추적"  raw/trace — imported source rows kept for auditing
# Tables not listed here are treated as "주의" when the snapshot is built.
DB_STATUS_TABLE_GROUPS = {
    "public.members": "유지",
    "public.member_versions": "유지",
    "public.seat_maps": "유지",
    "public.seat_positions": "유지",
    "public.seat_slots": "유지",
    "public.seat_assignment_versions": "유지",
    "public.history_revisions": "유지",
    "public.integration_import_batches": "유지",
    "public.integration_projects": "유지",
    "public.integration_work_logs": "유지",
    "public.integration_work_log_segments": "유지",
    "public.integration_vouchers": "유지",
    "public.integration_binary_sources": "유지",
    "auth.users": "유지",
    "auth.sessions": "유지",
    "auth.login_audit_logs": "유지",
    "public.member_overrides": "주의",
    "public.member_retirements": "주의",
    "public.member_aliases": "주의",
    "public.integration_project_aliases": "주의",
    "public.integration_project_category_mappings": "주의",
    "public.integration_project_pm_assignments": "주의",
    "public.integration_raw_organization_rows": "원본·추적",
    "public.integration_raw_mh_rows": "원본·추적",
    "public.integration_raw_mh_pm_rows": "원본·추적",
    "public.integration_raw_payment_rows": "원본·추적",
}
def sync_default_business_ledger_source(cur) -> None:
cur.execute("SELECT to_regclass('public.integration_binary_sources') IS NOT NULL AS table_exists")
row = cur.fetchone()
table_exists = bool(row["table_exists"]) if row is not None else False
if not table_exists:
return
candidates = [
BUSINESS_LEDGER_SERVED_DIR / "사업관리대장-1.xlsx",
BUSINESS_DASHBOARD_DIR / "사업관리대장-1.xlsx",
@@ -137,6 +288,250 @@ def sync_default_business_ledger_source(cur) -> None:
)
def make_json_safe(value: object) -> object:
    """Recursively convert DB row values into JSON-serializable equivalents.

    Conversions:
      * datetime / date  -> ISO-8601 string (datetime checked via its date base class)
      * Decimal          -> float (display use; precision loss accepted)
      * bytes/bytearray  -> size placeholder string; raw payloads are never exposed
      * dict             -> keys coerced to str, values converted recursively
      * list / tuple     -> list with items converted recursively

    Anything else is returned unchanged.
    """
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    if isinstance(value, Decimal):
        return float(value)
    if isinstance(value, (bytes, bytearray)):
        return f"<{len(value)} bytes>"
    if isinstance(value, dict):
        return {str(key): make_json_safe(val) for key, val in value.items()}
    if isinstance(value, (list, tuple)):
        return [make_json_safe(item) for item in value]
    return value
def fetch_db_status_snapshot() -> dict[str, object]:
    """Build the admin DB status payload for the dashboard.

    In a single DB session, gathers:
      * every base table in the ``public``/``auth`` schemas with its row count
        and the MAX() of its configured timestamp column,
      * recent import batches and stored binary sources,
      * headline counters (members, seat maps, batch/source counts),
      * a cleanup grouping summary keyed by DB_STATUS_TABLE_GROUPS buckets.

    All values in the returned dict are JSON-serializable (timestamps are
    rendered as ISO-8601 strings).
    """
    table_items: list[dict[str, object]] = []
    with get_conn() as conn:
        with conn.cursor() as cur:
            # Enumerate real tables from the catalog so tables missing from the
            # curated DB_STATUS_TABLES registry still show up on the dashboard.
            cur.execute(
                """
                SELECT schemaname, tablename
                FROM pg_tables
                WHERE schemaname IN ('public', 'auth')
                ORDER BY schemaname, tablename
                """
            )
            all_tables = cur.fetchall()
            for row in all_tables:
                schema_name = str(row["schemaname"])
                table_name = str(row["tablename"])
                table_ref = f"{schema_name}.{table_name}"
                # Curated metadata (label/domain/timestamp column); empty dict
                # for tables outside the registry.
                spec = DB_STATUS_TABLE_META.get(table_ref, {})
                # NOTE(review): redundant in the common case (the table was just
                # listed by pg_tables) but guards against a concurrent DROP.
                cur.execute("SELECT to_regclass(%s) IS NOT NULL AS table_exists", (table_ref,))
                exists_row = cur.fetchone()
                exists = bool(exists_row["table_exists"]) if exists_row is not None else False
                row_count = 0
                last_event_at = None
                if exists:
                    timestamp_column = str(spec.get("timestamp_column") or "")
                    # Identifiers are interpolated into the SQL text here; they
                    # originate from pg_tables and the static registry above,
                    # never from user input.
                    query = f"SELECT COUNT(*)::bigint AS row_count"
                    if timestamp_column:
                        query += f", MAX({timestamp_column}) AS last_event_at"
                    else:
                        query += ", NULL::timestamptz AS last_event_at"
                    query += f" FROM {schema_name}.{table_name}"
                    cur.execute(query)
                    metric_row = cur.fetchone() or {}
                    row_count = int(metric_row.get("row_count") or 0)
                    last_event_at = metric_row.get("last_event_at")
                table_items.append(
                    {
                        "table_ref": table_ref,
                        "schema": schema_name,
                        "table_name": table_name,
                        "label": str(spec.get("label") or table_name),
                        "domain": str(spec.get("domain") or "other"),
                        "description": str(spec.get("description") or "세부 보조/원본/운영 테이블"),
                        "related_views": spec.get("related_views") or [],
                        # Unregistered tables default to the "주의" (caution) bucket.
                        "group": DB_STATUS_TABLE_GROUPS.get(table_ref, "주의"),
                        "exists": exists,
                        "row_count": row_count,
                        "last_event_at": last_event_at.isoformat() if last_event_at else None,
                    }
                )
            # Most recent first; id DESC breaks ties within one timestamp.
            cur.execute(
                """
                SELECT source_key, source_name, row_count, source_path, imported_at
                FROM integration_import_batches
                ORDER BY imported_at DESC, id DESC
                """
            )
            import_batches = [
                {
                    "source_key": str(row["source_key"] or ""),
                    "source_name": str(row["source_name"] or ""),
                    "row_count": int(row["row_count"] or 0),
                    "source_path": str(row["source_path"] or ""),
                    "imported_at": row["imported_at"].isoformat() if row.get("imported_at") else None,
                }
                for row in cur.fetchall()
            ]
            binary_sources: list[dict[str, object]] = []
            # integration_binary_sources may not exist yet on older databases,
            # so probe before querying it.
            cur.execute("SELECT to_regclass('public.integration_binary_sources') IS NOT NULL AS table_exists")
            binary_exists_row = cur.fetchone()
            binary_exists = bool(binary_exists_row["table_exists"]) if binary_exists_row is not None else False
            if binary_exists:
                # OCTET_LENGTH reports payload size without transferring the
                # BYTEA content itself.
                cur.execute(
                    """
                    SELECT source_key, source_name, filename, mime_type, OCTET_LENGTH(content) AS byte_size,
                           content_sha256, imported_at
                    FROM integration_binary_sources
                    ORDER BY imported_at DESC, id DESC
                    """
                )
                binary_sources = [
                    {
                        "source_key": str(row["source_key"] or ""),
                        "source_name": str(row["source_name"] or ""),
                        "filename": str(row["filename"] or ""),
                        "mime_type": str(row["mime_type"] or ""),
                        "byte_size": int(row["byte_size"] or 0),
                        "content_sha256": str(row["content_sha256"] or ""),
                        "imported_at": row["imported_at"].isoformat() if row.get("imported_at") else None,
                    }
                    for row in cur.fetchall()
                ]
            # Headline counters. "Active" members are those whose work_status is
            # neither '퇴직' (retired) nor '휴직' (on leave), after trimming.
            cur.execute(
                """
                SELECT COUNT(*)::bigint AS total_members,
                       COUNT(*) FILTER (
                           WHERE COALESCE(BTRIM(work_status), '') NOT IN ('퇴직', '휴직')
                       )::bigint AS active_members
                FROM members
                """
            )
            member_row = cur.fetchone() or {}
            cur.execute(
                """
                SELECT COUNT(*)::bigint AS active_seat_maps
                FROM seat_maps
                WHERE is_active = TRUE
                """
            )
            seat_map_row = cur.fetchone() or {}
            cur.execute(
                """
                SELECT COUNT(*)::bigint AS fixed_office_maps
                FROM seat_maps
                WHERE source_type = 'fixed_html'
                """
            )
            fixed_office_row = cur.fetchone() or {}
    overview = {
        "visible_tables": len(DB_STATUS_TABLES),
        "total_tables": len(table_items),
        "existing_tables": sum(1 for item in table_items if item["exists"]),
        "registered_members": int(member_row.get("total_members") or 0),
        "active_members": int(member_row.get("active_members") or 0),
        "active_seat_maps": int(seat_map_row.get("active_seat_maps") or 0),
        "fixed_office_maps": int(fixed_office_row.get("fixed_office_maps") or 0),
        "import_batches": len(import_batches),
        "binary_sources": len(binary_sources),
    }
    # "정리 후보" (cleanup candidate) is currently always empty: no table is
    # assigned that group in DB_STATUS_TABLE_GROUPS yet.
    group_summary = {
        "유지": [item["table_ref"] for item in table_items if item["group"] == "유지"],
        "주의": [item["table_ref"] for item in table_items if item["group"] == "주의"],
        "원본·추적": [item["table_ref"] for item in table_items if item["group"] == "원본·추적"],
        "정리 후보": [item["table_ref"] for item in table_items if item["group"] == "정리 후보"],
    }
    return {
        # NOTE(review): datetime.utcnow() is naive and deprecated since Python
        # 3.12; consider datetime.now(timezone.utc) — confirm output format
        # expectations before changing.
        "generated_at": datetime.utcnow().isoformat() + "Z",
        "overview": overview,
        "tables": table_items,
        "import_batches": import_batches,
        "binary_sources": binary_sources,
        "group_summary": group_summary,
        "notes": [
            "members / seat_positions / seat_maps 는 현재 운영 상태를 나타냅니다.",
            "member_versions / seat_assignment_versions / history_revisions 는 시점 조회와 변경 이력을 위한 테이블입니다.",
            "integration_raw_* / integration_* 는 원본 적재와 표준화 결과를 분리해서 보관합니다.",
            "integration_binary_sources 는 사업관리대장 같은 바이너리 원본 보관용입니다.",
            "재직 인원은 work_status 값이 '퇴직' 또는 '휴직'이 아닌 구성원 기준입니다.",
        ],
    }
def fetch_db_table_preview(schema_name: str, table_name: str, limit: int = 50) -> dict[str, object]:
    """Return column metadata, total row count, and a row sample for one table.

    Args:
        schema_name: Must be ``public`` or ``auth``; anything else -> 404.
        table_name: Must exist in ``pg_tables`` for that schema; otherwise 404.
        limit: Requested sample size, clamped to the range 1..50.

    Raises:
        HTTPException: 404 for an unknown schema or table.
    """
    if schema_name not in {"public", "auth"}:
        raise HTTPException(status_code=404, detail="Unknown schema.")
    with get_conn() as conn:
        with conn.cursor() as cur:
            # Verify the table really exists before interpolating its name into
            # SQL below — this catalog check is also the injection guard, since
            # only identifiers listed in pg_tables ever reach the f-strings.
            cur.execute(
                """
                SELECT tablename
                FROM pg_tables
                WHERE schemaname = %s
                  AND tablename = %s
                """,
                (schema_name, table_name),
            )
            exists_row = cur.fetchone()
            if exists_row is None:
                raise HTTPException(status_code=404, detail="Unknown table.")
            table_ref = f"{schema_name}.{table_name}"
            spec = DB_STATUS_TABLE_META.get(table_ref, {})
            cur.execute(
                """
                SELECT column_name, data_type
                FROM information_schema.columns
                WHERE table_schema = %s
                  AND table_name = %s
                ORDER BY ordinal_position
                """,
                (schema_name, table_name),
            )
            columns = [{"name": str(row["column_name"]), "type": str(row["data_type"])} for row in cur.fetchall()]
            cur.execute(f"SELECT COUNT(*)::bigint AS row_count FROM {schema_name}.{table_name}")
            row_count = int((cur.fetchone() or {}).get("row_count") or 0)
            # Clamp the sample size; the interpolated value is guaranteed to be
            # an int in 1..50 at this point.
            safe_limit = max(1, min(int(limit), 50))
            cur.execute(f"SELECT * FROM {schema_name}.{table_name} LIMIT {safe_limit}")
            rows = [make_json_safe(dict(row)) for row in cur.fetchall()]
            return {
                "table_ref": table_ref,
                "schema": schema_name,
                "table_name": table_name,
                "label": str(spec.get("label") or table_name),
                "domain": str(spec.get("domain") or "other"),
                "description": str(spec.get("description") or "세부 보조/원본/운영 테이블"),
                "related_views": spec.get("related_views") or [],
                "row_count": row_count,
                "limit": safe_limit,
                "columns": columns,
                "rows": rows,
            }
    # Removed: an unreachable leftover duplicate of fetch_db_status_snapshot's
    # return block that referenced names (overview, table_items, import_batches,
    # binary_sources) which are not defined in this function's scope.
app.mount(
"/integrations/ledger-assets",
StaticFiles(directory=str(BUSINESS_LEDGER_SERVED_DIR), check_dir=False),
@@ -4001,6 +4396,27 @@ def health() -> dict[str, object]:
}
@app.get("/api/admin/db-status")
def admin_db_status() -> dict[str, object]:
    """Return the full DB status snapshot (tables, batches, overview, groups)."""
    return fetch_db_status_snapshot()
@app.get("/api/admin/db-status/table")
def admin_db_status_table(schema: str, table: str, limit: int = 50) -> dict[str, object]:
    """Preview one table: columns, total row count, and up to ``limit`` rows (capped at 50)."""
    return fetch_db_table_preview(schema, table, limit)
@app.get("/admin/db-status")
def admin_db_status_view() -> FileResponse:
    """Serve the static DB status dashboard page with HTTP caching disabled."""
    page = DB_STATUS_SERVED_DIR / "index.html"
    if not page.exists():
        raise HTTPException(status_code=404, detail="DB status dashboard file not found.")
    # Always re-fetch: the dashboard JS/HTML changes alongside the API.
    no_cache_headers = {
        "Cache-Control": "no-store, no-cache, must-revalidate, max-age=0",
        "Pragma": "no-cache",
    }
    response = FileResponse(page)
    for header_name, header_value in no_cache_headers.items():
        response.headers[header_name] = header_value
    return response
@app.get("/api/integration/business-ledger-default")
def integration_business_ledger_default() -> Response:
with get_conn() as conn: