#!/usr/bin/env python3
"""
Parse PTC(2023-2026.02).xlsx without external dependencies and export a CSV
that can be loaded into budget_app.staging_ptc_transactions.

Usage:
  python3 db/import_ptc_xlsx.py \
    --input "PTC(2023-2026.02).xlsx" \
    --output db/ptc_staging.csv \
    --batch ptc_20260323
"""

from __future__ import annotations

import argparse
import csv
import re
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from xml.etree import ElementTree as ET
from zipfile import ZipFile

# XML namespace used by every SpreadsheetML part read by this script.
NS = {"a": "http://schemas.openxmlformats.org/spreadsheetml/2006/main"}

# Archive member holding the first worksheet; the only sheet that is exported.
SHEET_PATH = "xl/worksheets/sheet1.xml"

# Splits an A1-style cell reference into column letters and row digits.
CELL_REF_RE = re.compile(r"([A-Z]+)(\d+)")

# Day zero used to turn a worksheet date serial into a calendar date.
EXCEL_EPOCH = datetime(1899, 12, 30)

# The header row must match this list exactly, in this order.
EXPECTED_HEADERS = [
    "거래일",
    "입/출금",
    "계정코드",
    "구분",
    "부서",
    "거래처",
    "프로젝트코드",
    "프로젝트 구분(안)",
    "프로젝트명",
    "적요",
    "공급가액",
    "부가세",
    "합계금액",
    "비고",
]

# Column order of the exported CSV.
CSV_FIELDNAMES = [
    "import_batch",
    "source_file_name",
    "source_sheet_name",
    "source_row_no",
    "transaction_date_raw",
    "transaction_date",
    "in_out",
    "account_code_raw",
    "account_name_raw",
    "department_name_raw",
    "vendor_name_raw",
    "project_code_raw",
    "project_type_raw",
    "project_name_raw",
    "description_raw",
    "supply_amount_raw",
    "vat_amount_raw",
    "total_amount_raw",
    "remarks_raw",
    "supply_amount",
    "vat_amount",
    "total_amount",
    "normalized_transaction_type",
    "load_status",
    "load_error",
]


def col_to_num(col: str) -> int:
    """Convert column letters to a 1-based column number ("A" -> 1, "AA" -> 27).

    Non-alphabetic characters are ignored, so a full cell reference such as
    "B12" is accepted as well.
    """
    value = 0
    for ch in col:
        if ch.isalpha():
            value = value * 26 + ord(ch.upper()) - 64
    return value


def _rich_text(node: ET.Element) -> str:
    """Join the visible text of a string item (``<si>`` or ``<is>``).

    Only the direct ``<t>`` child and the ``<t>`` of each ``<r>`` run are
    used.  Text nested under ``<rPh>`` (phonetic runs) is skipped on purpose:
    it is a reading hint, not part of the cell value.
    """
    parts = [t.text or "" for t in node.findall("a:t", NS)]
    parts.extend(t.text or "" for t in node.findall("a:r/a:t", NS))
    return "".join(parts)


def read_shared_strings(book: ZipFile) -> list[str]:
    """Return the workbook's shared-string table, indexed as cells refer to it.

    A workbook without an ``xl/sharedStrings.xml`` part yields an empty list
    instead of failing.
    """
    try:
        payload = book.read("xl/sharedStrings.xml")
    except KeyError:
        # ZipFile.read raises KeyError when the member is absent.
        return []
    root = ET.fromstring(payload)
    return [_rich_text(si) for si in root.findall("a:si", NS)]


def _cell_text(cell: ET.Element, shared_strings: list[str]) -> str:
    """Return the text stored in one ``<c>`` element."""
    cell_type = cell.attrib.get("t")
    if cell_type == "inlineStr":
        # Inline strings keep their text under <is>, not <v>.
        inline = cell.find("a:is", NS)
        return "" if inline is None else _rich_text(inline)

    value_node = cell.find("a:v", NS)
    if value_node is None:
        return ""
    text = value_node.text or ""
    if cell_type == "s":
        # For shared-string cells <v> holds an index into the string table.
        return shared_strings[int(text)]
    return text


def _read_numbered_rows(
    book: ZipFile, shared_strings: list[str], sheet_path: str
) -> list[tuple[int, list[str]]]:
    """Read a worksheet as ``(sheet_row_number, cell_values)`` pairs.

    The row number comes from the row's ``r`` attribute, so it stays correct
    when the XML omits empty rows.  Rows and cells without an ``r`` attribute
    are placed directly after their predecessor.
    """
    root = ET.fromstring(book.read(sheet_path))
    sheet_data = root.find("a:sheetData", NS)
    if sheet_data is None:
        return []

    numbered = []
    row_no = 0
    for row in sheet_data.findall("a:row", NS):
        declared = row.attrib.get("r", "")
        row_no = int(declared) if declared.isdigit() else row_no + 1

        cells = defaultdict(str)
        col = 0
        for cell in row.findall("a:c", NS):
            match = CELL_REF_RE.match(cell.attrib.get("r", ""))
            col = col_to_num(match.group(1)) if match else col + 1
            cells[col] = _cell_text(cell, shared_strings)

        # Columns absent from the XML become empty strings up to the last
        # populated column.
        width = max(cells) if cells else 0
        numbered.append((row_no, [cells[i] for i in range(1, width + 1)]))
    return numbered


