from __future__ import annotations

from datetime import datetime, timezone
from typing import Callable

from ..repositories import (
    fetch_current_member_state,
    fetch_current_seat_assignments,
    fetch_history_revision_created_at,
    fetch_members,
)
from ..schemas import MemberPayload

# Member attributes snapshotted into ``member_versions``.  The order matches
# the column list of the INSERT statement in ``sync_member_versions``, so the
# snapshot tuple can be splatted straight into the query parameters.
_MEMBER_VERSION_FIELDS = (
    "name",
    "company",
    "rank",
    "role",
    "department",
    "grp",
    "division",
    "team",
    "cell",
    "work_status",
    "work_time",
    "phone",
    "email",
    "photo_url",
)

# Closing a version only stamps ``valid_to``; the ``valid_to IS NULL`` guard
# keeps an already-closed row untouched.
_CLOSE_MEMBER_VERSION_SQL = (
    "UPDATE member_versions SET valid_to = %s WHERE id = %s AND valid_to IS NULL"
)
_CLOSE_SEAT_ASSIGNMENT_VERSION_SQL = (
    "UPDATE seat_assignment_versions SET valid_to = %s WHERE id = %s AND valid_to IS NULL"
)


def normalize_phone(value: object) -> str:
    """Return *value* formatted as a dash-separated phone number.

    Only the digits of the input are considered:

    * no digits at all            -> ``""``
    * 10 digits, no leading ``0`` -> a ``0`` is prepended, then formatted as
      11 digits
    * 11 digits, leading ``0``    -> ``XXX-XXXX-XXXX``
    * 10 digits, leading ``0``    -> ``XXX-XXX-XXXX``
    * anything else               -> the stripped input, unchanged
    """
    raw = str(value or "").strip()
    digits = "".join(ch for ch in raw if ch.isdigit())
    if not digits:
        return ""
    if len(digits) == 10 and not digits.startswith("0"):
        digits = f"0{digits}"
    if len(digits) == 11 and digits.startswith("0"):
        return f"{digits[:3]}-{digits[3:7]}-{digits[7:]}"
    if len(digits) == 10 and digits.startswith("0"):
        return f"{digits[:3]}-{digits[3:6]}-{digits[6:]}"
    return raw


def serialize_member_payload(item: MemberPayload, sort_order: int) -> tuple[object, ...]:
    """Flatten *item* into the parameter tuple used by the ``members`` queries.

    Every text attribute is stripped and the phone number is normalized.  The
    element order matches the column order of the INSERT/UPDATE statements in
    ``replace_members``; *sort_order* is always the last element.
    """
    return (
        item.name.strip(),
        item.employee_id.strip(),
        item.company.strip(),
        item.rank.strip(),
        item.role.strip(),
        item.department.strip(),
        item.grp.strip(),
        item.division.strip(),
        item.team.strip(),
        item.cell.strip(),
        item.work_status.strip(),
        item.work_time.strip(),
        normalize_phone(item.phone),
        item.email.strip(),
        item.seat_label.strip(),
        item.photo_url.strip(),
        sort_order,
    )


def create_history_revision(cur, label_prefix: str, note: str, *, app_timezone) -> int:
    """Insert an ``organization``-scoped history revision and return its id.

    The revision label is *label_prefix* followed by the current time in
    *app_timezone*, formatted down to microseconds.
    """
    timestamp = datetime.now(app_timezone).strftime("%Y%m%d-%H%M%S-%f")
    cur.execute(
        """
        INSERT INTO history_revisions (scope, revision_label, note)
        VALUES ('organization', %s, %s)
        RETURNING id
        """,
        (f"{label_prefix}-{timestamp}", note),
    )
    # Rows are accessed by column name, so the cursor must yield mapping rows.
    return int(cur.fetchone()["id"])


def _member_snapshot(row) -> tuple[str, ...]:
    """Return the versioned attributes of *row* as strings (missing/None -> "")."""
    return tuple(str(row.get(field) or "") for field in _MEMBER_VERSION_FIELDS)


def _seat_snapshot(row) -> tuple[object, object, str] | None:
    """Return ``(seat_map_id, seat_slot_id, seat_label)`` for *row*, or None."""
    if row is None:
        return None
    return (
        row.get("seat_map_id"),
        row.get("seat_slot_id"),
        str(row.get("seat_label") or ""),
    )


