#!/usr/bin/env python3 import json import os import sqlite3 import socket import traceback import re import csv import requests from datetime import datetime, timedelta, time from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from urllib.parse import parse_qs, urlparse BASE_DIR = os.path.dirname(os.path.abspath(__file__)) DB_PATH = os.path.join(BASE_DIR, 'matching.db') INDEX_HTML = os.path.join(BASE_DIR, 'index.html') DETAIL_HTML = os.path.join(BASE_DIR, 'detail-view.html') DETAIL_PROJECT_HTML = os.path.join(BASE_DIR, 'detail-view-project.html') MYSQL_PREVIEW_HTML = os.path.join(BASE_DIR, 'mysql-preview.html') SALARY_AVG_CSV_PATH = os.path.join(os.path.dirname(BASE_DIR), 'erp_salary_avg_by_title_year_2026-05-27.csv') # MySQL direct source (user requested) MYSQL_HOST = os.environ.get('MYSQL_HOST', '172.16.42.111') MYSQL_USER = os.environ.get('MYSQL_USER', 'root') MYSQL_PASSWORD = os.environ.get('MYSQL_PASSWORD', 'hanmacerp!') MYSQL_DB = os.environ.get('MYSQL_DB', 'jangheon_manhour') MYSQL_PORT = int(os.environ.get('MYSQL_PORT', '3306')) ERP_BASE_URL = os.environ.get('ERP_BASE_URL', 'http://erp.jangheon.co.kr/projt_mng') ERP_LOGIN_ID = os.environ.get('ERP_LOGIN_ID', 'g25001') ERP_LOGIN_PW = os.environ.get('ERP_LOGIN_PW', '00000') INTRANET_BASE_URL = os.environ.get('INTRANET_BASE_URL', 'http://erp.jangheon.co.kr/intranet/') INTRANET_LOGIN_ID = os.environ.get('INTRANET_LOGIN_ID', 'G25001') INTRANET_LOGIN_PW = os.environ.get('INTRANET_LOGIN_PW', '00000') def init_db(conn): conn.executescript( ''' CREATE TABLE IF NOT EXISTS member ( MemberNo TEXT PRIMARY KEY, korName TEXT, rankCode TEXT DEFAULT '', rankName TEXT DEFAULT '', groupCode TEXT DEFAULT '', teamName TEXT DEFAULT '', retireFlag TEXT DEFAULT '', isRetired INTEGER DEFAULT 0 ); CREATE TABLE IF NOT EXISTS dallyproject ( id INTEGER PRIMARY KEY AUTOINCREMENT, MemberNo TEXT, EntryTime TEXT, EntryPCode TEXT, EntryJobCode TEXT, EntryJob TEXT, LeaveTime TEXT, LeavePCode TEXT, LeaveJobCode TEXT, LeaveJob TEXT, OverTime TEXT, ConnectIP TEXT, OverWorkIP TEXT, EndWorkIP TEXT, SortKey TEXT, Note TEXT, modify TEXT, ConfirmDate TEXT, WorkDate TEXT, TotalHours REAL DEFAULT 0, RegularHours REAL DEFAULT 0, OvertimeHours REAL DEFAULT 0, BusinessTripHours REAL DEFAULT 0 ); CREATE TABLE IF NOT EXISTS worker_tardy ( MemberNo TEXT, s_date TEXT, e_date TEXT, tardy_h INTEGER, tardy_m INTEGER ); CREATE TABLE IF NOT EXISTS project_alias ( projectCode TEXT PRIMARY KEY, shortName TEXT ); CREATE INDEX IF NOT EXISTS idx_dally_memberno ON dallyproject(MemberNo); CREATE INDEX IF NOT EXISTS idx_tardy_member ON worker_tardy(MemberNo); ''' ) # Backward-compatible migration for existing DB files. cols = {row[1] for row in conn.execute("PRAGMA table_info(dallyproject)").fetchall()} mcols = {row[1] for row in conn.execute("PRAGMA table_info(member)").fetchall()} if 'rankCode' not in mcols: conn.execute("ALTER TABLE member ADD COLUMN rankCode TEXT DEFAULT ''") if 'rankName' not in mcols: conn.execute("ALTER TABLE member ADD COLUMN rankName TEXT DEFAULT ''") if 'groupCode' not in mcols: conn.execute("ALTER TABLE member ADD COLUMN groupCode TEXT DEFAULT ''") if 'teamName' not in mcols: conn.execute("ALTER TABLE member ADD COLUMN teamName TEXT DEFAULT ''") if 'retireFlag' not in mcols: conn.execute("ALTER TABLE member ADD COLUMN retireFlag TEXT DEFAULT ''") if 'isRetired' not in mcols: conn.execute("ALTER TABLE member ADD COLUMN isRetired INTEGER DEFAULT 0") if 'WorkDate' not in cols: conn.execute("ALTER TABLE dallyproject ADD COLUMN WorkDate TEXT") if 'TotalHours' not in cols: conn.execute("ALTER TABLE dallyproject ADD COLUMN TotalHours REAL DEFAULT 0") if 'RegularHours' not in cols: conn.execute("ALTER TABLE dallyproject ADD COLUMN RegularHours REAL DEFAULT 0") if 'OvertimeHours' not in cols: conn.execute("ALTER TABLE dallyproject ADD COLUMN OvertimeHours REAL DEFAULT 0") if 'BusinessTripHours' not in cols: conn.execute("ALTER TABLE dallyproject ADD COLUMN BusinessTripHours REAL DEFAULT 0") conn.execute("CREATE INDEX IF NOT EXISTS idx_dally_workdate ON dallyproject(WorkDate)") conn.execute("CREATE INDEX IF NOT EXISTS idx_project_alias_code ON project_alias(projectCode)") conn.commit() def normalize_member_no(v): return _as_text(v).strip().upper() def _norm_key(k): return re.sub(r'[^a-z0-9]+', '', str(k or '').lower()) def _as_text(v): if v is None: return '' if isinstance(v, bytes): for enc in ('utf-8', 'cp949', 'euc-kr', 'latin1'): try: return v.decode(enc, errors='ignore') except Exception: continue return '' return str(v) def _find_col_key(cols, aliases): if not cols: return '' alias_set = {_norm_key(a) for a in aliases} for k in cols.keys(): if _norm_key(k) in alias_set: return k return '' def _norm_code_token(v): s = _as_text(v).strip() if not s: return '' if re.fullmatch(r'\d+', s): return str(int(s)) return s.upper() def insert_member_rows(conn, rows): conn.execute('DELETE FROM member') conn.executemany( 'INSERT OR REPLACE INTO member (MemberNo, korName, rankCode, rankName, groupCode, teamName, retireFlag, isRetired) VALUES (?, ?, ?, ?, ?, ?, ?, ?)', rows, ) def parse_retired_flag(raw_value, col_key=''): s = _as_text(raw_value).strip().lower() if not s: return 0 col_key = _norm_key(col_key) # LeaveDate 계열: 유효 날짜가 오늘 이전/같으면 퇴사로 간주 if col_key in ('leavedate', 'retiredate', 'quitdate', 'resigndate'): if s in ('0000-00-00', '0000-00-00 00:00:00'): return 0 m = re.match(r'^(\d{4})-(\d{2})-(\d{2})', s) if m: try: d = datetime(int(m.group(1)), int(m.group(2)), int(m.group(3))).date() return 1 if d <= datetime.now().date() else 0 except Exception: return 0 return 0 # useYn/workYn 계열은 N/0 이 퇴사 의미인 경우가 많음 if col_key in ('useyn', 'workyn', 'isuse', 'iswork'): if s in ('n', '0', 'false', 'inactive'): return 1 return 0 if s in ('재직', '근무', '정상', 'active'): return 0 retired_tokens = ('y', '1', 'true', 'retire', 'resign', '퇴사', '퇴직', 'inactive', 'leave', 'quit') return 1 if any(tok in s for tok in retired_tokens) else 0 def update_member_ranks(conn, rank_map): if not rank_map: return 0 rows = [(rank_map[k], k) for k in rank_map.keys()] conn.executemany('UPDATE member SET rankName = ? WHERE MemberNo = ?', rows) conn.commit() return len(rows) def update_member_ranks_by_rankcode(conn, code_to_name): if not code_to_name: return 0 cur = conn.execute('SELECT MemberNo, IFNULL(rankCode, "") FROM member') updates = [] for mno, rc in cur.fetchall(): key = _as_text(rc).strip() if not key: continue rn = _as_text(code_to_name.get(key) or code_to_name.get(_norm_code_token(key))).strip() if rn: updates.append((rn, mno)) if updates: conn.executemany('UPDATE member SET rankName = ? WHERE MemberNo = ?', updates) conn.commit() return len(updates) def insert_daily_rows(conn, rows): conn.executemany( ''' INSERT INTO dallyproject ( MemberNo, EntryTime, EntryPCode, EntryJobCode, EntryJob, LeaveTime, LeavePCode, LeaveJobCode, LeaveJob, OverTime, ConnectIP, OverWorkIP, EndWorkIP, SortKey, Note, modify, ConfirmDate, WorkDate, TotalHours, RegularHours, OvertimeHours, BusinessTripHours ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', rows, ) def parse_dt(value): s = _as_text(value).strip() if not s: return None fmts = ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M') for f in fmts: try: return datetime.strptime(s, f) except Exception: pass return None def parse_d(value): s = _as_text(value).strip() if not s: return None for f in ('%Y-%m-%d', '%Y/%m/%d'): try: return datetime.strptime(s, f).date() except Exception: pass return None def insert_worker_tardy(conn, rows): conn.execute('DELETE FROM worker_tardy') if rows: conn.executemany( 'INSERT INTO worker_tardy (MemberNo, s_date, e_date, tardy_h, tardy_m) VALUES (?, ?, ?, ?, ?)', rows ) def build_tardy_map(conn): cur = conn.execute('SELECT MemberNo, s_date, e_date, tardy_h, tardy_m FROM worker_tardy') out = {} for mno, sd, ed, th, tm in cur.fetchall(): sdd = parse_d(sd) edd = parse_d(ed) if not sdd or not edd: continue out.setdefault(mno, []).append((sdd, edd, int(th or 9), int(tm or 0))) for k in out: out[k].sort(key=lambda x: (x[0], x[1])) return out def tardy_start_for_day(tardy_map, member_no, day): rules = tardy_map.get(member_no, []) for sd, ed, th, tm in rules: if sd <= day <= ed: return th, tm return 9, 0 def overlap_hours(a_start, a_end, b_start, b_end): s = max(a_start, b_start) e = min(a_end, b_end) if e <= s: return 0.0 return (e - s).total_seconds() / 3600.0 def _is_business_trip_text(*texts): for t in texts: s = _as_text(t).strip() if '출장' in s: return True return False def compute_hours( entry_time, leave_time, overtime_time, member_no, tardy_map, entry_job='', leave_job='', entry_job_code='', leave_job_code='' ): ent = parse_dt(entry_time) lea = parse_dt(leave_time) is_trip = _is_business_trip_text(entry_job, leave_job, entry_job_code, leave_job_code) if not ent: return '', 0.0, 0.0, 0.0, 0.0 # 출장: LeaveTime 없어도 8시간 인정(별도 컬럼 분리) if not lea and is_trip: day = ent.date() return day.isoformat(), 8.0, 0.0, 0.0, 8.0 if not lea: return '', 0.0, 0.0, 0.0, 0.0 if lea < ent: lea = lea + timedelta(days=1) base_hours = max(0.0, (lea - ent).total_seconds() / 3600.0) ot_raw = (overtime_time or '').strip() extra_hours = 0.0 day = ent.date() th, tm = tardy_start_for_day(tardy_map, member_no, day) regular_start = datetime.combine(day, time(th, tm)) regular_end = regular_start + timedelta(hours=8) # If OverTime is empty/zero datetime, treat as no overtime. if ot_raw and not ot_raw.startswith('0000-00-00'): ot_dt = parse_dt(ot_raw) if ot_dt: # Case 1: explicit overtime end time if ot_dt > lea: extra_hours = max(0.0, (ot_dt - lea).total_seconds() / 3600.0) # Case 2: legacy/dirty rows where OverTime is equal to or earlier than leave # In this case infer overtime from actual leave against regular end. elif ot_dt <= lea: extra_hours = max(0.0, (lea - regular_end).total_seconds() / 3600.0) # Regular work policy: # - Weekday only # - Deduct lunch overlap (12:00~13:00) # - Convert to whole hours only (no 0.5 etc., floor) # - Cap at 8 hours/day lunch_start = datetime.combine(day, time(12, 0)) lunch_end = datetime.combine(day, time(13, 0)) lunch_overlap = overlap_hours(ent, lea, lunch_start, lunch_end) regular_raw = max(0.0, base_hours - lunch_overlap) is_weekend = day.weekday() >= 5 # 5: Sat, 6: Sun regular = 0.0 if is_weekend else float(min(8, int(regular_raw))) # Overtime is counted only from OverTime field difference (OverTime - LeaveTime). # Overtime policy by date (approved-unit mode): # - Weekday: if >= 2h, 인정 3h (otherwise 0h) # - Weekend/holiday: use worked hours, if >= 3h then cap at 5h (otherwise 0h) if is_weekend: weekend_hours = max(0.0, regular_raw) overtime = 0.0 if weekend_hours < 3.0 else min(float(int(weekend_hours)), 5.0) else: overtime = 3.0 if extra_hours >= 2.0 else 0.0 business_trip = 0.0 # 출장 행은 근무시간(regular)을 출장시간으로 분리 표기 # 총시간(total)은 유지되도록 regular -> business_trip 이동. if is_trip: business_trip = regular regular = 0.0 total = regular + overtime + business_trip return day.isoformat(), round(total, 2), round(regular, 2), round(overtime, 2), round(business_trip, 2) def backfill_daily_hours_if_needed(conn): # Recalculate all workable rows to keep rules consistent across code updates. need = conn.execute( ''' SELECT COUNT(*) FROM dallyproject WHERE IFNULL(EntryTime, '') <> '' AND IFNULL(LeaveTime, '') <> '' ''' ).fetchone()[0] if need == 0: return 0 tardy_map = build_tardy_map(conn) cur = conn.execute('SELECT id, MemberNo, EntryTime, LeaveTime, OverTime FROM dallyproject') updates = [] cur = conn.execute('SELECT id, MemberNo, EntryTime, LeaveTime, OverTime, EntryJob, LeaveJob, EntryJobCode, LeaveJobCode FROM dallyproject') updates = [] for rid, mno, ent, lea, ot, entry_job, leave_job, entry_job_code, leave_job_code in cur.fetchall(): work_date, total_h, regular_h, overtime_h, business_trip_h = compute_hours( ent, lea, ot, mno or '', tardy_map, entry_job or '', leave_job or '', entry_job_code or '', leave_job_code or '' ) updates.append((work_date, total_h, regular_h, overtime_h, business_trip_h, rid)) if len(updates) >= 5000: conn.executemany( 'UPDATE dallyproject SET WorkDate=?, TotalHours=?, RegularHours=?, OvertimeHours=?, BusinessTripHours=? WHERE id=?', updates ) updates = [] if updates: conn.executemany( 'UPDATE dallyproject SET WorkDate=?, TotalHours=?, RegularHours=?, OvertimeHours=?, BusinessTripHours=? WHERE id=?', updates ) conn.commit() return need def load_project_alias_from_erp(): login_url = ERP_BASE_URL.rstrip('/') + '/sys/controller/common/main_controller.php' s = requests.Session() login_payload = { 'ActionMode': 'LoginCheck', 'memberID': ERP_LOGIN_ID, 'LoginID': ERP_LOGIN_ID, 'passwd': ERP_LOGIN_PW, 'CheckSave': '', 'login': '1', } r = s.post(login_url, data=login_payload, timeout=20) if r.text.strip().lower() != 'success': raise RuntimeError(f'ERP login failed: {r.text[:120]}') pages = ['sales', 'design', 'const', 'make', 'research'] out = {} for page in pages: payload = { 'ActionMode': 'getMain', 'SubAction': 'getProjectList', 'page': page, 'searchText': '', } rr = s.post(login_url + '?', data=payload, timeout=20) body = rr.text.strip() if not body: continue data = json.loads(body) for item in data.get('list_data', []) or []: code = _as_text(item.get('ProjectCode')).strip().upper() nick = _as_text(item.get('ProjectNickname')).strip() if code and nick: out[code] = nick return sorted(out.items()) def _strip_tags(s): s = re.sub(r'<[^>]+>', '', s or '') s = s.replace(' ', ' ').replace(' ', ' ') return s.strip() def load_member_ranks_from_intranet(): login_url = INTRANET_BASE_URL.rstrip('/') + '/sys/popup/login_ok.php' search_url = INTRANET_BASE_URL.rstrip('/') + '/sys/controller/codeSearch_controller.php?ActionMode=searchMember' s = requests.Session() # session init s.get(INTRANET_BASE_URL, timeout=20) payload = { 'user_id': INTRANET_LOGIN_ID, 'user_pw': INTRANET_LOGIN_PW, 'user_ip': '', 'user_contact_area': 'OUT', 'checksaveid': '', 'is_ajax': 1, } r = s.post(login_url, data=payload, timeout=20) if str(r.text).strip() != '1': raise RuntimeError(f'Intranet login failed: {r.text[:120]}') rr = s.post( search_url, data={'ajax_insertStr': '', 'ajax_returnId01': 'a', 'ajax_returnId02': 'b'}, timeout=20 ) html = rr.content.decode('utf-8', errors='ignore') rank_map = {} for tr in re.findall(r']*>(.*?)', html, re.I | re.S): tds = re.findall(r']*>(.*?)', tr, re.I | re.S) if len(tds) < 5: continue member_no = normalize_member_no(_strip_tags(tds[3])) rank_name = _strip_tags(tds[4]) if member_no and rank_name: rank_map[member_no] = rank_name return rank_map def replace_project_alias(conn, rows): conn.execute('DELETE FROM project_alias') if rows: conn.executemany( 'INSERT OR REPLACE INTO project_alias (projectCode, shortName) VALUES (?, ?)', rows ) conn.commit() def get_project_alias_map(conn): cur = conn.execute('SELECT projectCode, shortName FROM project_alias') return {code: short for code, short in cur.fetchall() if code} def _mysql_connect(): try: import pymysql except Exception as e: raise RuntimeError('pymysql 모듈이 필요합니다. `pip install pymysql` 실행 후 다시 시도하세요.') from e last_err = None for cs in ('utf8mb4', 'utf8'): try: return pymysql.connect( host=MYSQL_HOST, user=MYSQL_USER, password=MYSQL_PASSWORD, database=MYSQL_DB, port=MYSQL_PORT, charset=cs, cursorclass=pymysql.cursors.DictCursor, autocommit=True, ) except Exception as e: last_err = e raise last_err def fetch_mysql_tables(): mysql_conn = _mysql_connect() try: with mysql_conn.cursor() as cur: cur.execute("SHOW TABLES") rows = cur.fetchall() or [] out = [] for row in rows: if isinstance(row, dict): out.extend([_as_text(v).strip() for v in row.values() if _as_text(v).strip()]) else: out.extend([_as_text(v).strip() for v in row if _as_text(v).strip()]) return sorted(set(out)) finally: try: mysql_conn.close() except Exception: pass def fetch_mysql_table_preview(table_name, limit=100): tn = _as_text(table_name).strip() if not re.fullmatch(r'[A-Za-z0-9_]+', tn): raise ValueError('table 파라미터가 올바르지 않습니다.') row_limit = max(1, min(int(limit or 100), 500)) mysql_conn = _mysql_connect() try: with mysql_conn.cursor() as cur: cur.execute(f"SELECT * FROM `{tn}` LIMIT {row_limit}") rows = cur.fetchall() or [] return rows finally: try: mysql_conn.close() except Exception: pass def _quote_ident(name): return '`' + str(name).replace('`', '``') + '`' def discover_mysql_tables(mysql_conn): with mysql_conn.cursor() as cur: cur.execute( ''' SELECT TABLE_NAME, COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = %s ''', (MYSQL_DB,), ) rows = cur.fetchall() table_cols = {} for r in rows: t = r['TABLE_NAME'] c = r['COLUMN_NAME'] table_cols.setdefault(t, {})[c.lower()] = c member_table = None daily_tables = [] for t, cols in table_cols.items(): if 'memberno' in cols and 'korname' in cols and member_table is None: member_table = t for t, cols in table_cols.items(): if 'memberno' in cols and 'entrytime' in cols and 'entrypcode' in cols: daily_tables.append(t) # avoid loading member table as daily table if member_table in daily_tables: daily_tables = [t for t in daily_tables if t != member_table] tardy_tables = [] for t, cols in table_cols.items(): if 'memberno' in cols and 's_date' in cols and 'e_date' in cols and 'tardy_h' in cols and 'tardy_m' in cols: tardy_tables.append(t) return member_table, sorted(daily_tables), sorted(tardy_tables), table_cols def load_rank_map_from_systemconfig(mysql_conn, table_cols): # rankCode는 systemconfig의 PositionCode 기준으로 매핑 mapped = load_syskey_code_name_map(mysql_conn, table_cols, 'PositionCode') if mapped: return mapped # fallback return load_syskey_code_name_map(mysql_conn, table_cols, 'PositionRank') def load_syskey_code_name_map(mysql_conn, table_cols, target_syskey): target = str(target_syskey or '').strip().lower() if not target: return {} candidates = [] for t, cols in table_cols.items(): keys = {k.lower(): v for k, v in cols.items()} syskey_k = _find_col_key(keys, ('syskey', 'sys_key')) code_k = _find_col_key(keys, ('code',)) name_k = _find_col_key(keys, ('name', 'codename', 'rankname', 'value')) if syskey_k and code_k and name_k: score = 10 + (40 if 'systemconfig' in t.lower() else 0) candidates.append((score, t, keys[syskey_k], keys[code_k], keys[name_k])) if not candidates: return {} candidates.sort(reverse=True) _, table_name, syskey_col, code_col, name_col = candidates[0] q = f''' SELECT {_quote_ident(code_col)} AS code, {_quote_ident(name_col)} AS name FROM {_quote_ident(table_name)} WHERE LOWER(TRIM({_quote_ident(syskey_col)})) = %s ''' out = {} with mysql_conn.cursor() as cur: cur.execute(q, (target,)) for r in cur.fetchall(): code = _as_text(r.get('code')).strip() name = _as_text(r.get('name')).strip() if code: out[code] = name nk = _norm_code_token(code) if nk and nk not in out: out[nk] = name return out def load_from_mysql_into_sqlite(sqlite_conn): mysql_conn = _mysql_connect() try: member_table, daily_tables, tardy_tables, table_cols = discover_mysql_tables(mysql_conn) if not member_table: raise RuntimeError('MySQL에서 member 테이블(컬럼: MemberNo, korName)을 찾지 못했습니다.') if not daily_tables: raise RuntimeError('MySQL에서 일지 테이블(컬럼: MemberNo, EntryTime, EntryPCode)을 찾지 못했습니다.') # load member mcols = table_cols[member_table] m_memberno = _quote_ident(mcols['memberno']) m_korname = _quote_ident(mcols['korname']) rank_col_key = _find_col_key(mcols, ('rankcode', 'rank_code', 'rkcode', 'jobgrade', 'gradecode')) m_rankcode = _quote_ident(mcols[rank_col_key]) if rank_col_key else "''" group_col_key = _find_col_key(mcols, ('groupcode', 'group_code', 'teamcode', 'team_code', 'groupid', 'group_id')) m_groupcode = _quote_ident(mcols[group_col_key]) if group_col_key else "''" workpos_col_key = _find_col_key(mcols, ('workposition', 'work_position', 'workstate', 'work_state', 'statecode', 'state_code')) m_workpos = _quote_ident(mcols[workpos_col_key]) if workpos_col_key else "''" retire_col_key = _find_col_key( mcols, ( 'isretired', 'retired', 'retireyn', 'retire_yn', 'quityn', 'quit_yn', 'resignyn', 'resign_yn', 'outyn', 'out_yn', 'useyn', 'use_yn', 'workyn', 'work_yn', 'status', 'state', 'empstate', 'emp_state', 'retireflag', 'retire_flag', 'quitdate', 'leave_date', 'leavedate', 'retiredate' ), ) m_retire = _quote_ident(mcols[retire_col_key]) if retire_col_key else "''" mt = _quote_ident(member_table) workpos_map = load_syskey_code_name_map(mysql_conn, table_cols, 'WorkPositionCode') group_map = load_syskey_code_name_map(mysql_conn, table_cols, 'GroupCode') group_map_del = load_syskey_code_name_map(mysql_conn, table_cols, 'GroupCode_del') if group_map_del: for k, v in group_map_del.items(): if k not in group_map: group_map[k] = v member_rows = [] with mysql_conn.cursor() as cur: cur.execute( f"SELECT {m_memberno} AS MemberNo, {m_korname} AS korName, {m_rankcode} AS rankCode, {m_groupcode} AS groupCode, {m_workpos} AS workPosition, {m_retire} AS retireFlag FROM {mt}" ) for r in cur.fetchall(): mno = normalize_member_no(r.get('MemberNo')) if not mno: continue retire_raw = _as_text(r.get('retireFlag')).strip() workpos_code = _as_text(r.get('workPosition')).strip() workpos_name = _as_text(workpos_map.get(workpos_code) or workpos_map.get(_norm_code_token(workpos_code))).strip() group_code = _as_text(r.get('groupCode')).strip() team_name = _as_text(group_map.get(group_code) or group_map.get(_norm_code_token(group_code))).strip() is_ret = 0 is_ret = max(is_ret, parse_retired_flag(retire_raw, retire_col_key)) if workpos_code == '9' or ('퇴사' in workpos_name): is_ret = 1 # retireFlag에 원본 퇴사값이 없으면 WorkPosition 기반 상태명을 사용 retire_flag = retire_raw or workpos_name or workpos_code member_rows.append( ( mno, _as_text(r.get('korName')).strip(), _as_text(r.get('rankCode')).strip(), '', group_code, team_name, retire_flag, is_ret, ) ) insert_member_rows(sqlite_conn, member_rows) rank_map = load_rank_map_from_systemconfig(mysql_conn, table_cols) rank_loaded = update_member_ranks_by_rankcode(sqlite_conn, rank_map) # load tardy rules tardy_rows = [] for t in tardy_tables: cols = table_cols[t] q = f''' SELECT {_quote_ident(cols['memberno'])} AS memberno, {_quote_ident(cols['s_date'])} AS s_date, {_quote_ident(cols['e_date'])} AS e_date, {_quote_ident(cols['tardy_h'])} AS tardy_h, {_quote_ident(cols['tardy_m'])} AS tardy_m FROM {_quote_ident(t)} ''' with mysql_conn.cursor() as cur: cur.execute(q) for r in cur.fetchall(): mno = normalize_member_no(r.get('memberno')) sd = (r.get('s_date') or '').__str__()[:10] ed = (r.get('e_date') or '').__str__()[:10] try: th = int(float(r.get('tardy_h') or 9)) tm = int(float(r.get('tardy_m') or 0)) except Exception: th, tm = 9, 0 if mno and sd and ed: tardy_rows.append((mno, sd, ed, th, tm)) insert_worker_tardy(sqlite_conn, tardy_rows) tardy_map = build_tardy_map(sqlite_conn) # load daily tables with dedupe sqlite_conn.execute('DELETE FROM dallyproject') seen_rows = set() loaded_files = [] total_rows = 0 total_dups = 0 aliases = [ 'MemberNo', 'EntryTime', 'EntryPCode', 'EntryJobCode', 'EntryJob', 'LeaveTime', 'LeavePCode', 'LeaveJobCode', 'LeaveJob', 'OverTime', 'ConnectIP', 'OverWorkIP', 'EndWorkIP', 'SortKey', 'Note', 'modify', 'ConfirmDate' ] for t in daily_tables: cols = table_cols[t] select_parts = [] for a in aliases: key = a.lower() if key in cols: select_parts.append(f"COALESCE({_quote_ident(cols[key])}, '') AS {a}") else: select_parts.append(f"'' AS {a}") query = f"SELECT {', '.join(select_parts)} FROM {_quote_ident(t)}" inserted = 0 dups = 0 batch = [] with mysql_conn.cursor() as cur: cur.execute(query) while True: chunk = cur.fetchmany(5000) if not chunk: break for r in chunk: row = ( normalize_member_no(r.get('MemberNo')), _as_text(r.get('EntryTime')).strip(), _as_text(r.get('EntryPCode')).strip(), _as_text(r.get('EntryJobCode')).strip(), _as_text(r.get('EntryJob')).strip(), _as_text(r.get('LeaveTime')).strip(), _as_text(r.get('LeavePCode')).strip(), _as_text(r.get('LeaveJobCode')).strip(), _as_text(r.get('LeaveJob')).strip(), _as_text(r.get('OverTime')).strip(), _as_text(r.get('ConnectIP')).strip(), _as_text(r.get('OverWorkIP')).strip(), _as_text(r.get('EndWorkIP')).strip(), _as_text(r.get('SortKey')).strip(), _as_text(r.get('Note')).strip(), _as_text(r.get('modify')).strip(), _as_text(r.get('ConfirmDate')).strip(), ) if row in seen_rows: dups += 1 continue seen_rows.add(row) work_date, total_h, regular_h, overtime_h, business_trip_h = compute_hours( row[1], row[5], row[9], row[0], tardy_map, row[4], row[8], row[3], row[7] ) batch.append(row + (work_date, total_h, regular_h, overtime_h, business_trip_h)) if batch: insert_daily_rows(sqlite_conn, batch) inserted += len(batch) batch = [] loaded_files.append({'file': t, 'rows': inserted, 'duplicatesSkipped': dups}) total_rows += inserted total_dups += dups sqlite_conn.commit() return { 'source': 'mysql', 'member_loaded': len(member_rows), 'member_rank_source': 'mysql_systemconfig' if rank_map else 'mysql_systemconfig_not_found', 'member_rank_loaded': rank_loaded, 'dally_loaded': total_rows, 'duplicates_skipped': total_dups, 'daily_files': loaded_files, } finally: try: mysql_conn.close() except Exception: pass def get_stats(conn): total = conn.execute('SELECT COUNT(*) FROM dallyproject').fetchone()[0] matched = conn.execute( ''' SELECT COUNT(*) FROM dallyproject d JOIN member m ON m.MemberNo = d.MemberNo WHERE IFNULL(m.korName, '') <> '' ''' ).fetchone()[0] return {'total': total, 'matched': matched, 'unmatched': total - matched} def _duration_sql(alias='d'): return f''' CASE WHEN {alias}.EntryTime IS NULL OR {alias}.LeaveTime IS NULL OR {alias}.EntryTime = '' OR {alias}.LeaveTime = '' THEN 0 ELSE ( ( strftime('%s', CASE WHEN length({alias}.LeaveTime) = 16 THEN {alias}.LeaveTime || ':00' ELSE {alias}.LeaveTime END) - strftime('%s', CASE WHEN length({alias}.EntryTime) = 16 THEN {alias}.EntryTime || ':00' ELSE {alias}.EntryTime END) + CASE WHEN strftime('%s', CASE WHEN length({alias}.LeaveTime) = 16 THEN {alias}.LeaveTime || ':00' ELSE {alias}.LeaveTime END) < strftime('%s', CASE WHEN length({alias}.EntryTime) = 16 THEN {alias}.EntryTime || ':00' ELSE {alias}.EntryTime END) THEN 86400 ELSE 0 END ) / 3600.0 ) END ''' def get_people_summary(conn, start_date, end_date): q = f''' SELECT m.MemberNo, IFNULL(m.korName, '') AS korName, IFNULL(m.rankName, '') AS rankName, IFNULL(m.teamName, '') AS teamName, IFNULL(m.retireFlag, '') AS retireFlag, IFNULL(m.isRetired, 0) AS isRetired, ROUND(IFNULL(SUM(CASE WHEN date(substr(d.EntryTime,1,10)) BETWEEN date(?) AND date(?) THEN d.TotalHours ELSE 0 END), 0), 2) AS totalHours, IFNULL(SUM(CASE WHEN date(substr(d.EntryTime,1,10)) BETWEEN date(?) AND date(?) THEN 1 ELSE 0 END), 0) AS totalRows FROM member m LEFT JOIN dallyproject d ON d.MemberNo = m.MemberNo GROUP BY m.MemberNo, IFNULL(m.korName, ''), IFNULL(m.rankName, ''), IFNULL(m.teamName, ''), IFNULL(m.retireFlag, ''), IFNULL(m.isRetired, 0) ORDER BY CASE WHEN IFNULL(m.korName,'')='' THEN 1 ELSE 0 END, korName ASC, m.MemberNo ASC ''' cur = conn.execute(q, (start_date, end_date, start_date, end_date)) cols = [d[0] for d in cur.description] return [dict(zip(cols, row)) for row in cur.fetchall()] def get_project_summary(conn, start_date, end_date): q = f''' SELECT d.EntryPCode AS projectCode, ROUND(SUM(d.TotalHours), 2) AS totalHours, ROUND(SUM(d.RegularHours + IFNULL(d.BusinessTripHours, 0)), 2) AS regularHours, ROUND(SUM(d.OvertimeHours), 2) AS overtimeHours, COUNT(*) AS totalRows, COUNT(DISTINCT d.MemberNo) AS peopleCount FROM dallyproject d WHERE date(substr(d.EntryTime, 1, 10)) >= date(?) AND date(substr(d.EntryTime, 1, 10)) <= date(?) AND IFNULL(d.EntryPCode, '') <> '' GROUP BY d.EntryPCode HAVING totalHours > 0 ORDER BY totalHours DESC, totalRows DESC ''' cur = conn.execute(q, (start_date, end_date)) cols = [d[0] for d in cur.description] return [dict(zip(cols, row)) for row in cur.fetchall()] def get_member_dashboard(conn, start_date, end_date, member_no): where = "date(substr(d.EntryTime,1,10)) >= date(?) AND date(substr(d.EntryTime,1,10)) <= date(?)" params = [start_date, end_date] if member_no: where += " AND d.MemberNo = ?" params.append(member_no) summary_q = f''' SELECT ROUND(SUM(d.TotalHours), 2) AS totalHours, ROUND(SUM(d.RegularHours + IFNULL(d.BusinessTripHours, 0)), 2) AS regularHours, ROUND(SUM(d.OvertimeHours), 2) AS overtimeHours, ROUND(SUM(d.BusinessTripHours), 2) AS businessTripHours, COUNT(*) AS totalRows, COUNT(DISTINCT CASE WHEN d.TotalHours > 0 THEN d.MemberNo END) AS peopleCount, COUNT(DISTINCT CASE WHEN d.TotalHours > 0 AND IFNULL(d.EntryPCode,'') <> '' THEN d.EntryPCode END) AS projectCount FROM dallyproject d WHERE {where} ''' total_hours, regular_hours, overtime_hours, business_trip_hours, total_rows, people_count, project_count = conn.execute(summary_q, params).fetchone() project_q = f''' SELECT d.EntryPCode AS projectCode, ROUND(SUM(d.TotalHours), 2) AS hours, ROUND(SUM(d.RegularHours + IFNULL(d.BusinessTripHours, 0)), 2) AS regularHours, ROUND(SUM(d.OvertimeHours), 2) AS overtimeHours, ROUND(SUM(d.BusinessTripHours), 2) AS businessTripHours, COUNT(*) AS rowsCount FROM dallyproject d WHERE {where} GROUP BY d.EntryPCode HAVING hours > 0 ORDER BY hours DESC ''' cur = conn.execute(project_q, params) cols = [d[0] for d in cur.description] projects = [dict(zip(cols, row)) for row in cur.fetchall()] yearly_q = f''' SELECT substr(d.EntryTime, 1, 7) AS yearMonth, substr(d.EntryTime, 1, 4) AS yearCode, d.EntryPCode AS projectCode, CASE WHEN instr(d.EntryPCode, '-') = 0 THEN '기타' ELSE CASE WHEN instr(substr(d.EntryPCode, instr(d.EntryPCode, '-') + 1), '-') = 0 THEN '기타' ELSE substr(d.EntryPCode, instr(d.EntryPCode, '-') + 1, instr(substr(d.EntryPCode, instr(d.EntryPCode, '-') + 1), '-') - 1) END END AS typeCode, ROUND(SUM(d.TotalHours), 2) AS hours, ROUND(SUM(d.RegularHours + IFNULL(d.BusinessTripHours, 0)), 2) AS regularHours, ROUND(SUM(d.OvertimeHours), 2) AS overtimeHours, ROUND(SUM(d.BusinessTripHours), 2) AS businessTripHours, COUNT(*) AS rowsCount FROM dallyproject d WHERE {where} GROUP BY yearMonth, yearCode, projectCode, typeCode HAVING hours > 0 ORDER BY yearMonth ASC, hours DESC ''' cur2 = conn.execute(yearly_q, params) cols2 = [d[0] for d in cur2.description] project_yearly = [dict(zip(cols2, row)) for row in cur2.fetchall()] member_name = '' member_rank = '' member_team = '' if member_no: r = conn.execute( 'SELECT IFNULL(korName, ""), IFNULL(rankName, ""), IFNULL(teamName, "") FROM member WHERE MemberNo = ?', (member_no,), ).fetchone() if r: member_name, member_rank, member_team = r[0], r[1], r[2] return { 'start': start_date, 'end': end_date, 'memberNo': member_no, 'korName': member_name, 'rankName': member_rank, 'teamName': member_team, 'totalHours': total_hours or 0, 'regularHours': regular_hours or 0, 'overtimeHours': overtime_hours or 0, 'businessTripHours': business_trip_hours or 0, 'totalRows': total_rows or 0, 'peopleCount': people_count or 0, 'projectCount': project_count or 0, 'projects': projects, 'projectYearly': project_yearly, } def get_project_dashboard(conn, start_date, end_date, project_code): where = "date(substr(d.EntryTime,1,10)) >= date(?) AND date(substr(d.EntryTime,1,10)) <= date(?)" params = [start_date, end_date] if project_code: where += " AND d.EntryPCode = ?" params.append(project_code) summary_q = f''' SELECT ROUND(SUM(d.TotalHours), 2) AS totalHours, ROUND(SUM(d.RegularHours + IFNULL(d.BusinessTripHours, 0)), 2) AS regularHours, ROUND(SUM(d.OvertimeHours), 2) AS overtimeHours, ROUND(SUM(d.BusinessTripHours), 2) AS businessTripHours, COUNT(*) AS totalRows, COUNT(DISTINCT CASE WHEN d.TotalHours > 0 THEN d.MemberNo END) AS peopleCount, COUNT(DISTINCT CASE WHEN d.TotalHours > 0 AND IFNULL(d.EntryPCode,'') <> '' THEN d.EntryPCode END) AS projectCount FROM dallyproject d WHERE {where} ''' total_hours, regular_hours, overtime_hours, business_trip_hours, total_rows, people_count, project_count = conn.execute(summary_q, params).fetchone() people_q = f''' SELECT d.MemberNo, IFNULL(m.korName, '') AS korName, IFNULL(m.rankName, '') AS rankName, IFNULL(m.teamName, '') AS teamName, ROUND(SUM(d.TotalHours), 2) AS hours, ROUND(SUM(d.RegularHours + IFNULL(d.BusinessTripHours, 0)), 2) AS regularHours, ROUND(SUM(d.OvertimeHours), 2) AS overtimeHours, ROUND(SUM(d.BusinessTripHours), 2) AS businessTripHours, COUNT(*) AS rowsCount FROM dallyproject d LEFT JOIN member m ON m.MemberNo = d.MemberNo WHERE {where} GROUP BY d.MemberNo, IFNULL(m.korName, ''), IFNULL(m.rankName, ''), IFNULL(m.teamName, '') HAVING hours > 0 ORDER BY hours DESC ''' cur = conn.execute(people_q, params) cols = [d[0] for d in cur.description] people = [dict(zip(cols, row)) for row in cur.fetchall()] people_year_q = f''' SELECT substr(d.EntryTime, 1, 4) AS yearCode, d.MemberNo, IFNULL(m.korName, '') AS korName, IFNULL(m.rankName, '') AS rankName, IFNULL(m.teamName, '') AS teamName, ROUND(SUM(d.TotalHours), 2) AS hours, ROUND(SUM(d.RegularHours + IFNULL(d.BusinessTripHours, 0)), 2) AS regularHours, ROUND(SUM(d.OvertimeHours), 2) AS overtimeHours, ROUND(SUM(d.BusinessTripHours), 2) AS businessTripHours, COUNT(*) AS rowsCount FROM dallyproject d LEFT JOIN member m ON m.MemberNo = d.MemberNo WHERE {where} GROUP BY yearCode, d.MemberNo, IFNULL(m.korName, ''), IFNULL(m.rankName, ''), IFNULL(m.teamName, '') HAVING hours > 0 ORDER BY yearCode ASC, hours DESC ''' cur2 = conn.execute(people_year_q, params) cols2 = [d[0] for d in cur2.description] people_yearly = [dict(zip(cols2, row)) for row in cur2.fetchall()] return { 'start': start_date, 'end': end_date, 'projectCode': project_code, 'totalHours': total_hours or 0, 'regularHours': regular_hours or 0, 'overtimeHours': overtime_hours or 0, 'businessTripHours': business_trip_hours or 0, 'totalRows': total_rows or 0, 'peopleCount': people_count or 0, 'projectCount': project_count or 0, 'people': people, 'peopleYearly': people_yearly, } def get_project_monthly_detail(conn, start_date, end_date, project_code): where = "date(substr(d.EntryTime,1,10)) >= date(?) AND date(substr(d.EntryTime,1,10)) <= date(?)" params = [start_date, end_date] if project_code: where += " AND d.EntryPCode = ?" params.append(project_code) q = f''' SELECT substr(d.EntryTime, 1, 7) AS yearMonth, substr(d.EntryTime, 1, 4) AS yearCode, d.MemberNo, IFNULL(m.korName, '') AS korName, IFNULL(m.rankName, '') AS rankName, IFNULL(m.teamName, '') AS teamName, ROUND(SUM(d.TotalHours), 2) AS hours, ROUND(SUM(d.RegularHours + IFNULL(d.BusinessTripHours, 0)), 2) AS regularHours, ROUND(SUM(d.OvertimeHours), 2) AS overtimeHours FROM dallyproject d LEFT JOIN member m ON m.MemberNo = d.MemberNo WHERE {where} GROUP BY yearMonth, yearCode, d.MemberNo, IFNULL(m.korName, ''), IFNULL(m.rankName, ''), IFNULL(m.teamName, '') HAVING hours > 0 ORDER BY yearMonth ASC, hours DESC ''' cur = conn.execute(q, params) cols = [d[0] for d in cur.description] return {'rows': [dict(zip(cols, row)) for row in cur.fetchall()]} def get_member_yearly_type_breakdown(conn, start_date, end_date, member_no): where = "date(substr(d.EntryTime,1,10)) >= date(?) AND date(substr(d.EntryTime,1,10)) <= date(?)" params = [start_date, end_date] if member_no: where += " AND d.MemberNo = ?" params.append(member_no) q = f''' SELECT substr(d.EntryPCode, 1, 2) AS yearCode, CASE WHEN instr(d.EntryPCode, '-') = 0 THEN '기타' ELSE CASE WHEN instr(substr(d.EntryPCode, instr(d.EntryPCode, '-') + 1), '-') = 0 THEN '기타' ELSE substr(d.EntryPCode, instr(d.EntryPCode, '-') + 1, instr(substr(d.EntryPCode, instr(d.EntryPCode, '-') + 1), '-') - 1) END END AS typeCode, ROUND(SUM(d.TotalHours), 2) AS hours, ROUND(SUM(d.RegularHours + IFNULL(d.BusinessTripHours, 0)), 2) AS regularHours, ROUND(SUM(d.OvertimeHours), 2) AS overtimeHours, COUNT(*) AS rowsCount FROM dallyproject d WHERE {where} GROUP BY yearCode, typeCode HAVING hours > 0 ORDER BY yearCode ASC, hours DESC ''' cur = conn.execute(q, params) rows = cur.fetchall() by_year = {} for year_code, type_code, hours, regular_hours, overtime_hours, rows_count in rows: y = year_code or '기타' by_year.setdefault(y, {'yearCode': y, 'totalHours': 0, 'types': []}) by_year[y]['totalHours'] += float(hours or 0) by_year[y]['types'].append({ 'typeCode': type_code or '기타', 'hours': hours or 0, 'regularHours': regular_hours or 0, 'overtimeHours': overtime_hours or 0, 'rowsCount': rows_count or 0 }) result = [] for y in sorted(by_year.keys()): item = by_year[y] total = item['totalHours'] for t in item['types']: t['sharePct'] = round((float(t['hours']) / total * 100), 1) if total > 0 else 0 item['types'].sort(key=lambda x: float(x['hours']), reverse=True) item['totalHours'] = round(total, 2) result.append(item) return {'years': result} def get_date_range(conn): q = ''' SELECT MIN(date(substr(EntryTime, 1, 10))) AS minDate, MAX(date(substr(EntryTime, 1, 10))) AS maxDate FROM dallyproject WHERE IFNULL(EntryTime, '') <> '' AND date(substr(EntryTime, 1, 10)) IS NOT NULL ''' min_date, max_date = conn.execute(q).fetchone() return {'minDate': min_date or '', 'maxDate': max_date or ''} def get_monthly_person_project(conn, start_date, end_date): q = ''' SELECT substr(d.EntryTime, 1, 7) AS yearMonth, d.MemberNo, IFNULL(m.korName, '') AS korName, IFNULL(m.rankName, '') AS rankName, IFNULL(m.teamName, '') AS teamName, d.EntryPCode AS projectCode, ROUND(SUM(d.TotalHours), 2) AS hours, ROUND(SUM(d.RegularHours + IFNULL(d.BusinessTripHours, 0)), 2) AS regularHours, ROUND(SUM(d.BusinessTripHours), 2) AS businessTripHours, ROUND(SUM(d.OvertimeHours), 2) AS overtimeHours FROM dallyproject d LEFT JOIN member m ON m.MemberNo = d.MemberNo WHERE date(substr(d.EntryTime, 1, 10)) >= date(?) AND date(substr(d.EntryTime, 1, 10)) <= date(?) AND IFNULL(d.MemberNo, '') <> '' AND IFNULL(d.EntryPCode, '') <> '' GROUP BY yearMonth, d.MemberNo, IFNULL(m.korName, ''), IFNULL(m.rankName, ''), IFNULL(m.teamName, ''), d.EntryPCode HAVING hours > 0 ORDER BY yearMonth ASC, d.MemberNo ASC, hours DESC ''' cur = conn.execute(q, (start_date, end_date)) cols = [d[0] for d in cur.description] return {'rows': [dict(zip(cols, row)) for row in cur.fetchall()]} def _norm_title(s): v = _as_text(s).strip() if not v: return '' v = re.sub(r'\s+', '', v) return v def load_salary_avg_by_title_year(): if not os.path.exists(SALARY_AVG_CSV_PATH): return {'rows': [], 'byTitle': {}} enc = 'cp949' rows = [] with open(SALARY_AVG_CSV_PATH, 'r', encoding=enc, newline='') as f: r = csv.DictReader(f) for x in r: try: year = int(_as_text(x.get('year')).strip()) title = _as_text(x.get('title')).strip() avg = int(float(_as_text(x.get('avg_monthly_salary')).strip() or '0')) sample = int(float(_as_text(x.get('sample_count')).strip() or '0')) except Exception: continue if not title or avg <= 0: continue rows.append({ 'year': year, 'title': title, 'titleNorm': _norm_title(title), 'avgMonthlySalary': avg, 'sampleCount': sample, }) # same (title, year) may appear multiple times; merge with weighted average by sample_count. by_title_year = {} for row in rows: k = (row['titleNorm'], row['year']) prev = by_title_year.get(k, {'title': row['title'], 'sumPaid': 0, 'sumSample': 0}) prev['sumPaid'] += int(row['avgMonthlySalary'] or 0) * int(max(1, row['sampleCount'] or 0)) prev['sumSample'] += int(max(1, row['sampleCount'] or 0)) by_title_year[k] = prev merged_rows = [] for (tnorm, year), v in by_title_year.items(): avg = int(round(v['sumPaid'] / max(1, v['sumSample']))) merged_rows.append({ 'year': year, 'title': v['title'], 'titleNorm': tnorm, 'avgMonthlySalary': avg, 'sampleCount': v['sumSample'], }) by_title = {} for row in merged_rows: by_title.setdefault(row['titleNorm'], []).append(row) for k in by_title.keys(): by_title[k].sort(key=lambda z: z['year']) out = {} for k, arr in by_title.items(): years = [a['year'] for a in arr] vals = [a['avgMonthlySalary'] for a in arr] min_y, max_y = min(years), max(years) min_v = vals[years.index(min_y)] max_v = vals[years.index(max_y)] growth = 0.03 if max_y > min_y and min_v > 0 and max_v > 0: try: growth = (max_v / min_v) ** (1.0 / (max_y - min_y)) - 1.0 except Exception: growth = 0.03 # 최근 5개 연도 값 기준, 최대/최소 제외 평균(직급 고정 월급) arr_desc = sorted(arr, key=lambda z: z['year'], reverse=True) last5_vals = [int(x['avgMonthlySalary']) for x in arr_desc[:5] if int(x['avgMonthlySalary'] or 0) > 0] trimmed_vals = last5_vals[:] if len(trimmed_vals) >= 3: mn = min(trimmed_vals) mx = max(trimmed_vals) mn_removed = False mx_removed = False kept = [] for v in trimmed_vals: if (not mn_removed) and v == mn: mn_removed = True continue if (not mx_removed) and v == mx: mx_removed = True continue kept.append(v) trimmed_vals = kept if kept else trimmed_vals fixed_salary = int(round(sum(trimmed_vals) / len(trimmed_vals))) if trimmed_vals else 0 out[k] = { 'title': arr[0]['title'], 'growthRate': growth, 'minYear': min_y, 'maxYear': max_y, 'yearToSalary': {str(a['year']): a['avgMonthlySalary'] for a in arr}, 'fixedSalary': fixed_salary, 'fixedBaseYears': [int(x['year']) for x in arr_desc[:5]], 'rows': arr, } return {'rows': rows, 'byTitle': out} class Handler(BaseHTTPRequestHandler): def _json(self, status, payload): body = json.dumps(payload, ensure_ascii=False).encode('utf-8') self.send_response(status) self.send_header('Content-Type', 'application/json; charset=utf-8') self.send_header('Content-Length', str(len(body))) self.end_headers() self.wfile.write(body) def _html(self, path): with open(path, 'rb') as f: body = f.read() self.send_response(200) self.send_header('Content-Type', 'text/html; charset=utf-8') self.send_header('Content-Length', str(len(body))) self.end_headers() self.wfile.write(body) def do_GET(self): parsed = urlparse(self.path) if parsed.path == '/': return self._html(INDEX_HTML) if parsed.path == '/detail-view.html': return self._html(DETAIL_HTML) if parsed.path == '/detail-view-project.html': return self._html(DETAIL_PROJECT_HTML) if parsed.path == '/mysql-preview.html': return self._html(MYSQL_PREVIEW_HTML) try: if parsed.path == '/api/mysql-tables': tables = fetch_mysql_tables() return self._json(200, {'tables': tables, 'count': len(tables)}) if parsed.path == '/api/mysql-table-preview': q = parse_qs(parsed.query) table_name = q.get('table', [''])[0] row_limit = q.get('limit', ['100'])[0] rows = fetch_mysql_table_preview(table_name, row_limit) return self._json(200, {'table': table_name, 'rows': rows, 'count': len(rows)}) if parsed.path == '/api/stats': with sqlite3.connect(DB_PATH) as conn: return self._json(200, get_stats(conn)) if parsed.path == '/api/people-summary': q = parse_qs(parsed.query) start_date = q.get('start', ['2016-01-01'])[0] end_date = q.get('end', ['2026-12-31'])[0] with sqlite3.connect(DB_PATH) as conn: return self._json(200, {'people': get_people_summary(conn, start_date, end_date)}) if parsed.path == '/api/project-summary': q = parse_qs(parsed.query) start_date = q.get('start', ['2016-01-01'])[0] end_date = q.get('end', ['2026-12-31'])[0] with sqlite3.connect(DB_PATH) as conn: return self._json(200, {'projects': get_project_summary(conn, start_date, end_date)}) if parsed.path == '/api/project-aliases': with sqlite3.connect(DB_PATH) as conn: return self._json(200, {'aliases': get_project_alias_map(conn)}) if parsed.path == '/api/member-dashboard': q = parse_qs(parsed.query) start_date = q.get('start', ['2016-01-01'])[0] end_date = q.get('end', ['2026-12-31'])[0] member_no = normalize_member_no(q.get('memberNo', [''])[0]) with sqlite3.connect(DB_PATH) as conn: return self._json(200, get_member_dashboard(conn, start_date, end_date, member_no)) if parsed.path == '/api/project-dashboard': q = parse_qs(parsed.query) start_date = q.get('start', ['2016-01-01'])[0] end_date = q.get('end', ['2026-12-31'])[0] project_code = (q.get('projectCode', [''])[0] or '').strip() with sqlite3.connect(DB_PATH) as conn: return self._json(200, get_project_dashboard(conn, start_date, end_date, project_code)) if parsed.path == '/api/project-monthly-detail': q = parse_qs(parsed.query) start_date = q.get('start', ['2016-01-01'])[0] end_date = q.get('end', ['2026-12-31'])[0] project_code = (q.get('projectCode', [''])[0] or '').strip() with sqlite3.connect(DB_PATH) as conn: return self._json(200, get_project_monthly_detail(conn, start_date, end_date, project_code)) if parsed.path == '/api/member-yearly-breakdown': q = parse_qs(parsed.query) start_date = q.get('start', ['2016-01-01'])[0] end_date = q.get('end', ['2026-12-31'])[0] member_no = normalize_member_no(q.get('memberNo', [''])[0]) with sqlite3.connect(DB_PATH) as conn: return self._json(200, get_member_yearly_type_breakdown(conn, start_date, end_date, member_no)) if parsed.path == '/api/date-range': with sqlite3.connect(DB_PATH) as conn: return self._json(200, get_date_range(conn)) if parsed.path == '/api/monthly-person-project': q = parse_qs(parsed.query) start_date = q.get('start', ['2016-01-01'])[0] end_date = q.get('end', ['2026-12-31'])[0] with sqlite3.connect(DB_PATH) as conn: return self._json(200, get_monthly_person_project(conn, start_date, end_date)) if parsed.path == '/api/salary-avg-by-title': return self._json(200, load_salary_avg_by_title_year()) self._json(404, {'error': 'Not found'}) except Exception as e: traceback.print_exc() self._json(500, {'ok': False, 'error': str(e)}) def do_POST(self): parsed = urlparse(self.path) if parsed.path == '/api/sync-ranks': try: with sqlite3.connect(DB_PATH) as conn: init_db(conn) rank_map = load_member_ranks_from_intranet() updated = update_member_ranks(conn, rank_map) return self._json(200, {'ok': True, 'member_rank_source': 'intranet', 'member_rank_loaded': updated}) except Exception as e: traceback.print_exc() return self._json(500, {'ok': False, 'error': str(e)}) if parsed.path != '/api/load': return self._json(404, {'error': 'Not found'}) try: with sqlite3.connect(DB_PATH) as conn: init_db(conn) used = load_from_mysql_into_sqlite(conn) try: alias_rows = load_project_alias_from_erp() used['project_alias_source'] = 'erp' replace_project_alias(conn, alias_rows) used['project_alias_loaded'] = len(alias_rows) except Exception as e: used['project_alias_source'] = 'erp_failed_keep_existing' used['project_alias_loaded'] = 0 used['project_alias_error'] = str(e) # member rank는 MySQL member.rankCode -> systemconfig 매핑을 우선 사용 # (used['member_rank_source'], used['member_rank_loaded']는 load_from_mysql_into_sqlite에서 설정) stats = get_stats(conn) payload = { 'ok': True, **used, 'stats': stats, } self._json(200, payload) except Exception as e: traceback.print_exc() self._json(500, {'ok': False, 'error': str(e)}) def local_ip(): try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(('8.8.8.8', 80)) return s.getsockname()[0] except Exception: return '127.0.0.1' finally: try: s.close() except Exception: pass if __name__ == '__main__': os.makedirs(BASE_DIR, exist_ok=True) with sqlite3.connect(DB_PATH) as conn: init_db(conn) try: alias_rows = load_project_alias_from_erp() print(f'Loaded project aliases from ERP: {len(alias_rows)}') replace_project_alias(conn, alias_rows) except sqlite3.OperationalError as e: print(f'Load project aliases skipped due to SQLite lock: {e}') except Exception as e: print(f'Load project aliases from ERP failed, keep existing aliases: {e}') # 직급은 MySQL member.rankCode -> systemconfig 매핑 기반으로 /api/load 시 반영 try: fixed = backfill_daily_hours_if_needed(conn) if fixed: print(f'Backfilled hour columns for {fixed} rows.') except sqlite3.OperationalError as e: print(f'Backfill skipped due to SQLite lock: {e}') port = int(os.environ.get('PORT', '8090')) host = '0.0.0.0' print(f'Server running: http://{local_ip()}:{port}') print(f'Local access: http://127.0.0.1:{port}') print('Tip: click "DB 로드/갱신" to load from MySQL + ERP alias sync.') ThreadingHTTPServer((host, port), Handler).serve_forever()