def read_sheet_rows(book: ZipFile, shared_strings: list[str], sheet_path: str) -> list[list[str]]:
    """Return every ``<row>`` of the worksheet as a list of cell strings.

    Each row is as wide as its last populated column; gaps are filled with
    empty strings.  Rows that are missing from the XML are not represented.
    """
    return [values for _, values in _read_numbered_rows(book, shared_strings, sheet_path)]


def excel_serial_to_date(value: str) -> str:
    """Convert a date serial to ``YYYY-MM-DD``.

    Empty input returns an empty string.  Anything that is not a usable
    serial (non-numeric text, NaN, infinity, out-of-range numbers) is
    returned unchanged so the raw value is never lost.
    """
    if not value:
        return ""
    try:
        serial = float(value)
        moment = EXCEL_EPOCH + timedelta(days=serial)
    except (ValueError, OverflowError):
        # float() rejects text; timedelta/datetime reject NaN, infinity and
        # serials outside the supported date range.
        return value
    return moment.date().isoformat()


def parse_amount(value: str) -> str:
    """Normalise an amount cell: trim it and drop thousands separators.

    Blank cells and a lone "-" placeholder become an empty string.
    """
    text = (value or "").strip()
    if not text or text == "-":
        return ""
    return text.replace(",", "")


def normalize_transaction_type(in_out: str, account_name: str) -> str:
    """Map the deposit/withdrawal flag and account name to a transaction type.

    Deposits ("입금") are "revenue".  Withdrawals ("출금") are "revenue" when
    the account name contains "수입" or "매출", otherwise "cost_expense".
    Any other flag yields an empty string.
    """
    in_out = (in_out or "").strip()
    account_name = (account_name or "").strip()

    if in_out == "입금":
        return "revenue"
    if in_out == "출금":
        if "수입" in account_name or "매출" in account_name:
            return "revenue"
        return "cost_expense"
    return ""


def export_csv(input_path: Path, output_path: Path, batch_name: str) -> None:
    """Export the first worksheet of ``input_path`` to a staging CSV.

    The first row must equal ``EXPECTED_HEADERS``.  Every following row is
    written with its raw cell values, a parsed date, normalised amounts and a
    derived transaction type.  ``source_row_no`` is the row's number in the
    worksheet.  The output directory is created if needed and the file is
    written as UTF-8 with a BOM.

    Raises:
        ValueError: if the sheet has no rows or the header row differs from
            ``EXPECTED_HEADERS``.
    """
    with ZipFile(input_path) as book:
        shared_strings = read_shared_strings(book)
        numbered_rows = _read_numbered_rows(book, shared_strings, SHEET_PATH)

    if not numbered_rows:
        raise ValueError("No rows found in workbook")

    headers = numbered_rows[0][1]
    if headers != EXPECTED_HEADERS:
        raise ValueError(f"Unexpected headers: {headers}")

    width = len(EXPECTED_HEADERS)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    with output_path.open("w", newline="", encoding="utf-8-sig") as fp:
        writer = csv.DictWriter(fp, fieldnames=CSV_FIELDNAMES)
        writer.writeheader()

        for row_no, row in numbered_rows[1:]:
            # Pad short rows and truncate long ones to the expected width.
            current = (row + [""] * width)[:width]
            writer.writerow(
                {
                    "import_batch": batch_name,
                    "source_file_name": input_path.name,
                    "source_sheet_name": "Sheet1",
                    "source_row_no": row_no,
                    "transaction_date_raw": current[0],
                    "transaction_date": excel_serial_to_date(current[0]),
                    "in_out": current[1],
                    "account_code_raw": current[2],
                    "account_name_raw": current[3],
                    "department_name_raw": current[4],
                    "vendor_name_raw": current[5],
                    "project_code_raw": current[6],
                    "project_type_raw": current[7],
                    "project_name_raw": current[8],
                    "description_raw": current[9],
                    "supply_amount_raw": current[10],
                    "vat_amount_raw": current[11],
                    "total_amount_raw": current[12],
                    "remarks_raw": current[13],
                    "supply_amount": parse_amount(current[10]),
                    "vat_amount": parse_amount(current[11]),
                    "total_amount": parse_amount(current[12]),
                    "normalized_transaction_type": normalize_transaction_type(
                        current[1], current[3]
                    ),
                    "load_status": "loaded",
                    "load_error": "",
                }
            )


def main() -> None:
    """Parse command-line arguments and run the export."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True, help="Path to the xlsx file")
    parser.add_argument("--output", required=True, help="Path to the output CSV file")
    parser.add_argument("--batch", required=True, help="Import batch name")
    args = parser.parse_args()

    export_csv(Path(args.input), Path(args.output), args.batch)
    print(f"CSV exported to {args.output}")


if __name__ == "__main__":
    main()