def sync_member_versions(
    cur,
    member_ids: list[int],
    change_reason: str,
    revision_no: int,
    effective_at: datetime | None = None,
) -> None:
    """Bring ``member_versions`` in line with the current member state.

    For each id in *member_ids* the open version (``valid_to IS NULL``) is
    compared with the state returned by ``fetch_current_member_state``:

    * identical (or both absent) -> nothing happens
    * member no longer present   -> the open version is closed
    * attributes differ / no open version -> the open version (if any) is
      closed and a new one is inserted

    Closed and newly opened versions share the same timestamp:
    *effective_at*, or the current UTC time when it is not given.
    """
    if not member_ids:
        return

    event_at = effective_at or datetime.now(timezone.utc)
    unique_ids = sorted({int(member_id) for member_id in member_ids})
    current_members = fetch_current_member_state(cur)

    cur.execute(
        """
        SELECT id, member_id, name, company, rank, role, department, grp, division, team, cell,
               work_status, work_time, phone, email, photo_url, valid_from, valid_to
        FROM member_versions
        WHERE member_id = ANY(%s) AND valid_to IS NULL
        """,
        (unique_ids,),
    )
    active_versions = {int(row["member_id"]): row for row in cur.fetchall()}

    for member_id in unique_ids:
        current = current_members.get(member_id)
        active = active_versions.get(member_id)

        current_tuple = None if current is None else _member_snapshot(current)
        active_tuple = None if active is None else _member_snapshot(active)
        if active_tuple == current_tuple:
            continue

        if active is not None:
            cur.execute(_CLOSE_MEMBER_VERSION_SQL, (event_at, int(active["id"])))
        if current is None:
            # The member is gone: closing the open version is all there is to do.
            continue

        cur.execute(
            """
            INSERT INTO member_versions (
                member_id, name, company, rank, role, department, grp, division, team, cell,
                work_status, work_time, phone, email, photo_url,
                valid_from, valid_to, revision_no, changed_by_user_id, change_reason
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NULL, %s, NULL, %s)
            """,
            (member_id, *current_tuple, event_at, revision_no, change_reason),
        )


def sync_seat_assignment_versions(
    cur,
    member_ids: list[int],
    change_reason: str,
    revision_no: int,
    effective_at: datetime | None = None,
) -> None:
    """Bring ``seat_assignment_versions`` in line with current seat assignments.

    Works like ``sync_member_versions`` but compares the
    ``(seat_map_id, seat_slot_id, seat_label)`` triple taken from
    ``fetch_current_seat_assignments``.  A member without a current assignment
    only has its open version closed; no new version is inserted.
    """
    if not member_ids:
        return

    event_at = effective_at or datetime.now(timezone.utc)
    unique_ids = sorted({int(member_id) for member_id in member_ids})
    current_assignments = fetch_current_seat_assignments(cur)

    cur.execute(
        """
        SELECT id, member_id, seat_map_id, seat_slot_id, seat_label
        FROM seat_assignment_versions
        WHERE member_id = ANY(%s) AND valid_to IS NULL
        """,
        (unique_ids,),
    )
    active_versions = {int(row["member_id"]): row for row in cur.fetchall()}

    for member_id in unique_ids:
        active = active_versions.get(member_id)
        current_tuple = _seat_snapshot(current_assignments.get(member_id))
        active_tuple = _seat_snapshot(active)
        if active_tuple == current_tuple:
            continue

        if active is not None:
            cur.execute(_CLOSE_SEAT_ASSIGNMENT_VERSION_SQL, (event_at, int(active["id"])))
        if current_tuple is None:
            continue

        cur.execute(
            """
            INSERT INTO seat_assignment_versions (
                member_id, seat_map_id, seat_slot_id, seat_label,
                valid_from, valid_to, revision_no, changed_by_user_id, change_reason
            )
            VALUES (%s, %s, %s, %s, %s, NULL, %s, NULL, %s)
            """,
            (member_id, *current_tuple, event_at, revision_no, change_reason),
        )


def merge_import_member(item: MemberPayload, existing: dict[str, object] | None) -> MemberPayload:
    """Return *item* with blank ``photo_url``/``seat_label`` taken from *existing*.

    Without an existing record *item* itself is returned.  Otherwise a deep
    copy is returned, so the caller's payload is never mutated.
    """
    if existing is None:
        return item

    payload = item.model_copy(deep=True)
    if not payload.photo_url.strip():
        payload.photo_url = str(existing.get("photo_url") or "")
    if not payload.seat_label.strip():
        payload.seat_label = str(existing.get("seat_label") or "")
    return payload


