Refactor app structure and simplify team docs

This commit is contained in:
hyunho
2026-04-02 11:13:43 +09:00
parent 8125193378
commit f8ea345882
47 changed files with 4921 additions and 10495 deletions

View File

@@ -24,20 +24,36 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from openpyxl import load_workbook
from pydantic import BaseModel, Field
from .auth_routes import register_auth_routes
from .config import BASE_DIR, LEGACY_DIR, MOCK_LOGIN_ENABLED, UPLOAD_DIR
from .db import get_conn, init_db
from .integration_routes import register_integration_routes
from .ledger_runtime import (
build_business_ledger_default_response,
build_ledger_index_response,
sync_default_business_ledger_source,
)
from .member_routes import register_member_routes
from .seatmap_routes import register_seatmap_routes
from .system_routes import register_system_routes
from .routes import (
register_auth_routes,
register_integration_routes,
register_member_routes,
register_seatmap_routes,
register_system_routes,
)
from .repositories import fetch_history_revision, fetch_history_revision_created_at, fetch_history_revisions, fetch_members, fetch_members_as_of
from .schemas import (
MemberBulkPayload,
MemberPayload,
SeatLayoutPayload,
SeatMapPayload,
)
from .services import (
create_history_revision,
normalize_phone,
replace_members,
serialize_member_payload,
sync_member_versions,
sync_seat_assignment_versions,
)
app = FastAPI(title="MH Dashboard Organization API")
@@ -104,61 +120,6 @@ app.mount(
)
class MemberPayload(BaseModel):
id: int | None = None
name: str = Field(min_length=1)
employee_id: str = ""
company: str = ""
rank: str = ""
role: str = ""
department: str = ""
grp: str = ""
division: str = ""
team: str = ""
cell: str = ""
work_status: str = ""
work_time: str = ""
phone: str = ""
email: str = ""
seat_label: str = ""
photo_url: str = ""
sort_order: int | None = None
class MemberBulkPayload(BaseModel):
items: list[MemberPayload]
class SeatMapPayload(BaseModel):
name: str = Field(min_length=1)
image_url: str = ""
source_type: str = "image"
source_url: str = ""
preview_svg: str = ""
view_box_min_x: float | None = None
view_box_min_y: float | None = None
view_box_width: float | None = None
view_box_height: float | None = None
image_width: int | None = None
image_height: int | None = None
grid_rows: int = Field(default=1, ge=1, le=200)
grid_cols: int = Field(default=1, ge=1, le=200)
cell_gap: int = Field(default=0, ge=0, le=24)
is_active: bool = True
class SeatPlacementPayload(BaseModel):
member_id: int
seat_slot_id: int | None = None
row_index: int = Field(default=0, ge=0)
col_index: int = Field(default=0, ge=0)
seat_label: str = ""
class SeatLayoutPayload(BaseModel):
placements: list[SeatPlacementPayload]
LEGACY_HEADER_MAP = {
"이름": "name",
"name": "name",
@@ -203,42 +164,6 @@ LEGACY_HEADER_MAP = {
}
def normalize_phone(value: object) -> str:
raw = str(value or "").strip()
digits = "".join(ch for ch in raw if ch.isdigit())
if not digits:
return ""
if len(digits) == 10 and not digits.startswith("0"):
digits = f"0{digits}"
if len(digits) == 11 and digits.startswith("0"):
return f"{digits[:3]}-{digits[3:7]}-{digits[7:]}"
if len(digits) == 10 and digits.startswith("0"):
return f"{digits[:3]}-{digits[3:6]}-{digits[6:]}"
return raw
def serialize_member_payload(item: MemberPayload, sort_order: int) -> tuple[object, ...]:
return (
item.name.strip(),
item.employee_id.strip(),
item.company.strip(),
item.rank.strip(),
item.role.strip(),
item.department.strip(),
item.grp.strip(),
item.division.strip(),
item.team.strip(),
item.cell.strip(),
item.work_status.strip(),
item.work_time.strip(),
normalize_phone(item.phone),
item.email.strip(),
item.seat_label.strip(),
item.photo_url.strip(),
sort_order,
)
def _encode_auth_bytes(value: bytes) -> str:
return base64.urlsafe_b64encode(value).decode("ascii").rstrip("=")
@@ -430,21 +355,6 @@ def sync_auth_users_from_members(cur) -> None:
ensure_default_admin_user(cur)
def fetch_members() -> list[dict[str, object]]:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, name, employee_id, company, rank, role, department, grp, division, team, cell,
work_status, work_time, phone, email, seat_label, photo_url,
sort_order, created_at, updated_at
FROM members
ORDER BY sort_order ASC, id ASC
"""
)
return cur.fetchall()
def parse_as_of(as_of: str | None) -> datetime | None:
raw = str(as_of or "").strip()
if not raw:
@@ -461,223 +371,6 @@ def parse_as_of(as_of: str | None) -> datetime | None:
raise HTTPException(status_code=400, detail="Invalid as_of format. Use YYYY-MM-DD or ISO datetime.") from exc
def create_history_revision(cur, label_prefix: str, note: str) -> int:
cur.execute(
"""
INSERT INTO history_revisions (scope, revision_label, note)
VALUES ('organization', %s, %s)
RETURNING id
""",
(f"{label_prefix}-{datetime.now(APP_TIMEZONE).strftime('%Y%m%d-%H%M%S-%f')}", note),
)
return int(cur.fetchone()["id"])
def fetch_current_member_state(cur) -> dict[int, dict[str, object]]:
cur.execute(
"""
SELECT id, name, employee_id, company, rank, role, department, grp, division, team, cell,
work_status, work_time, phone, email, seat_label, photo_url, sort_order,
created_at, updated_at
FROM members
"""
)
return {int(row["id"]): row for row in cur.fetchall()}
def sync_member_versions(cur, member_ids: list[int], change_reason: str, revision_no: int) -> None:
if not member_ids:
return
unique_ids = sorted(set(int(member_id) for member_id in member_ids))
current_members = fetch_current_member_state(cur)
cur.execute(
"""
SELECT id, member_id, name, company, rank, role, department, grp, division, team, cell,
work_status, work_time, phone, email, photo_url, valid_from, valid_to
FROM member_versions
WHERE member_id = ANY(%s)
AND valid_to IS NULL
""",
(unique_ids,),
)
active_versions = {int(row["member_id"]): row for row in cur.fetchall()}
for member_id in unique_ids:
current = current_members.get(member_id)
active = active_versions.get(member_id)
if current is None:
if active is not None:
cur.execute(
"UPDATE member_versions SET valid_to = NOW() WHERE id = %s AND valid_to IS NULL",
(int(active["id"]),),
)
continue
current_tuple = (
str(current.get("name") or ""),
str(current.get("company") or ""),
str(current.get("rank") or ""),
str(current.get("role") or ""),
str(current.get("department") or ""),
str(current.get("grp") or ""),
str(current.get("division") or ""),
str(current.get("team") or ""),
str(current.get("cell") or ""),
str(current.get("work_status") or ""),
str(current.get("work_time") or ""),
str(current.get("phone") or ""),
str(current.get("email") or ""),
str(current.get("photo_url") or ""),
)
active_tuple = None
if active is not None:
active_tuple = (
str(active.get("name") or ""),
str(active.get("company") or ""),
str(active.get("rank") or ""),
str(active.get("role") or ""),
str(active.get("department") or ""),
str(active.get("grp") or ""),
str(active.get("division") or ""),
str(active.get("team") or ""),
str(active.get("cell") or ""),
str(active.get("work_status") or ""),
str(active.get("work_time") or ""),
str(active.get("phone") or ""),
str(active.get("email") or ""),
str(active.get("photo_url") or ""),
)
if active_tuple == current_tuple:
continue
if active is not None:
cur.execute(
"UPDATE member_versions SET valid_to = NOW() WHERE id = %s AND valid_to IS NULL",
(int(active["id"]),),
)
cur.execute(
"""
INSERT INTO member_versions (
member_id, name, company, rank, role, department, grp, division, team, cell,
work_status, work_time, phone, email, photo_url,
valid_from, valid_to, revision_no, changed_by_user_id, change_reason
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NULL, %s, NULL, %s)
""",
(member_id, *current_tuple, revision_no, change_reason),
)
def fetch_current_seat_assignments(cur) -> dict[int, dict[str, object]]:
cur.execute(
"""
SELECT member_id, seat_map_id, seat_slot_id, seat_label, updated_at
FROM seat_positions
"""
)
return {int(row["member_id"]): row for row in cur.fetchall()}
def sync_seat_assignment_versions(cur, member_ids: list[int], change_reason: str, revision_no: int) -> None:
if not member_ids:
return
unique_ids = sorted(set(int(member_id) for member_id in member_ids))
current_assignments = fetch_current_seat_assignments(cur)
cur.execute(
"""
SELECT id, member_id, seat_map_id, seat_slot_id, seat_label
FROM seat_assignment_versions
WHERE member_id = ANY(%s)
AND valid_to IS NULL
""",
(unique_ids,),
)
active_versions = {int(row["member_id"]): row for row in cur.fetchall()}
for member_id in unique_ids:
current = current_assignments.get(member_id)
active = active_versions.get(member_id)
current_tuple = None
if current is not None:
current_tuple = (
current.get("seat_map_id"),
current.get("seat_slot_id"),
str(current.get("seat_label") or ""),
)
active_tuple = None
if active is not None:
active_tuple = (
active.get("seat_map_id"),
active.get("seat_slot_id"),
str(active.get("seat_label") or ""),
)
if active_tuple == current_tuple:
continue
if active is not None:
cur.execute(
"UPDATE seat_assignment_versions SET valid_to = NOW() WHERE id = %s AND valid_to IS NULL",
(int(active["id"]),),
)
if current is None:
continue
cur.execute(
"""
INSERT INTO seat_assignment_versions (
member_id, seat_map_id, seat_slot_id, seat_label,
valid_from, valid_to, revision_no, changed_by_user_id, change_reason
)
VALUES (%s, %s, %s, %s, NOW(), NULL, %s, NULL, %s)
""",
(
member_id,
current.get("seat_map_id"),
current.get("seat_slot_id"),
str(current.get("seat_label") or ""),
revision_no,
change_reason,
),
)
def fetch_members_as_of(cur, as_of: datetime) -> list[dict[str, object]]:
cur.execute(
"""
SELECT mv.member_id AS id,
mv.name,
COALESCE(m.employee_id, '') AS employee_id,
mv.company,
mv.rank,
mv.role,
mv.department,
mv.grp,
mv.division,
mv.team,
mv.cell,
mv.work_status,
mv.work_time,
mv.phone,
mv.email,
COALESCE(sav.seat_label, '') AS seat_label,
mv.photo_url,
COALESCE(m.sort_order, 2147483647) AS sort_order,
mv.created_at,
mv.valid_from AS updated_at,
mv.valid_to AS history_valid_to
FROM member_versions mv
LEFT JOIN members m
ON m.id = mv.member_id
LEFT JOIN seat_assignment_versions sav
ON sav.member_id = mv.member_id
AND sav.valid_from <= %s
AND (sav.valid_to IS NULL OR sav.valid_to > %s)
WHERE mv.valid_from <= %s
AND (mv.valid_to IS NULL OR mv.valid_to > %s)
ORDER BY COALESCE(m.sort_order, 2147483647) ASC, mv.member_id ASC
""",
(as_of, as_of, as_of, as_of),
)
return cur.fetchall()
def build_member_compare_items(from_items: list[dict[str, object]], to_items: list[dict[str, object]]) -> list[dict[str, object]]:
tracked_fields = (
("company", "소속회사", "기본"),
@@ -2297,9 +1990,15 @@ def save_seat_layout(seat_map_id: int, payload: SeatLayoutPayload) -> list[dict[
)
affected_member_ids = sorted(set(previous_member_ids + member_ids))
if affected_member_ids:
revision_no = create_history_revision(cur, "seat-layout", f"Seat layout saved for seat_map_id={seat_map_id}")
sync_seat_assignment_versions(cur, affected_member_ids, f"seat-layout:{seat_map_id}", revision_no)
sync_member_versions(cur, affected_member_ids, f"seat-layout:{seat_map_id}", revision_no)
revision_no = create_history_revision(
cur,
"seat-layout",
f"Seat layout saved for seat_map_id={seat_map_id}",
app_timezone=APP_TIMEZONE,
)
revision_created_at = fetch_history_revision_created_at(cur, revision_no)
sync_seat_assignment_versions(cur, affected_member_ids, f"seat-layout:{seat_map_id}", revision_no, revision_created_at)
sync_member_versions(cur, affected_member_ids, f"seat-layout:{seat_map_id}", revision_no, revision_created_at)
conn.commit()
return fetch_seat_layout(seat_map_id)["placements"]
@@ -2312,126 +2011,6 @@ def get_member_count() -> int:
return int(cur.fetchone()["count"])
def merge_import_member(item: MemberPayload, existing: dict[str, object] | None) -> MemberPayload:
if existing is None:
return item
payload = item.model_copy(deep=True)
if not payload.photo_url.strip():
payload.photo_url = str(existing.get("photo_url") or "")
if not payload.seat_label.strip():
payload.seat_label = str(existing.get("seat_label") or "")
return payload
def pick_existing_member(
item: MemberPayload,
existing_by_employee_id: dict[str, list[dict[str, object]]],
existing_by_name: dict[str, list[dict[str, object]]],
matched_ids: set[int],
) -> dict[str, object] | None:
employee_id = item.employee_id.strip()
if employee_id:
for candidate in existing_by_employee_id.get(employee_id, []):
candidate_id = int(candidate["id"])
if candidate_id not in matched_ids:
return candidate
name = item.name.strip()
if name:
available = [
candidate
for candidate in existing_by_name.get(name, [])
if int(candidate["id"]) not in matched_ids
]
if len(available) == 1:
return available[0]
return None
def replace_members(items: list[MemberPayload]) -> list[dict[str, object]]:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, name, employee_id, company, rank, role, department, grp, division, team, cell,
work_status, work_time, phone, email, seat_label, photo_url,
sort_order, created_at, updated_at
FROM members
ORDER BY id ASC
"""
)
existing_members = cur.fetchall()
existing_by_employee_id: dict[str, list[dict[str, object]]] = {}
existing_by_name: dict[str, list[dict[str, object]]] = {}
for member in existing_members:
employee_id = str(member.get("employee_id") or "").strip()
name = str(member.get("name") or "").strip()
if employee_id:
existing_by_employee_id.setdefault(employee_id, []).append(member)
if name:
existing_by_name.setdefault(name, []).append(member)
matched_ids: set[int] = set()
for index, item in enumerate(items):
existing = pick_existing_member(item, existing_by_employee_id, existing_by_name, matched_ids)
merged_item = merge_import_member(item, existing)
if existing is None:
cur.execute(
"""
INSERT INTO members (
name, employee_id, company, rank, role, department, grp, division, team, cell,
work_status, work_time, phone, email, seat_label, photo_url, sort_order
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""",
serialize_member_payload(merged_item, index),
)
continue
matched_ids.add(int(existing["id"]))
cur.execute(
"""
UPDATE members
SET name = %s,
employee_id = %s,
company = %s,
rank = %s,
role = %s,
department = %s,
grp = %s,
division = %s,
team = %s,
cell = %s,
work_status = %s,
work_time = %s,
phone = %s,
email = %s,
seat_label = %s,
photo_url = %s,
sort_order = %s,
updated_at = NOW()
WHERE id = %s
""",
(*serialize_member_payload(merged_item, index), int(existing["id"])),
)
stale_ids = [int(member["id"]) for member in existing_members if int(member["id"]) not in matched_ids]
if stale_ids:
cur.execute("DELETE FROM members WHERE id = ANY(%s)", (stale_ids,))
sync_auth_users_from_members(cur)
cur.execute("SELECT id FROM members")
current_ids = [int(row["id"]) for row in cur.fetchall()]
affected_member_ids = sorted(set(current_ids + [int(member["id"]) for member in existing_members]))
if affected_member_ids:
revision_no = create_history_revision(cur, "members-bulk-sync", "Bulk member sync applied")
sync_member_versions(cur, affected_member_ids, "members-bulk-sync", revision_no)
sync_seat_assignment_versions(cur, affected_member_ids, "members-bulk-sync", revision_no)
conn.commit()
return fetch_members()
def rows_to_member_payloads(rows: list[list[object]]) -> list[MemberPayload]:
def normalize_header(value: object) -> str:
return str(value or "").strip().lower()
@@ -2933,7 +2512,7 @@ def parse_project_category_mapping_source(path: Path) -> list[dict[str, str]]:
def fetch_member_lookup() -> tuple[dict[str, dict[str, object]], dict[str, list[dict[str, object]]]]:
members = fetch_members()
members = fetch_members(get_conn)
by_employee_id = {
clean_text(member.get("employee_id")): member
for member in members
@@ -3119,7 +2698,12 @@ def import_integration_sources() -> dict[str, object]:
payment_raw_rows, payment_vouchers = parse_payment_source(payment_path)
project_category_mappings = parse_project_category_mapping_source(project_mapping_path) if project_mapping_path.exists() else []
replace_members(organization_members)
replace_members(
organization_members,
get_conn=get_conn,
sync_auth_users_from_members=sync_auth_users_from_members,
app_timezone=APP_TIMEZONE,
)
members_by_employee_id, members_by_name = fetch_member_lookup()
with get_conn() as conn:
@@ -4073,15 +3657,26 @@ register_member_routes(
member_payload_cls=MemberPayload,
member_bulk_payload_cls=MemberBulkPayload,
parse_as_of=parse_as_of,
fetch_members=fetch_members,
fetch_members=lambda: fetch_members(get_conn),
fetch_members_as_of=fetch_members_as_of,
build_member_compare_items=build_member_compare_items,
serialize_member_payload=serialize_member_payload,
sync_auth_users_from_members=sync_auth_users_from_members,
create_history_revision=create_history_revision,
create_history_revision=lambda cur, label_prefix, note: create_history_revision(
cur,
label_prefix,
note,
app_timezone=APP_TIMEZONE,
),
fetch_history_revision_created_at=fetch_history_revision_created_at,
sync_member_versions=sync_member_versions,
sync_seat_assignment_versions=sync_seat_assignment_versions,
replace_members=replace_members,
replace_members=lambda items: replace_members(
items,
get_conn=get_conn,
sync_auth_users_from_members=sync_auth_users_from_members,
app_timezone=APP_TIMEZONE,
),
parse_import_rows=parse_import_rows,
)
register_integration_routes(

View File

@@ -0,0 +1,19 @@
from .organization import (
fetch_current_member_state,
fetch_current_seat_assignments,
fetch_history_revision,
fetch_history_revision_created_at,
fetch_history_revisions,
fetch_members,
fetch_members_as_of,
)
__all__ = [
"fetch_current_member_state",
"fetch_current_seat_assignments",
"fetch_history_revision",
"fetch_history_revision_created_at",
"fetch_history_revisions",
"fetch_members",
"fetch_members_as_of",
]

View File

@@ -0,0 +1,145 @@
from __future__ import annotations
from datetime import date, datetime, time, timedelta
from typing import Callable
from fastapi import HTTPException
def fetch_members(get_conn) -> list[dict[str, object]]:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, name, employee_id, company, rank, role, department, grp, division, team, cell,
work_status, work_time, phone, email, seat_label, photo_url,
sort_order, created_at, updated_at
FROM members
ORDER BY sort_order ASC, id ASC
"""
)
return cur.fetchall()
def fetch_history_revision(cur, revision_id: int) -> dict[str, object] | None:
cur.execute(
"""
SELECT id, revision_label, created_at, note
FROM history_revisions
WHERE scope = 'organization'
AND id = %s
""",
(revision_id,),
)
return cur.fetchone()
def fetch_history_revisions(
cur,
*,
app_timezone,
day: date | None = None,
limit: int = 100,
) -> list[dict[str, object]]:
safe_limit = max(1, min(int(limit), 500))
if day is None:
cur.execute(
"""
SELECT id, revision_label, created_at, note
FROM history_revisions
WHERE scope = 'organization'
ORDER BY created_at DESC, id DESC
LIMIT %s
""",
(safe_limit,),
)
return cur.fetchall()
day_start = datetime.combine(day, time.min, tzinfo=app_timezone)
day_end = day_start + timedelta(days=1)
cur.execute(
"""
SELECT id, revision_label, created_at, note
FROM history_revisions
WHERE scope = 'organization'
AND created_at >= %s
AND created_at < %s
ORDER BY created_at ASC, id ASC
LIMIT %s
""",
(day_start, day_end, safe_limit),
)
return cur.fetchall()
def fetch_history_revision_created_at(cur, revision_id: int) -> datetime:
revision = fetch_history_revision(cur, revision_id)
if revision is None:
raise HTTPException(status_code=404, detail="History revision not found.")
return revision["created_at"]
def fetch_current_member_state(cur) -> dict[int, dict[str, object]]:
cur.execute(
"""
SELECT id, name, employee_id, company, rank, role, department, grp, division, team, cell,
work_status, work_time, phone, email, seat_label, photo_url, sort_order,
created_at, updated_at
FROM members
"""
)
return {int(row["id"]): row for row in cur.fetchall()}
def fetch_current_seat_assignments(cur) -> dict[int, dict[str, object]]:
cur.execute(
"""
SELECT member_id, seat_map_id, seat_slot_id, seat_label, updated_at
FROM seat_positions
"""
)
return {int(row["member_id"]): row for row in cur.fetchall()}
def fetch_members_as_of(cur, as_of: datetime) -> list[dict[str, object]]:
cur.execute(
"""
SELECT mv.member_id AS id,
mv.name,
COALESCE(m.employee_id, '') AS employee_id,
mv.company,
mv.rank,
mv.role,
mv.department,
mv.grp,
mv.division,
mv.team,
mv.cell,
mv.work_status,
mv.work_time,
mv.phone,
mv.email,
COALESCE(sav.seat_label, '') AS seat_label,
mv.photo_url,
COALESCE(m.sort_order, 2147483647) AS sort_order,
mv.created_at,
mv.valid_from AS updated_at,
mv.valid_to AS history_valid_to,
mv.revision_no,
hr.created_at AS revision_created_at
FROM member_versions mv
LEFT JOIN members m
ON m.id = mv.member_id
LEFT JOIN history_revisions hr
ON hr.id = mv.revision_no
LEFT JOIN seat_assignment_versions sav
ON sav.member_id = mv.member_id
AND sav.valid_from <= %s
AND (sav.valid_to IS NULL OR sav.valid_to > %s)
WHERE mv.valid_from <= %s
AND (mv.valid_to IS NULL OR mv.valid_to > %s)
ORDER BY COALESCE(m.sort_order, 2147483647) ASC, mv.member_id ASC
""",
(as_of, as_of, as_of, as_of),
)
return cur.fetchall()

View File

@@ -0,0 +1,13 @@
from .auth import register_auth_routes
from .integration import register_integration_routes
from .organization import register_member_routes
from .seatmap import register_seatmap_routes
from .system import register_system_routes
__all__ = [
"register_auth_routes",
"register_integration_routes",
"register_member_routes",
"register_seatmap_routes",
"register_system_routes",
]

View File

@@ -0,0 +1,3 @@
from ..auth_routes import register_auth_routes
__all__ = ["register_auth_routes"]

View File

@@ -0,0 +1,3 @@
from ..integration_routes import register_integration_routes
__all__ = ["register_integration_routes"]

View File

@@ -0,0 +1,3 @@
from ..member_routes import register_member_routes
__all__ = ["register_member_routes"]

View File

@@ -0,0 +1,3 @@
from ..seatmap_routes import register_seatmap_routes
__all__ = ["register_seatmap_routes"]

View File

@@ -0,0 +1,3 @@
from ..system_routes import register_system_routes
__all__ = ["register_system_routes"]

View File

@@ -0,0 +1,15 @@
from .organization import (
MemberBulkPayload,
MemberPayload,
SeatLayoutPayload,
SeatLayoutPlacementPayload,
SeatMapPayload,
)
__all__ = [
"MemberBulkPayload",
"MemberPayload",
"SeatLayoutPayload",
"SeatLayoutPlacementPayload",
"SeatMapPayload",
]

View File

@@ -0,0 +1,58 @@
from __future__ import annotations
from pydantic import BaseModel, Field
class MemberPayload(BaseModel):
id: int | None = None
name: str = Field(min_length=1)
employee_id: str = ""
company: str = ""
rank: str = ""
role: str = ""
department: str = ""
grp: str = ""
division: str = ""
team: str = ""
cell: str = ""
work_status: str = ""
work_time: str = ""
phone: str = ""
email: str = ""
seat_label: str = ""
photo_url: str = ""
sort_order: int | None = None
class MemberBulkPayload(BaseModel):
items: list[MemberPayload]
class SeatMapPayload(BaseModel):
name: str = Field(min_length=1)
image_url: str = ""
source_type: str = "image"
source_url: str = ""
preview_svg: str = ""
view_box_min_x: float | None = None
view_box_min_y: float | None = None
view_box_width: float | None = None
view_box_height: float | None = None
image_width: int | None = None
image_height: int | None = None
grid_rows: int = Field(default=1, ge=1, le=200)
grid_cols: int = Field(default=1, ge=1, le=200)
cell_gap: int = Field(default=0, ge=0, le=24)
is_active: bool = True
class SeatLayoutPlacementPayload(BaseModel):
member_id: int
seat_slot_id: int | None = None
row_index: int = Field(default=0, ge=0)
col_index: int = Field(default=0, ge=0)
seat_label: str = ""
class SeatLayoutPayload(BaseModel):
placements: list[SeatLayoutPlacementPayload]

View File

@@ -0,0 +1,21 @@
from .organization import (
create_history_revision,
merge_import_member,
normalize_phone,
pick_existing_member,
replace_members,
serialize_member_payload,
sync_member_versions,
sync_seat_assignment_versions,
)
__all__ = [
"create_history_revision",
"merge_import_member",
"normalize_phone",
"pick_existing_member",
"replace_members",
"serialize_member_payload",
"sync_member_versions",
"sync_seat_assignment_versions",
]

View File

@@ -0,0 +1,350 @@
from __future__ import annotations
from datetime import datetime, timezone
from typing import Callable
from ..repositories import (
fetch_current_member_state,
fetch_current_seat_assignments,
fetch_history_revision_created_at,
fetch_members,
)
from ..schemas import MemberPayload
def normalize_phone(value: object) -> str:
raw = str(value or "").strip()
digits = "".join(ch for ch in raw if ch.isdigit())
if not digits:
return ""
if len(digits) == 10 and not digits.startswith("0"):
digits = f"0{digits}"
if len(digits) == 11 and digits.startswith("0"):
return f"{digits[:3]}-{digits[3:7]}-{digits[7:]}"
if len(digits) == 10 and digits.startswith("0"):
return f"{digits[:3]}-{digits[3:6]}-{digits[6:]}"
return raw
def serialize_member_payload(item, sort_order: int) -> tuple[object, ...]:
return (
item.name.strip(),
item.employee_id.strip(),
item.company.strip(),
item.rank.strip(),
item.role.strip(),
item.department.strip(),
item.grp.strip(),
item.division.strip(),
item.team.strip(),
item.cell.strip(),
item.work_status.strip(),
item.work_time.strip(),
normalize_phone(item.phone),
item.email.strip(),
item.seat_label.strip(),
item.photo_url.strip(),
sort_order,
)
def create_history_revision(cur, label_prefix: str, note: str, *, app_timezone) -> int:
cur.execute(
"""
INSERT INTO history_revisions (scope, revision_label, note)
VALUES ('organization', %s, %s)
RETURNING id
""",
(f"{label_prefix}-{datetime.now(app_timezone).strftime('%Y%m%d-%H%M%S-%f')}", note),
)
return int(cur.fetchone()["id"])
def sync_member_versions(
cur,
member_ids: list[int],
change_reason: str,
revision_no: int,
effective_at: datetime | None = None,
) -> None:
if not member_ids:
return
event_at = effective_at or datetime.now(timezone.utc)
unique_ids = sorted(set(int(member_id) for member_id in member_ids))
current_members = fetch_current_member_state(cur)
cur.execute(
"""
SELECT id, member_id, name, company, rank, role, department, grp, division, team, cell,
work_status, work_time, phone, email, photo_url, valid_from, valid_to
FROM member_versions
WHERE member_id = ANY(%s)
AND valid_to IS NULL
""",
(unique_ids,),
)
active_versions = {int(row["member_id"]): row for row in cur.fetchall()}
for member_id in unique_ids:
current = current_members.get(member_id)
active = active_versions.get(member_id)
if current is None:
if active is not None:
cur.execute(
"UPDATE member_versions SET valid_to = %s WHERE id = %s AND valid_to IS NULL",
(event_at, int(active["id"])),
)
continue
current_tuple = (
str(current.get("name") or ""),
str(current.get("company") or ""),
str(current.get("rank") or ""),
str(current.get("role") or ""),
str(current.get("department") or ""),
str(current.get("grp") or ""),
str(current.get("division") or ""),
str(current.get("team") or ""),
str(current.get("cell") or ""),
str(current.get("work_status") or ""),
str(current.get("work_time") or ""),
str(current.get("phone") or ""),
str(current.get("email") or ""),
str(current.get("photo_url") or ""),
)
active_tuple = None
if active is not None:
active_tuple = (
str(active.get("name") or ""),
str(active.get("company") or ""),
str(active.get("rank") or ""),
str(active.get("role") or ""),
str(active.get("department") or ""),
str(active.get("grp") or ""),
str(active.get("division") or ""),
str(active.get("team") or ""),
str(active.get("cell") or ""),
str(active.get("work_status") or ""),
str(active.get("work_time") or ""),
str(active.get("phone") or ""),
str(active.get("email") or ""),
str(active.get("photo_url") or ""),
)
if active_tuple == current_tuple:
continue
if active is not None:
cur.execute(
"UPDATE member_versions SET valid_to = %s WHERE id = %s AND valid_to IS NULL",
(event_at, int(active["id"])),
)
cur.execute(
"""
INSERT INTO member_versions (
member_id, name, company, rank, role, department, grp, division, team, cell,
work_status, work_time, phone, email, photo_url,
valid_from, valid_to, revision_no, changed_by_user_id, change_reason
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NULL, %s, NULL, %s)
""",
(member_id, *current_tuple, event_at, revision_no, change_reason),
)
def sync_seat_assignment_versions(
cur,
member_ids: list[int],
change_reason: str,
revision_no: int,
effective_at: datetime | None = None,
) -> None:
if not member_ids:
return
event_at = effective_at or datetime.now(timezone.utc)
unique_ids = sorted(set(int(member_id) for member_id in member_ids))
current_assignments = fetch_current_seat_assignments(cur)
cur.execute(
"""
SELECT id, member_id, seat_map_id, seat_slot_id, seat_label
FROM seat_assignment_versions
WHERE member_id = ANY(%s)
AND valid_to IS NULL
""",
(unique_ids,),
)
active_versions = {int(row["member_id"]): row for row in cur.fetchall()}
for member_id in unique_ids:
current = current_assignments.get(member_id)
active = active_versions.get(member_id)
current_tuple = None
if current is not None:
current_tuple = (
current.get("seat_map_id"),
current.get("seat_slot_id"),
str(current.get("seat_label") or ""),
)
active_tuple = None
if active is not None:
active_tuple = (
active.get("seat_map_id"),
active.get("seat_slot_id"),
str(active.get("seat_label") or ""),
)
if active_tuple == current_tuple:
continue
if active is not None:
cur.execute(
"UPDATE seat_assignment_versions SET valid_to = %s WHERE id = %s AND valid_to IS NULL",
(event_at, int(active["id"])),
)
if current is None:
continue
cur.execute(
"""
INSERT INTO seat_assignment_versions (
member_id, seat_map_id, seat_slot_id, seat_label,
valid_from, valid_to, revision_no, changed_by_user_id, change_reason
)
VALUES (%s, %s, %s, %s, %s, NULL, %s, NULL, %s)
""",
(
member_id,
current.get("seat_map_id"),
current.get("seat_slot_id"),
str(current.get("seat_label") or ""),
event_at,
revision_no,
change_reason,
),
)
def merge_import_member(item: MemberPayload, existing: dict[str, object] | None) -> MemberPayload:
if existing is None:
return item
payload = item.model_copy(deep=True)
if not payload.photo_url.strip():
payload.photo_url = str(existing.get("photo_url") or "")
if not payload.seat_label.strip():
payload.seat_label = str(existing.get("seat_label") or "")
return payload
def pick_existing_member(
item: MemberPayload,
existing_by_employee_id: dict[str, list[dict[str, object]]],
existing_by_name: dict[str, list[dict[str, object]]],
matched_ids: set[int],
) -> dict[str, object] | None:
employee_id = item.employee_id.strip()
if employee_id:
for candidate in existing_by_employee_id.get(employee_id, []):
candidate_id = int(candidate["id"])
if candidate_id not in matched_ids:
return candidate
name = item.name.strip()
if name:
available = [
candidate
for candidate in existing_by_name.get(name, [])
if int(candidate["id"]) not in matched_ids
]
if len(available) == 1:
return available[0]
return None
def replace_members(
items: list[MemberPayload],
*,
get_conn,
sync_auth_users_from_members: Callable[[object], None],
app_timezone,
) -> list[dict[str, object]]:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, name, employee_id, company, rank, role, department, grp, division, team, cell,
work_status, work_time, phone, email, seat_label, photo_url,
sort_order, created_at, updated_at
FROM members
ORDER BY id ASC
"""
)
existing_members = cur.fetchall()
existing_by_employee_id: dict[str, list[dict[str, object]]] = {}
existing_by_name: dict[str, list[dict[str, object]]] = {}
for member in existing_members:
employee_id = str(member.get("employee_id") or "").strip()
name = str(member.get("name") or "").strip()
if employee_id:
existing_by_employee_id.setdefault(employee_id, []).append(member)
if name:
existing_by_name.setdefault(name, []).append(member)
matched_ids: set[int] = set()
for index, item in enumerate(items):
existing = pick_existing_member(item, existing_by_employee_id, existing_by_name, matched_ids)
merged_item = merge_import_member(item, existing)
if existing is None:
cur.execute(
"""
INSERT INTO members (
name, employee_id, company, rank, role, department, grp, division, team, cell,
work_status, work_time, phone, email, seat_label, photo_url, sort_order
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""",
serialize_member_payload(merged_item, index),
)
continue
matched_ids.add(int(existing["id"]))
cur.execute(
"""
UPDATE members
SET name = %s,
employee_id = %s,
company = %s,
rank = %s,
role = %s,
department = %s,
grp = %s,
division = %s,
team = %s,
cell = %s,
work_status = %s,
work_time = %s,
phone = %s,
email = %s,
seat_label = %s,
photo_url = %s,
sort_order = %s,
updated_at = NOW()
WHERE id = %s
""",
(*serialize_member_payload(merged_item, index), int(existing["id"])),
)
stale_ids = [int(member["id"]) for member in existing_members if int(member["id"]) not in matched_ids]
if stale_ids:
cur.execute("DELETE FROM members WHERE id = ANY(%s)", (stale_ids,))
sync_auth_users_from_members(cur)
cur.execute("SELECT id FROM members")
current_ids = [int(row["id"]) for row in cur.fetchall()]
affected_member_ids = sorted(set(current_ids + [int(member["id"]) for member in existing_members]))
if affected_member_ids:
revision_no = create_history_revision(
cur,
"members-bulk-sync",
"Bulk member sync applied",
app_timezone=app_timezone,
)
revision_created_at = fetch_history_revision_created_at(cur, revision_no)
sync_member_versions(cur, affected_member_ids, "members-bulk-sync", revision_no, revision_created_at)
sync_seat_assignment_versions(cur, affected_member_ids, "members-bulk-sync", revision_no, revision_created_at)
conn.commit()
return fetch_members(get_conn)