Initial commit: Organized PTC project structure with .gitignore and README

This commit is contained in:
2026-03-23 14:44:39 +09:00
commit 35ababe236
21 changed files with 8921 additions and 0 deletions

208
db/import_ptc_xlsx.py Normal file
View File

@@ -0,0 +1,208 @@
#!/usr/bin/env python3
"""
Parse PTC(2023-2026.02).xlsx without external dependencies and export a CSV
that can be loaded into budget_app.staging_ptc_transactions.
Usage:
python3 db/import_ptc_xlsx.py \
--input "PTC(2023-2026.02).xlsx" \
--output db/ptc_staging.csv \
--batch ptc_20260323
"""
from __future__ import annotations
import argparse
import csv
import re
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from xml.etree import ElementTree as ET
from zipfile import ZipFile
# XML namespace map for SpreadsheetML — the schema used by xl/worksheets/*.xml
# and xl/sharedStrings.xml inside an .xlsx archive.
NS = {"a": "http://schemas.openxmlformats.org/spreadsheetml/2006/main"}

# Exact header row (column order matters) expected on Sheet1 of the PTC
# workbook: transaction date, in/out, account code, category, department,
# vendor, project code, project type (draft), project name, description,
# supply amount, VAT, total amount, remarks.  export_csv() refuses to run
# if the sheet's first row differs from this list.
EXPECTED_HEADERS = [
    "거래일",
    "입/출금",
    "계정코드",
    "구분",
    "부서",
    "거래처",
    "프로젝트코드",
    "프로젝트 구분(안)",
    "프로젝트명",
    "적요",
    "공급가액",
    "부가세",
    "합계금액",
    "비고",
]
def col_to_num(col: str) -> int:
    """Convert a spreadsheet column label ("A", "AB", ...) to its 1-based index.

    Non-alphabetic characters are skipped, so passing a full cell reference
    such as "C12" yields the index of column "C".  An empty or all-digit
    string yields 0.
    """
    index = 0
    for letter in filter(str.isalpha, col):
        index = index * 26 + (ord(letter.upper()) - ord("A") + 1)
    return index
def read_shared_strings(book: ZipFile) -> list[str]:
    """Return the workbook's shared-string table as a flat list of strings.

    Cells typed ``t="s"`` store an index into this table.  A workbook whose
    cells are all numeric may legitimately omit xl/sharedStrings.xml; in that
    case return an empty list instead of raising KeyError from ZipFile.read.
    """
    if "xl/sharedStrings.xml" not in book.namelist():
        # No shared-string part: nothing for "s"-typed cells to reference.
        return []
    root = ET.fromstring(book.read("xl/sharedStrings.xml"))
    strings: list[str] = []
    for si in root.findall("a:si", NS):
        # An <si> may hold several rich-text runs; join every <t> fragment.
        strings.append("".join(node.text or "" for node in si.iterfind(".//a:t", NS)))
    return strings
def read_sheet_rows(book: ZipFile, shared_strings: list[str], sheet_path: str) -> list[list[str]]:
    """Parse one worksheet into dense per-row lists of cell strings.

    Each returned row is padded so index i-1 holds column i's value (missing
    cells become "").  Cells typed ``t="s"`` are resolved through
    *shared_strings*.

    Fixes:
    - Cells lacking an "r" (reference) attribute were keyed under None,
      which corrupted column ordering and made ``max(values)`` raise
      TypeError (None is not comparable with int).  Such cells now fall
      back to "one past the previous cell", matching document order.
    - A "s"-typed cell with an empty <v> no longer crashes on ``int("")``.
    """
    root = ET.fromstring(book.read(sheet_path))
    rows: list[list[str]] = []
    for row in root.find("a:sheetData", NS).findall("a:row", NS):
        values: dict[int, str] = {}
        last_col = 0
        for cell in row.findall("a:c", NS):
            ref = cell.attrib.get("r", "")
            match = re.match(r"([A-Z]+)(\d+)", ref)
            col = col_to_num(match.group(1)) if match else last_col + 1
            last_col = col
            value_node = cell.find("a:v", NS)
            value = "" if value_node is None else (value_node.text or "")
            if cell.attrib.get("t") == "s" and value:
                # Shared-string cell: <v> text is an index into the table.
                value = shared_strings[int(value)]
            values[col] = value
        width = max(values) if values else 0
        rows.append([values.get(i, "") for i in range(1, width + 1)])
    return rows
def excel_serial_to_date(value: str) -> str:
    """Render an Excel date serial as an ISO "YYYY-MM-DD" string.

    Empty input yields ""; non-numeric input is returned unchanged (the
    sheet sometimes stores dates as plain text rather than serials).
    """
    if not value:
        return ""
    try:
        days = float(value)
    except ValueError:
        # Not a serial number — pass the original text through.
        return value
    # Excel's day 0 is 1899-12-30 (compensating for the 1900 leap-year bug).
    epoch = datetime(1899, 12, 30)
    return (epoch + timedelta(days=days)).strftime("%Y-%m-%d")