def pick_existing_member(
    item: MemberPayload,
    existing_by_employee_id: dict[str, list[dict[str, object]]],
    existing_by_name: dict[str, list[dict[str, object]]],
    matched_ids: set[int],
) -> dict[str, object] | None:
    """Find the stored member that *item* should update, if any.

    Candidates whose id is already in *matched_ids* are skipped.  A non-blank
    employee id wins: the first unmatched record with that id is returned.
    Otherwise the name is used, but only when exactly one unmatched record
    carries it.  *matched_ids* is read, never modified.
    """
    employee_id = item.employee_id.strip()
    if employee_id:
        for candidate in existing_by_employee_id.get(employee_id, []):
            if int(candidate["id"]) not in matched_ids:
                return candidate

    name = item.name.strip()
    if name:
        available = [
            candidate
            for candidate in existing_by_name.get(name, [])
            if int(candidate["id"]) not in matched_ids
        ]
        # An ambiguous name (several unmatched candidates) is not a match.
        if len(available) == 1:
            return available[0]

    return None


def replace_members(
    items: list[MemberPayload],
    *,
    get_conn,
    sync_auth_users_from_members: Callable[[object], None],
    app_timezone,
) -> list[dict[str, object]]:
    """Replace the contents of ``members`` with *items* and record the history.

    Each item is matched against the stored members via
    ``pick_existing_member``: matches are updated in place, unmatched items
    are inserted, and stored members that nothing matched are deleted.  The
    position of an item in *items* becomes its ``sort_order``.  Afterwards
    ``sync_auth_users_from_members`` is called with the cursor, a history
    revision is created, and member and seat-assignment versions are synced
    for every member id that existed before or exists now.

    Returns the result of ``fetch_members(get_conn)``.
    """
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT id, name, employee_id, company, rank, role, department, grp, division, team, cell,
                       work_status, work_time, phone, email, seat_label, photo_url, sort_order,
                       created_at, updated_at
                FROM members
                ORDER BY id ASC
                """
            )
            existing_members = cur.fetchall()
            existing_ids = [int(member["id"]) for member in existing_members]

            existing_by_employee_id: dict[str, list[dict[str, object]]] = {}
            existing_by_name: dict[str, list[dict[str, object]]] = {}
            for member in existing_members:
                employee_id = str(member.get("employee_id") or "").strip()
                name = str(member.get("name") or "").strip()
                if employee_id:
                    existing_by_employee_id.setdefault(employee_id, []).append(member)
                if name:
                    existing_by_name.setdefault(name, []).append(member)

            matched_ids: set[int] = set()
            for index, item in enumerate(items):
                existing = pick_existing_member(
                    item, existing_by_employee_id, existing_by_name, matched_ids
                )
                merged_item = merge_import_member(item, existing)

                if existing is None:
                    cur.execute(
                        """
                        INSERT INTO members (
                            name, employee_id, company, rank, role, department, grp, division, team, cell,
                            work_status, work_time, phone, email, seat_label, photo_url, sort_order
                        )
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                        """,
                        serialize_member_payload(merged_item, index),
                    )
                    continue

                existing_id = int(existing["id"])
                # Mark the record as taken so a later item cannot match it again.
                matched_ids.add(existing_id)
                cur.execute(
                    """
                    UPDATE members
                    SET name = %s, employee_id = %s, company = %s, rank = %s, role = %s,
                        department = %s, grp = %s, division = %s, team = %s, cell = %s,
                        work_status = %s, work_time = %s, phone = %s, email = %s,
                        seat_label = %s, photo_url = %s, sort_order = %s, updated_at = NOW()
                    WHERE id = %s
                    """,
                    (*serialize_member_payload(merged_item, index), existing_id),
                )

            stale_ids = [member_id for member_id in existing_ids if member_id not in matched_ids]
            if stale_ids:
                cur.execute("DELETE FROM members WHERE id = ANY(%s)", (stale_ids,))

            sync_auth_users_from_members(cur)

            cur.execute("SELECT id FROM members")
            current_ids = [int(row["id"]) for row in cur.fetchall()]

            # Deleted members are included so their open versions get closed.
            affected_member_ids = sorted({*current_ids, *existing_ids})
            if affected_member_ids:
                revision_no = create_history_revision(
                    cur,
                    "members-bulk-sync",
                    "Bulk member sync applied",
                    app_timezone=app_timezone,
                )
                revision_created_at = fetch_history_revision_created_at(cur, revision_no)
                sync_member_versions(
                    cur, affected_member_ids, "members-bulk-sync", revision_no, revision_created_at
                )
                sync_seat_assignment_versions(
                    cur, affected_member_ids, "members-bulk-sync", revision_no, revision_created_at
                )

        # NOTE(review): the exact nesting of commit/return was reconstructed
        # from a whitespace-collapsed source; confirm against version control.
        conn.commit()

    return fetch_members(get_conn)