Add multi-office seat maps and dev/prod DB sync protocol

This commit is contained in:
hyunho
2026-03-27 16:34:43 +09:00
parent 1d15cf9b9b
commit d66614123e
16 changed files with 3427 additions and 146 deletions

View File

@@ -8,6 +8,7 @@ import hmac
from io import BytesIO, StringIO
import json
import math
from decimal import Decimal, ROUND_HALF_UP
from pathlib import Path
import re
import secrets
@@ -42,9 +43,24 @@ app.add_middleware(
LEGACY_STATIC_DIR = LEGACY_DIR / "static"
INCOMING_FILES_DIR = BASE_DIR / "incoming-files"
FIXED_OFFICE_SOURCE_KEY = "technical-development-center"
FIXED_OFFICE_NAME = "기술개발센터"
FIXED_OFFICE_TEMPLATE_PATH = Path(__file__).with_name("center_chair_viewer_template.html")
_fixed_office_cache: dict[str, object] | None = None
FIXED_OFFICE_CONFIGS = {
"technical-development-center": {
"name": "기술개발센터",
"html_path": INCOMING_FILES_DIR / "seat" / "center_chair_people_map.html",
"payload_path": INCOMING_FILES_DIR / "seat" / "center_chair_people_payload.js",
},
"hanmac-building-6f": {
"name": "한맥빌딩 6층",
"html_path": INCOMING_FILES_DIR / "seat" / "center_chair_people_map_6f.html",
"payload_path": INCOMING_FILES_DIR / "seat" / "center_chair_people_payload_6f.js",
},
"hanmac-building-7f": {
"name": "한맥빌딩 7층",
"html_path": INCOMING_FILES_DIR / "seat" / "center_chair_people_map_7f.html",
"payload_path": INCOMING_FILES_DIR / "seat" / "center_chair_people_payload_7f.js",
},
}
_fixed_office_cache: dict[str, dict[str, object]] = {}
AUTH_DEFAULT_PASSWORD = "1111"
AUTH_PASSWORD_ITERATIONS = 390000
AUTH_SESSION_HOURS = 12
@@ -462,8 +478,11 @@ def fetch_active_seat_map() -> dict[str, object] | None:
return cur.fetchone()
def ensure_fixed_office_seat_map(activate: bool = True) -> dict[str, object]:
template = parse_fixed_office_template()
def ensure_fixed_office_seat_map(office_key: str = FIXED_OFFICE_SOURCE_KEY, activate: bool = True) -> dict[str, object]:
config = FIXED_OFFICE_CONFIGS.get(office_key)
if not config:
raise HTTPException(status_code=404, detail="Fixed office configuration not found.")
template = parse_fixed_office_template(office_key)
slots = template["slots"]
with get_conn() as conn:
with conn.cursor() as cur:
@@ -475,7 +494,7 @@ def ensure_fixed_office_seat_map(activate: bool = True) -> dict[str, object]:
AND source_url = %s
LIMIT 1
""",
(FIXED_OFFICE_SOURCE_KEY,),
(office_key,),
)
row = cur.fetchone()
if activate:
@@ -491,7 +510,7 @@ def ensure_fixed_office_seat_map(activate: bool = True) -> dict[str, object]:
VALUES (%s, '', 'fixed_html', %s, '', NULL, NULL, NULL, NULL, NULL, NULL, 1, 1, 0, %s)
RETURNING id
""",
(FIXED_OFFICE_NAME, FIXED_OFFICE_SOURCE_KEY, activate),
(str(config["name"]), office_key, activate),
)
seat_map_id = int(cur.fetchone()["id"])
else:
@@ -511,7 +530,7 @@ def ensure_fixed_office_seat_map(activate: bool = True) -> dict[str, object]:
updated_at = NOW()
WHERE id = %s
""",
(FIXED_OFFICE_NAME, FIXED_OFFICE_SOURCE_KEY, activate, seat_map_id),
(str(config["name"]), office_key, activate, seat_map_id),
)
cur.execute("SELECT id, slot_key FROM seat_slots WHERE seat_map_id = %s", (seat_map_id,))
@@ -591,18 +610,36 @@ def decode_segment_values(raw_base64: str) -> list[int]:
return [item[0] for item in struct.iter_unpack("<i", decoded)]
def parse_fixed_office_template() -> dict[str, object]:
global _fixed_office_cache
if _fixed_office_cache is not None:
return _fixed_office_cache
if not FIXED_OFFICE_TEMPLATE_PATH.exists():
raise HTTPException(status_code=500, detail="Fixed office viewer template not found.")
def parse_fixed_office_template(office_key: str = FIXED_OFFICE_SOURCE_KEY) -> dict[str, object]:
cached = _fixed_office_cache.get(office_key)
if cached is not None:
return cached
html = FIXED_OFFICE_TEMPLATE_PATH.read_text(encoding="utf-8")
match = re.search(r"const DATA = (\{.*?\});\n\s*function decodeSegments", html, flags=re.S)
if not match:
raise HTTPException(status_code=500, detail="Fixed office viewer data not found.")
data = json.loads(match.group(1))
config = FIXED_OFFICE_CONFIGS.get(office_key)
if not config:
raise HTTPException(status_code=404, detail="Fixed office configuration not found.")
html_path = Path(str(config["html_path"]))
payload_path = Path(str(config["payload_path"]))
if not html_path.exists():
raise HTTPException(status_code=500, detail=f"Fixed office viewer template not found: {office_key}")
if not payload_path.exists():
raise HTTPException(status_code=500, detail=f"Fixed office payload not found: {office_key}")
html = html_path.read_text(encoding="utf-8")
payload_js = payload_path.read_text(encoding="utf-8")
payload_match = re.search(r"window\.CHAIR_MAP_DATA\s*=\s*(\{.*\});?\s*$", payload_js, flags=re.S)
if not payload_match:
raise HTTPException(status_code=500, detail=f"Fixed office viewer data not found: {office_key}")
html = re.sub(
r'<script\s+src="\./[^"]+payload[^"]*\.js"></script>',
f"<script>{payload_js}</script>",
html,
count=1,
)
data = json.loads(payload_match.group(1))
chair_values = decode_segment_values(str(data["chairSegsB64"]))
slots: list[dict[str, object]] = []
for index, chair in enumerate(data["chairs"]):
@@ -633,12 +670,13 @@ def parse_fixed_office_template() -> dict[str, object]:
"layer_name": str(name),
}
)
_fixed_office_cache = {
parsed = {
"html": html,
"data": data,
"slots": slots,
}
return _fixed_office_cache
_fixed_office_cache[office_key] = parsed
return parsed
def is_chair_layer(layer_name: str) -> bool:
@@ -1170,12 +1208,14 @@ def fetch_seat_layout(seat_map_id: int) -> dict[str, object]:
)
placements = cur.fetchall()
viewer_data: dict[str, object] | None = None
if seat_map["source_type"] == "fixed_html" and seat_map.get("source_url") == FIXED_OFFICE_SOURCE_KEY:
template = parse_fixed_office_template()
office_key = str(seat_map.get("source_url") or FIXED_OFFICE_SOURCE_KEY)
fixed_office = FIXED_OFFICE_CONFIGS.get(office_key)
if seat_map["source_type"] == "fixed_html" and fixed_office:
template = parse_fixed_office_template(office_key)
viewer_data = {
"meta": {
"chair_count": len(template["slots"]),
"office": FIXED_OFFICE_NAME,
"office": str(fixed_office["name"]),
}
}
elif seat_map["source_type"] == "dxf" and seat_map.get("source_url"):
@@ -1230,7 +1270,8 @@ def build_center_chair_viewer_html(layout: dict[str, object]) -> str:
placed_literal = json.dumps(sorted(set(placed_keys)), ensure_ascii=False, separators=(",", ":"))
assignments_literal = json.dumps(assignment_items, ensure_ascii=False, separators=(",", ":"))
if seat_map.get("source_type") == "fixed_html":
html = parse_fixed_office_template()["html"]
office_key = str(seat_map.get("source_url") or FIXED_OFFICE_SOURCE_KEY)
html = parse_fixed_office_template(office_key)["html"]
else:
viewer_data = layout.get("viewer_data")
if not isinstance(viewer_data, dict):
@@ -2806,8 +2847,35 @@ def fetch_project_metrics(limit: int = 500, start_date: str | None = None, end_d
with conn.cursor() as cur:
cur.execute(
"""
WITH work_by_project AS (
WITH project_base AS (
SELECT
CASE
WHEN COALESCE(project_code, '') <> '' THEN project_code
ELSE regexp_replace(
lower(COALESCE(NULLIF(project_name, ''), NULLIF(display_name, ''), NULLIF(intranet_name, ''), '')),
'[^0-9a-z가-힣]+',
'',
'g'
)
END AS project_key,
project_code,
project_name,
display_name,
business_area,
business_subarea
FROM integration_projects
),
work_by_project AS (
SELECT
CASE
WHEN COALESCE(project_code, '') <> '' THEN project_code
ELSE regexp_replace(
lower(COALESCE(NULLIF(project_name, ''), '')),
'[^0-9a-z가-힣]+',
'',
'g'
)
END AS project_key,
COALESCE(project_code, '') AS project_code,
COALESCE(NULLIF(project_name, ''), COALESCE(project_code, '')) AS project_name,
SUM(hours) AS total_hours,
@@ -2817,10 +2885,19 @@ def fetch_project_metrics(limit: int = 500, start_date: str | None = None, end_d
JOIN integration_work_logs ON integration_work_logs.id = integration_work_log_segments.work_log_id
WHERE (%s::date IS NULL OR integration_work_logs.work_date >= %s::date)
AND (%s::date IS NULL OR integration_work_logs.work_date <= %s::date)
GROUP BY 1, 2
GROUP BY 1, 2, 3
),
voucher_by_project AS (
SELECT
CASE
WHEN COALESCE(project_code, '') <> '' THEN project_code
ELSE regexp_replace(
lower(COALESCE(NULLIF(project_name, ''), '')),
'[^0-9a-z가-힣]+',
'',
'g'
)
END AS project_key,
COALESCE(project_code, '') AS project_code,
COALESCE(NULLIF(project_name, ''), COALESCE(project_code, '')) AS project_name,
SUM(income_amount) AS total_income,
@@ -2829,7 +2906,8 @@ def fetch_project_metrics(limit: int = 500, start_date: str | None = None, end_d
FROM integration_vouchers
WHERE (%s::date IS NULL OR COALESCE(issue_date, claim_date) >= %s::date)
AND (%s::date IS NULL OR COALESCE(issue_date, claim_date) <= %s::date)
GROUP BY 1, 2
AND COALESCE(voucher_type, '') <> '제외'
GROUP BY 1, 2, 3
)
SELECT
COALESCE(p.project_code, w.project_code, v.project_code) AS project_code,
@@ -2844,9 +2922,9 @@ def fetch_project_metrics(limit: int = 500, start_date: str | None = None, end_d
COALESCE(w.overtime_hours, 0) AS overtime_hours,
COALESCE(v.voucher_count, 0) AS voucher_count,
COALESCE(w.work_log_count, 0) AS work_log_count
FROM integration_projects p
FULL OUTER JOIN work_by_project w ON w.project_code = p.project_code
FULL OUTER JOIN voucher_by_project v ON v.project_code = COALESCE(p.project_code, w.project_code)
FROM project_base p
FULL OUTER JOIN work_by_project w ON w.project_key = p.project_key
FULL OUTER JOIN voucher_by_project v ON v.project_key = COALESCE(p.project_key, w.project_key)
ORDER BY project_code ASC
LIMIT %s
""",
@@ -2986,6 +3064,19 @@ def payment_analysis_parse_number(value: object) -> float:
return 0.0
def round_half_up_to_int(value: float | Decimal) -> int:
return int(Decimal(str(value)).quantize(Decimal("1"), rounding=ROUND_HALF_UP))
def round_half_up_to_2(value: float | Decimal) -> float:
return float(Decimal(str(value)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP))
def calculate_labor_cost(hours: float | Decimal, rate: int | float | Decimal, multiplier: float | Decimal) -> int:
amount = Decimal(str(hours)) * Decimal(str(rate)) * Decimal(str(multiplier))
return round_half_up_to_int(amount)
def build_payment_work_rows_from_raw_mh(
raw_rows: list[list[object]],
category_by_project_key: dict[str, dict[str, str]],
@@ -3057,6 +3148,7 @@ def build_payment_work_rows_from_raw_mh(
position = clean_text(payment_analysis_get_value(row, ["직책", "직급"]))
user_state = clean_text(payment_analysis_get_value(row, ["user_state", "User State", "user state", "userstate", "User_State"]))
weekend_flag = clean_text(payment_analysis_get_value(row, ["주말/지각"]))
is_weekend = "주말" in user_state or "주말" in weekend_flag
member_name = clean_text(payment_analysis_get_value(row, ["이름"]))
work_date = clean_text(payment_analysis_get_value(row, ["근무일자", "날짜", "일자"]))
imported_labor = payment_analysis_parse_number(payment_analysis_get_value(row, ["산정금액", "인건비"]))
@@ -3073,7 +3165,7 @@ def build_payment_work_rows_from_raw_mh(
weighted = []
total_weight = 0.0
for idx, segment in enumerate(segments):
multiplier = 1.5 if ("주말" in user_state or segment["overtime"]) else 1.0
multiplier = 1.5 if (is_weekend or segment["overtime"]) else 1.0
weight = segment["hours"] * multiplier
weighted.append((idx, weight))
total_weight += weight
@@ -3108,8 +3200,8 @@ def build_payment_work_rows_from_raw_mh(
if allocations:
labor = int(allocations[idx] or 0)
else:
multiplier = 1.5 if ("주말" in user_state or segment["overtime"]) else 1.0
labor = round(hours * rate * multiplier)
multiplier = 1.5 if (is_weekend or segment["overtime"]) else 1.0
labor = calculate_labor_cost(hours, rate, multiplier)
parsed_row = {
"__values": [
work_date,
@@ -3245,8 +3337,8 @@ def fetch_payment_source_rows() -> dict[str, object]:
position = clean_text(row["title"])
raw_hours = float(row["hours"] or 0)
adjusted_overtime_hours = float(row["overtime_hours_adjusted"] or 0)
hours = adjusted_overtime_hours if bool(row["is_overtime"]) and adjusted_overtime_hours > 0 else raw_hours
hours = round(hours, 2)
hours = adjusted_overtime_hours if bool(row["is_overtime"]) else raw_hours
hours = round_half_up_to_2(hours)
rate = 28900
if "이사" in position or "수석" in position:
rate = 46600
@@ -3254,7 +3346,11 @@ def fetch_payment_source_rows() -> dict[str, object]:
rate = 40500
elif "선임" in position:
rate = 35300
labor = round(hours * rate * (1.5 if bool(row["is_overtime"]) or "주말" in clean_text(row["weekend_late_flag"]) else 1))
labor = calculate_labor_cost(
hours,
rate,
1.5 if bool(row["is_overtime"]) or "주말" in clean_text(row["weekend_late_flag"]) else 1,
)
parsed_row = {
"__values": [
clean_text(row["work_date"]),
@@ -3785,8 +3881,9 @@ async def create_dxf_seat_map(file: UploadFile = File(...), name: str = Form(...
@app.get("/api/seat-maps/active")
def get_active_seat_map() -> dict[str, dict[str, object]]:
seat_map = ensure_fixed_office_seat_map(activate=True)
def get_active_seat_map(office_key: str | None = None) -> dict[str, dict[str, object]]:
requested_key = (office_key or "").strip() or FIXED_OFFICE_SOURCE_KEY
seat_map = ensure_fixed_office_seat_map(requested_key, activate=requested_key == FIXED_OFFICE_SOURCE_KEY)
if seat_map is None:
raise HTTPException(status_code=404, detail="Active seat map not found.")
return {"item": seat_map}