def parse_amount(value: str) -> str:
    """Strip thousands separators from an amount cell.

    Blank cells and the literal dash placeholder ("-") become "", which
    loads as NULL/empty in the staging table.
    """
    cleaned = (value or "").strip()
    if cleaned in ("", "-"):
        return ""
    return cleaned.replace(",", "")
def normalize_transaction_type(in_out: str, account_name: str) -> str:
    """Map the 입/출금 (in/out) column onto a canonical transaction type.

    "입금" (deposit) is always "revenue".  "출금" (withdrawal) is normally
    "cost_expense", except when the account name mentions 수입 or 매출
    (income/sales), in which case it is treated as revenue.  Any other
    direction maps to "".
    """
    direction = (in_out or "").strip()
    account = (account_name or "").strip()
    if direction == "입금":
        return "revenue"
    if direction != "출금":
        return ""
    looks_like_revenue = any(token in account for token in ("수입", "매출"))
    return "revenue" if looks_like_revenue else "cost_expense"
# Column order of the staging CSV, mirroring budget_app.staging_ptc_transactions.
_STAGING_FIELDS = [
    "import_batch",
    "source_file_name",
    "source_sheet_name",
    "source_row_no",
    "transaction_date_raw",
    "transaction_date",
    "in_out",
    "account_code_raw",
    "account_name_raw",
    "department_name_raw",
    "vendor_name_raw",
    "project_code_raw",
    "project_type_raw",
    "project_name_raw",
    "description_raw",
    "supply_amount_raw",
    "vat_amount_raw",
    "total_amount_raw",
    "remarks_raw",
    "supply_amount",
    "vat_amount",
    "total_amount",
    "normalized_transaction_type",
    "load_status",
    "load_error",
]


def _build_record(row: list[str], row_no: int, batch_name: str, file_name: str) -> dict:
    """Map one width-padded sheet row onto a staging-table record dict."""
    return {
        "import_batch": batch_name,
        "source_file_name": file_name,
        "source_sheet_name": "Sheet1",
        "source_row_no": row_no,
        "transaction_date_raw": row[0],
        "transaction_date": excel_serial_to_date(row[0]),
        "in_out": row[1],
        "account_code_raw": row[2],
        "account_name_raw": row[3],
        "department_name_raw": row[4],
        "vendor_name_raw": row[5],
        "project_code_raw": row[6],
        "project_type_raw": row[7],
        "project_name_raw": row[8],
        "description_raw": row[9],
        "supply_amount_raw": row[10],
        "vat_amount_raw": row[11],
        "total_amount_raw": row[12],
        "remarks_raw": row[13],
        "supply_amount": parse_amount(row[10]),
        "vat_amount": parse_amount(row[11]),
        "total_amount": parse_amount(row[12]),
        "normalized_transaction_type": normalize_transaction_type(row[1], row[3]),
        "load_status": "loaded",
        "load_error": "",
    }


def export_csv(input_path: Path, output_path: Path, batch_name: str) -> None:
    """Read Sheet1 of the PTC workbook and write the staging CSV.

    Parameters:
        input_path: the .xlsx workbook to parse.
        output_path: destination CSV (parent directories are created).
        batch_name: value recorded in every row's ``import_batch`` column.

    Raises:
        ValueError: if the sheet is empty or its header row does not match
            EXPECTED_HEADERS exactly.
    """
    with ZipFile(input_path) as book:
        shared_strings = read_shared_strings(book)
        rows = read_sheet_rows(book, shared_strings, "xl/worksheets/sheet1.xml")
    if not rows:
        raise ValueError("No rows found in workbook")
    headers = rows[0]
    if headers != EXPECTED_HEADERS:
        raise ValueError(f"Unexpected headers: {headers}")
    width = len(EXPECTED_HEADERS)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    # utf-8-sig so Excel detects the encoding when the CSV is inspected.
    with output_path.open("w", newline="", encoding="utf-8-sig") as fp:
        writer = csv.DictWriter(fp, fieldnames=_STAGING_FIELDS)
        writer.writeheader()
        # Sheet rows are 1-based and row 1 is the header, so data begins
        # at source row 2.
        for row_no, row in enumerate(rows[1:], start=2):
            # Pad short rows to full width, truncate over-wide ones.
            padded = (row + [""] * (width - len(row)))[:width]
            writer.writerow(_build_record(padded, row_no, batch_name, input_path.name))
def main() -> None:
    """Command-line entry point: convert the PTC xlsx into a staging CSV."""
    parser = argparse.ArgumentParser()
    for flag, help_text in (
        ("--input", "Path to the xlsx file"),
        ("--output", "Path to the output CSV file"),
        ("--batch", "Import batch name"),
    ):
        parser.add_argument(flag, required=True, help=help_text)
    options = parser.parse_args()
    export_csv(Path(options.input), Path(options.output), options.batch)
    print(f"CSV exported to {options.output}")


if __name__ == "__main__":
    main()