from contextlib import contextmanager import time from typing import Iterator from psycopg.rows import dict_row import psycopg from .config import DATABASE_URL SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS members ( id SERIAL PRIMARY KEY, name TEXT NOT NULL, company TEXT, rank TEXT, role TEXT, department TEXT, grp TEXT, division TEXT, team TEXT, cell TEXT, work_status TEXT, work_time TEXT, phone TEXT, email TEXT, seat_label TEXT, photo_url TEXT, sort_order INTEGER NOT NULL DEFAULT 0, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE TABLE IF NOT EXISTS seat_positions ( member_id INTEGER PRIMARY KEY REFERENCES members(id) ON DELETE CASCADE, x INTEGER NOT NULL DEFAULT 0, y INTEGER NOT NULL DEFAULT 0, floor_label TEXT, updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE TABLE IF NOT EXISTS snapshots ( id SERIAL PRIMARY KEY, snapshot_month TEXT NOT NULL, file_path TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); """ MIGRATION_SQL = """ ALTER TABLE members ADD COLUMN IF NOT EXISTS sort_order INTEGER NOT NULL DEFAULT 0; """ @contextmanager def get_conn() -> Iterator[psycopg.Connection]: with psycopg.connect(DATABASE_URL, row_factory=dict_row) as conn: yield conn def init_db(max_retries: int = 20, retry_delay: float = 2.0) -> None: last_error: Exception | None = None for _ in range(max_retries): try: with get_conn() as conn: with conn.cursor() as cur: cur.execute(SCHEMA_SQL) cur.execute(MIGRATION_SQL) conn.commit() return except psycopg.OperationalError as exc: last_error = exc time.sleep(retry_delay) if last_error is not None: raise last_error