feat: improve lifecycle allocation popup flow and project cost visibility

This commit is contained in:
2026-04-22 17:19:26 +09:00
parent 6e8f606591
commit bc611c3ff7
2 changed files with 1866 additions and 66 deletions

View File

@@ -188,6 +188,9 @@ MANAGEMENT_EXCLUDED_ACCOUNT_CODES = {
"962", # 잡손실
"999", # 법인세등
}
# In project/project-lifecycle screens, these accounts should be hidden
# entirely from aggregates and detail rows.
PROJECT_VIEW_EXCLUDED_ACCOUNT_CODES = set(MANAGEMENT_EXCLUDED_ACCOUNT_CODES)
ACCOUNT_STRUCTURE_TEMPLATE = [
{"section": "수입", "group": "수입", "categories": ["공사수입", "용역수입", "기타수입", "당좌자산"]},
{"section": "영업외 수지", "group": "영업외수익", "categories": ["이자수입", "잡이익", "배당수익"]},
@@ -598,6 +601,28 @@ def init_db() -> None:
)
"""
)
cur.execute(
"""
create table if not exists project_relations (
project_code text not null,
related_project_code text not null,
updated_at text not null,
primary key (project_code, related_project_code)
)
"""
)
cur.execute(
"""
create table if not exists project_lifecycle_allocations (
base_project_code text not null,
source_project_code text not null,
allocation_numerator integer not null default 1,
allocation_denominator integer not null default 1,
updated_at text not null,
primary key (base_project_code, source_project_code)
)
"""
)
cur.execute(
"""
create table if not exists project_budget_lines (
@@ -661,6 +686,7 @@ def init_db() -> None:
cur.execute("create index if not exists idx_project_pile_progress_entries_project_code on project_pile_progress_entries(project_code)")
cur.execute("create index if not exists idx_project_budget_lines_project_code on project_budget_lines(project_code)")
cur.execute("create index if not exists idx_project_budget_account_lines_project_code on project_budget_account_lines(project_code)")
cur.execute("create index if not exists idx_project_lifecycle_allocations_base_project_code on project_lifecycle_allocations(base_project_code)")
existing_cols = [row["name"] for row in cur.execute("pragma table_info(project_master)").fetchall()]
if "construction_family" not in existing_cols:
cur.execute("alter table project_master add column construction_family text")
@@ -808,6 +834,558 @@ def fetch_project_defaults(conn: sqlite3.Connection, project_code: str) -> dict:
return dict(row) if row else {"project_code": project_code, "project_name": "", "project_type": ""}
def build_related_projects(conn: sqlite3.Connection, project_code: str, project_name: str = "") -> list[dict]:
excluded_values = sorted(PROJECT_VIEW_EXCLUDED_ACCOUNT_CODES)
excluded_placeholders = ",".join("?" for _ in excluded_values) if excluded_values else ""
excluded_clause = (
f"and coalesce(tx.account_code_final, '') not in ({excluded_placeholders})"
if excluded_placeholders
else ""
)
rows = conn.execute(
f"""
with recursive related_codes(project_code) as (
select ?
union
select pr.related_project_code
from project_relations pr
join related_codes rc on rc.project_code = pr.project_code
where coalesce(pr.related_project_code, '') <> ''
union
select pr.project_code
from project_relations pr
join related_codes rc on rc.project_code = pr.related_project_code
where coalesce(pr.project_code, '') <> ''
),
code_set as (
select distinct project_code
from related_codes
where coalesce(project_code, '') <> ''
),
project_summary as (
select
tx.project_code as project_code,
max(tx.project_name) as project_name,
max(tx.project_type) as transaction_project_type,
sum(case when tx.in_out = '입금' then coalesce(tx.supply_amount, 0) else 0 end) as income_supply,
sum(case when tx.in_out = '출금' then coalesce(tx.supply_amount, 0) else 0 end) as expense_supply,
count(*) as txn_count,
min(tx.transaction_date) as min_date,
max(tx.transaction_date) as max_date
from ptc_transactions tx
join code_set cs on cs.project_code = tx.project_code
where 1 = 1
{excluded_clause}
group by tx.project_code
),
master_rows as (
select
pm.project_code as project_code,
pm.project_name as project_name,
pm.project_type as master_project_type,
pm.construction_family as construction_family,
pm.construction_method as construction_method,
pm.start_date as start_date,
pm.end_date as end_date,
pm.note as note
from project_master pm
join code_set cs on cs.project_code = pm.project_code
)
select
cs.project_code,
coalesce(ps.project_name, mr.project_name, '') as project_name,
coalesce(ps.transaction_project_type, '') as transaction_project_type,
coalesce(mr.master_project_type, '') as master_project_type,
coalesce(mr.construction_family, '') as construction_family,
coalesce(mr.construction_method, '') as construction_method,
coalesce(mr.start_date, '') as start_date,
coalesce(mr.end_date, '') as end_date,
coalesce(mr.note, '') as note,
coalesce(ps.income_supply, 0) as income_supply,
coalesce(ps.expense_supply, 0) as expense_supply,
coalesce(ps.txn_count, 0) as txn_count,
coalesce(ps.min_date, '') as min_date,
coalesce(ps.max_date, '') as max_date
from code_set cs
left join project_summary ps on ps.project_code = cs.project_code
left join master_rows mr on mr.project_code = cs.project_code
order by cs.project_code
""",
[project_code, *excluded_values],
).fetchall()
role_order = {"영업": 0, "설계": 1, "시공": 2, "관리": 3}
items: list[dict] = []
seen_codes: set[str] = set()
for row in rows:
row_dict = dict(row)
code = (row_dict.get("project_code") or "").strip()
if not code or code in seen_codes:
continue
seen_codes.add(code)
resolved_type = resolve_project_type(
code,
row_dict.get("transaction_project_type") or "",
row_dict.get("master_project_type") or "",
)
income_supply = float(row_dict.get("income_supply") or 0)
expense_supply = float(row_dict.get("expense_supply") or 0)
item = {
"project_code": code,
"project_name": row_dict.get("project_name") or project_name or "",
"project_type": resolved_type,
"construction_family": resolve_construction_family(
row_dict.get("construction_method") or "",
row_dict.get("construction_family") or "",
),
"construction_method": row_dict.get("construction_method") or "",
"start_date": row_dict.get("start_date") or "",
"end_date": row_dict.get("end_date") or "",
"note": row_dict.get("note") or "",
"income_supply": income_supply,
"expense_supply": expense_supply,
"profit_supply": income_supply - expense_supply,
"txn_count": int(row_dict.get("txn_count") or 0),
"min_date": row_dict.get("min_date") or "",
"max_date": row_dict.get("max_date") or "",
"is_current": code == project_code,
}
items.append(item)
items.sort(key=lambda item: (role_order.get(item["project_type"], 9), item["project_code"]))
return items
def fetch_lifecycle_allocation_map(conn: sqlite3.Connection, base_project_code: str) -> dict[str, dict]:
if not base_project_code:
return {}
rows = conn.execute(
"""
select
source_project_code,
allocation_numerator,
allocation_denominator
from project_lifecycle_allocations
where base_project_code = ?
""",
(base_project_code,),
).fetchall()
allocation_map: dict[str, dict] = {}
for row in rows:
source_project_code = (row["source_project_code"] or "").strip()
numerator = int(row["allocation_numerator"] or 0)
denominator = int(row["allocation_denominator"] or 1)
if not source_project_code:
continue
if denominator <= 0:
denominator = 1
numerator = max(0, min(numerator, denominator))
allocation_map[source_project_code] = {
"allocation_numerator": numerator,
"allocation_denominator": denominator,
"allocation_ratio": (numerator / denominator) if denominator > 0 else 1.0,
"has_custom_allocation": True,
}
return allocation_map
def resolve_lifecycle_allocation(project_type: str, allocation_item: dict | None) -> tuple[int, int, float]:
if project_type in {"영업", "설계"}:
if allocation_item:
numerator = int(allocation_item.get("allocation_numerator") or 0)
denominator = int(allocation_item.get("allocation_denominator") or 1)
if denominator <= 0:
denominator = 1
numerator = max(0, min(numerator, denominator))
ratio = numerator / denominator
return numerator, denominator, ratio
return 1, 1, 1.0
return 1, 1, 1.0
def build_company_allocated_project_rows(
conn: sqlite3.Connection, project_rows: list[sqlite3.Row], source_project_type: str
) -> tuple[list[dict], dict]:
raw_rows = rows_to_dicts(project_rows)
if source_project_type not in {"영업", "설계"} or not raw_rows:
return raw_rows, {"enabled": False}
related_cache: dict[str, list[str]] = {}
project_meta_cache: dict[str, dict] = {}
allocated_map: dict[str, dict] = {}
def get_project_meta(project_code: str) -> dict:
cached = project_meta_cache.get(project_code)
if cached is not None:
return cached
master = fetch_project_master(conn, project_code) or {}
defaults = fetch_project_defaults(conn, project_code)
resolved_type = resolve_project_type(
project_code,
defaults.get("project_type", ""),
master.get("project_type", ""),
)
item = {
"project_code": project_code,
"project_name": (master.get("project_name") or defaults.get("project_name") or "").strip(),
"project_type": resolved_type,
}
project_meta_cache[project_code] = item
return item
def get_target_codes(source_project_code: str) -> list[str]:
cached = related_cache.get(source_project_code)
if cached is not None:
return cached
related = build_related_projects(conn, source_project_code)
candidates = [item for item in related if (item.get("project_code") or "").strip() and item.get("project_code") != source_project_code]
construction_targets = [item for item in candidates if (item.get("project_type") or "").strip() == "시공"]
target_items = construction_targets if construction_targets else candidates
target_codes = sorted({(item.get("project_code") or "").strip() for item in target_items if (item.get("project_code") or "").strip()})
if not target_codes:
target_codes = [source_project_code]
related_cache[source_project_code] = target_codes
return target_codes
for row in raw_rows:
source_code = (row.get("project_code") or "").strip()
if not source_code:
continue
target_codes = get_target_codes(source_code)
divisor = max(len(target_codes), 1)
income_supply = float(row.get("income_supply_sum") or 0)
expense_supply = float(row.get("expense_supply_sum") or 0)
supply_sum = float(row.get("supply_sum") or 0)
txn_count = float(row.get("txn_count") or 0)
income_count = float(row.get("income_count") or 0)
expense_count = float(row.get("expense_count") or 0)
for target_code in target_codes:
meta = get_project_meta(target_code)
entry = allocated_map.setdefault(
target_code,
{
"project_code": target_code,
"project_name": meta.get("project_name") or target_code,
"project_type": meta.get("project_type") or "",
"txn_count": 0.0,
"income_count": 0.0,
"expense_count": 0.0,
"income_supply_sum": 0.0,
"expense_supply_sum": 0.0,
"supply_sum": 0.0,
},
)
entry["txn_count"] += txn_count / divisor
entry["income_count"] += income_count / divisor
entry["expense_count"] += expense_count / divisor
entry["income_supply_sum"] += income_supply / divisor
entry["expense_supply_sum"] += expense_supply / divisor
entry["supply_sum"] += supply_sum / divisor
allocated_rows = list(allocated_map.values())
for row in allocated_rows:
row["txn_count"] = int(round(float(row.get("txn_count") or 0)))
row["income_count"] = int(round(float(row.get("income_count") or 0)))
row["expense_count"] = int(round(float(row.get("expense_count") or 0)))
allocated_rows.sort(key=lambda item: (-float(item.get("supply_sum") or 0), item.get("project_code") or ""))
return allocated_rows, {
"enabled": True,
"mode": "project_count_equal_split",
"source_project_type": source_project_type,
"source_project_count": len(raw_rows),
"target_project_count": len(allocated_rows),
}
def build_project_lifecycle_cost(
conn: sqlite3.Connection, related_projects: list[dict], current_project_type: str, base_project_code: str
) -> dict | None:
if current_project_type != "시공":
return None
allocation_map = fetch_lifecycle_allocation_map(conn, base_project_code)
role_order = ["영업", "설계", "시공", "관리"]
rows = [item for item in related_projects if item.get("project_type") in role_order]
if not rows:
return None
rows_with_allocation: list[dict] = []
for item in rows:
row = dict(item)
project_type = (row.get("project_type") or "").strip()
numerator, denominator, ratio = resolve_lifecycle_allocation(
project_type,
allocation_map.get((row.get("project_code") or "").strip()),
)
row["allocation_numerator"] = numerator
row["allocation_denominator"] = denominator
row["allocation_ratio"] = ratio
row["has_custom_allocation"] = bool(allocation_map.get((row.get("project_code") or "").strip()))
row["adjusted_expense_supply"] = float(row.get("expense_supply") or 0) * ratio
rows_with_allocation.append(row)
rows.sort(key=lambda item: (role_order.index(item["project_type"]), item["project_code"]))
total_income = sum(float(item.get("income_supply") or 0) for item in rows_with_allocation)
total_expense = sum(float(item.get("adjusted_expense_supply") or 0) for item in rows_with_allocation)
project_codes = [item.get("project_code") for item in rows_with_allocation if item.get("project_code")]
breakdown_totals = {"시공비": 0.0, "인건비": 0.0, "관리비": 0.0}
breakdown_project_maps: dict[str, dict[str, dict]] = {
"시공비": {},
"인건비": {},
"관리비": {},
}
breakdown_account_maps: dict[str, dict[str, dict]] = {
"시공비": {},
"인건비": {},
"관리비": {},
}
project_lookup = {item.get("project_code"): item for item in rows_with_allocation if item.get("project_code")}
if project_codes:
placeholders = ",".join("?" for _ in project_codes)
excluded_values = sorted(PROJECT_VIEW_EXCLUDED_ACCOUNT_CODES)
excluded_placeholders = ",".join("?" for _ in excluded_values) if excluded_values else ""
excluded_clause = (
f"and coalesce(t.account_code_final, '') not in ({excluded_placeholders})"
if excluded_placeholders
else ""
)
expense_rows = conn.execute(
f"""
select
t.project_code,
t.project_type,
t.account_code_final as account_code,
coalesce(sum(t.supply_amount), 0) as expense_supply
from ptc_transactions t
where t.in_out = '출금'
and coalesce(t.project_code, '') in ({placeholders})
{excluded_clause}
group by t.project_code, t.project_type, t.account_code_final
""",
[*project_codes, *excluded_values],
).fetchall()
for row in expense_rows:
account_code = (row["account_code"] or "").strip()
project_code = (row["project_code"] or "").strip()
project_type = (row["project_type"] or "").strip()
meta = ACCOUNT_MASTER.get(account_code)
bucket = classify_lifecycle_bucket(account_code, project_code, project_type, meta)
project_info = project_lookup.get(project_code, {})
numerator = int(project_info.get("allocation_numerator") or 1)
denominator = int(project_info.get("allocation_denominator") or 1)
allocation_ratio = float(project_info.get("allocation_ratio") or 1.0)
expense_supply = float(row["expense_supply"] or 0) * allocation_ratio
breakdown_totals[bucket] += expense_supply
project_entry = breakdown_project_maps[bucket].setdefault(
project_code,
{
"project_code": project_code,
"project_name": project_info.get("project_name") or "",
"project_type": project_info.get("project_type") or project_type or "",
"construction_family": project_info.get("construction_family") or "",
"construction_method": project_info.get("construction_method") or "",
"allocation_numerator": numerator,
"allocation_denominator": denominator,
"allocation_ratio": allocation_ratio,
"expense_supply": 0.0,
},
)
project_entry["expense_supply"] += expense_supply
account_entry = breakdown_account_maps[bucket].setdefault(
account_code or "미지정",
{
"account_code": account_code or "",
"account_name": (meta or {}).get("name") or (row["account_code"] or ""),
"expense_supply": 0.0,
},
)
account_entry["expense_supply"] += expense_supply
breakdown = [
{
"label": "시공비",
"expense_supply": breakdown_totals["시공비"],
"projects": sorted(
breakdown_project_maps["시공비"].values(),
key=lambda item: (-float(item.get("expense_supply") or 0), item.get("project_code") or ""),
),
"accounts": sorted(
breakdown_account_maps["시공비"].values(),
key=lambda item: (-float(item.get("expense_supply") or 0), item.get("account_code") or ""),
),
},
{
"label": "인건비",
"expense_supply": 0.0,
"projects": [],
"accounts": [],
},
{
"label": "관리비",
"expense_supply": breakdown_totals["관리비"],
"projects": sorted(
breakdown_project_maps["관리비"].values(),
key=lambda item: (-float(item.get("expense_supply") or 0), item.get("project_code") or ""),
),
"accounts": sorted(
breakdown_account_maps["관리비"].values(),
key=lambda item: (-float(item.get("expense_supply") or 0), item.get("account_code") or ""),
),
},
]
return {
"rows": sorted(
rows_with_allocation,
key=lambda item: (role_order.index(item["project_type"]), item["project_code"]),
),
"breakdown": breakdown,
"summary": {
"income_supply": total_income,
"expense_supply": total_expense,
"profit_supply": total_income - total_expense,
},
}
def classify_lifecycle_bucket(account_code: str, project_code: str, project_type: str, meta: dict | None = None) -> str:
meta = meta or ACCOUNT_MASTER.get(account_code)
if meta:
if meta.get("category") == "인건비":
return "시공비"
if meta.get("project_type") == "시공":
return "시공비"
return "관리비"
if "-시공-" in project_code or project_type == "시공":
return "시공비"
return "관리비"
def build_lifecycle_account_detail(
conn: sqlite3.Connection,
related_projects: list[dict],
base_project_code: str,
current_project_type: str,
bucket_label: str,
account_code: str,
) -> dict | None:
if current_project_type != "시공" or not account_code:
return None
role_order = ["영업", "설계", "시공", "관리"]
rows = [item for item in related_projects if item.get("project_type") in role_order]
if not rows:
return None
project_codes = [item.get("project_code") for item in rows if item.get("project_code")]
if not project_codes:
return None
placeholders = ",".join("?" for _ in project_codes)
excluded_values = sorted(PROJECT_VIEW_EXCLUDED_ACCOUNT_CODES)
excluded_placeholders = ",".join("?" for _ in excluded_values) if excluded_values else ""
excluded_clause = (
f"and coalesce(account_code_final, '') not in ({excluded_placeholders})"
if excluded_placeholders
else ""
)
query_values = [account_code, *project_codes, *excluded_values]
tx_rows = conn.execute(
f"""
select
source_row_no,
transaction_date,
in_out,
project_code,
project_name,
project_type,
vendor_name,
department_name,
description,
account_code_final as account_code,
account_name_final as account_name,
supply_amount
from ptc_transactions
where in_out = '출금'
and coalesce(account_code_final, '') = ?
and coalesce(project_code, '') in ({placeholders})
{excluded_clause}
order by transaction_date desc, source_row_no desc
""",
query_values,
).fetchall()
allocation_map = fetch_lifecycle_allocation_map(conn, base_project_code)
related_project_type_map = {
(item.get("project_code") or "").strip(): (item.get("project_type") or "").strip()
for item in related_projects
if (item.get("project_code") or "").strip()
}
filtered_transactions: list[dict] = []
project_map: dict[str, dict] = {}
for row in tx_rows:
row_dict = dict(row)
tx_project_code = (row_dict.get("project_code") or "").strip()
tx_project_type = related_project_type_map.get(tx_project_code) or (row_dict.get("project_type") or "").strip()
meta = ACCOUNT_MASTER.get(account_code)
bucket = classify_lifecycle_bucket(account_code, tx_project_code, tx_project_type, meta)
if bucket != bucket_label:
continue
numerator, denominator, ratio = resolve_lifecycle_allocation(tx_project_type, allocation_map.get(tx_project_code))
allocated_supply_amount = float(row_dict.get("supply_amount") or 0) * ratio
row_dict["allocated_supply_amount"] = allocated_supply_amount
row_dict["allocation_numerator"] = numerator
row_dict["allocation_denominator"] = denominator
row_dict["allocation_ratio"] = ratio
filtered_transactions.append(row_dict)
project_entry = project_map.setdefault(
tx_project_code or "-",
{
"project_code": tx_project_code or "",
"project_name": row_dict.get("project_name") or "",
"project_type": tx_project_type or "",
"allocation_numerator": numerator,
"allocation_denominator": denominator,
"allocation_ratio": ratio,
"expense_supply_sum": 0.0,
"txn_count": 0,
},
)
project_entry["expense_supply_sum"] += allocated_supply_amount
project_entry["txn_count"] += 1
summary = {
"account_code": account_code,
"account_name": (ACCOUNT_MASTER.get(account_code) or {}).get("name") or account_code,
"income_supply_sum": 0.0,
"expense_supply_sum": sum(float(row.get("allocated_supply_amount") or 0) for row in filtered_transactions),
"txn_count": len(filtered_transactions),
"min_date": min((row.get("transaction_date") or "" for row in filtered_transactions), default=""),
"max_date": max((row.get("transaction_date") or "" for row in filtered_transactions), default=""),
}
return {
"summary": summary,
"projects": sorted(
project_map.values(),
key=lambda item: (-float(item.get("expense_supply_sum") or 0), item.get("project_code") or ""),
),
"transactions": filtered_transactions[:100],
}
def get_project_account_issues(conn: sqlite3.Connection, project_code: str, resolved_project_type: str) -> list[dict]:
allowed_codes = ALLOWED_ACCOUNT_CODES_BY_PROJECT_TYPE.get(resolved_project_type)
if not allowed_codes:
@@ -1084,6 +1662,10 @@ def build_where(params: dict[str, list[str]]) -> tuple[str, list]:
def build_project_where(project_code: str, keyword: str = "", in_out: str = "전체") -> tuple[str, list]:
clauses = ["project_code = ?"]
values = [project_code]
if PROJECT_VIEW_EXCLUDED_ACCOUNT_CODES:
excluded_placeholders = ",".join("?" for _ in PROJECT_VIEW_EXCLUDED_ACCOUNT_CODES)
clauses.append(f"coalesce(account_code_final, '') not in ({excluded_placeholders})")
values.extend(sorted(PROJECT_VIEW_EXCLUDED_ACCOUNT_CODES))
if keyword.strip():
like = f"%{keyword.strip().lower()}%"
@@ -1241,6 +1823,15 @@ class Handler(BaseHTTPRequestHandler):
start_date = str(payload.get("start_date", "")).strip()
end_date = str(payload.get("end_date", "")).strip()
note = str(payload.get("note", "")).strip()
raw_related_project_codes = payload.get("related_project_codes", [])
if isinstance(raw_related_project_codes, str):
raw_related_project_codes = re.split(r"[\s,]+", raw_related_project_codes)
related_project_codes = []
for code in raw_related_project_codes if isinstance(raw_related_project_codes, list) else []:
normalized = str(code or "").strip()
if not normalized or normalized == project_code or normalized in related_project_codes:
continue
related_project_codes.append(normalized)
updated_at = datetime.now().isoformat()
conn.execute(
@@ -1260,8 +1851,103 @@ class Handler(BaseHTTPRequestHandler):
""",
(project_code, project_name, project_type, construction_family, construction_method, start_date, end_date, note, updated_at),
)
conn.execute(
"delete from project_relations where project_code = ? or related_project_code = ?",
(project_code, project_code),
)
for related_project_code in related_project_codes:
conn.execute(
"""
insert or replace into project_relations(project_code, related_project_code, updated_at)
values (?, ?, ?)
""",
(project_code, related_project_code, updated_at),
)
conn.execute(
"""
insert or replace into project_relations(project_code, related_project_code, updated_at)
values (?, ?, ?)
""",
(related_project_code, project_code, updated_at),
)
conn.commit()
self._send(200, {"ok": True, "item": fetch_project_master(conn, project_code)})
item = fetch_project_master(conn, project_code) or {}
item["related_projects"] = build_related_projects(conn, project_code, item.get("project_name") or project_name)
self._send(200, {"ok": True, "item": item})
return
if parsed.path == "/api/lifecycle-allocation/upsert":
payload = self._read_json()
base_project_code = str(payload.get("base_project_code", "")).strip()
source_project_code = str(payload.get("source_project_code", "")).strip()
allocation_numerator = int(payload.get("allocation_numerator", 1) or 0)
allocation_denominator = int(payload.get("allocation_denominator", 1) or 1)
if not base_project_code or not source_project_code:
self._send(400, {"ok": False, "message": "base_project_code and source_project_code are required"})
return
if allocation_denominator <= 0:
self._send(400, {"ok": False, "message": "allocation_denominator must be greater than 0"})
return
if allocation_numerator < 0:
self._send(400, {"ok": False, "message": "allocation_numerator must be 0 or greater"})
return
if allocation_numerator > allocation_denominator:
self._send(400, {"ok": False, "message": "allocation_numerator must be <= allocation_denominator"})
return
updated_at = datetime.now().isoformat()
conn.execute(
"""
insert into project_lifecycle_allocations (
base_project_code, source_project_code, allocation_numerator, allocation_denominator, updated_at
) values (?, ?, ?, ?, ?)
on conflict(base_project_code, source_project_code) do update set
allocation_numerator = excluded.allocation_numerator,
allocation_denominator = excluded.allocation_denominator,
updated_at = excluded.updated_at
""",
(
base_project_code,
source_project_code,
allocation_numerator,
allocation_denominator,
updated_at,
),
)
conn.commit()
self._send(
200,
{
"ok": True,
"item": {
"base_project_code": base_project_code,
"source_project_code": source_project_code,
"allocation_numerator": allocation_numerator,
"allocation_denominator": allocation_denominator,
"allocation_ratio": allocation_numerator / allocation_denominator if allocation_denominator > 0 else 1.0,
"updated_at": updated_at,
},
},
)
return
if parsed.path == "/api/lifecycle-allocation/delete":
payload = self._read_json()
base_project_code = str(payload.get("base_project_code", "")).strip()
source_project_code = str(payload.get("source_project_code", "")).strip()
if not base_project_code or not source_project_code:
self._send(400, {"ok": False, "message": "base_project_code and source_project_code are required"})
return
conn.execute(
"""
delete from project_lifecycle_allocations
where base_project_code = ?
and source_project_code = ?
""",
(base_project_code, source_project_code),
)
conn.commit()
self._send(200, {"ok": True})
return
if parsed.path == "/api/project-master/batch-update-method":
@@ -2178,18 +2864,10 @@ class Handler(BaseHTTPRequestHandler):
date_to = params.get("date_to", [""])[0].strip()
clauses = ["project_code is not null", "project_code <> ''"]
values = []
if keyword:
like = f"%{keyword}%"
clauses.append(
"""
(
lower(coalesce(project_code, '')) like ?
or lower(coalesce(project_name, '')) like ?
or lower(coalesce(project_type, '')) like ?
)
"""
)
values.extend([like, like, like])
if PROJECT_VIEW_EXCLUDED_ACCOUNT_CODES:
excluded_placeholders = ",".join("?" for _ in PROJECT_VIEW_EXCLUDED_ACCOUNT_CODES)
clauses.append(f"coalesce(account_code_final, '') not in ({excluded_placeholders})")
values.extend(sorted(PROJECT_VIEW_EXCLUDED_ACCOUNT_CODES))
if project_type and project_type != "전체":
clauses.append("project_type = ?")
values.append(project_type)
@@ -2252,6 +2930,38 @@ class Handler(BaseHTTPRequestHandler):
item["construction_family"] = ""
item["construction_method"] = ""
item["note"] = ""
related_projects = build_related_projects(
conn,
item["project_code"],
item.get("project_name") or "",
)
related_search_terms = []
for rel in related_projects:
related_search_terms.extend(
[
rel.get("project_code") or "",
rel.get("project_name") or "",
rel.get("project_type") or "",
]
)
item["related_projects"] = related_projects
item["related_search_text"] = " ".join(
filter(
None,
[
item.get("project_code") or "",
item.get("project_name") or "",
item.get("project_type") or "",
*related_search_terms,
],
)
).lower()
if keyword:
items = [
item
for item in items
if keyword in (item.get("related_search_text") or "")
]
self._send(200, {"items": items})
return
@@ -2945,11 +3655,16 @@ class Handler(BaseHTTPRequestHandler):
detail_values,
).fetchall()
allocated_projects, allocation_meta = build_company_allocated_project_rows(
conn, project_rows, project_type
)
self._send(
200,
{
"summary": dict(summary) if summary else None,
"projects": rows_to_dicts(project_rows),
"projects": allocated_projects,
"project_allocation": allocation_meta,
"transactions": rows_to_dicts(transaction_rows),
},
)
@@ -3162,6 +3877,32 @@ class Handler(BaseHTTPRequestHandler):
)
return
if parsed.path == "/api/lifecycle-account-detail":
project_code = params.get("project_code", [""])[0].strip()
bucket_label = params.get("bucket_label", [""])[0].strip()
account_code = params.get("account_code", [""])[0].strip()
if not project_code or not bucket_label or not account_code:
self._send(400, {"ok": False, "message": "project_code, bucket_label and account_code are required"})
return
master = fetch_project_master(conn, project_code) or fetch_project_defaults(conn, project_code)
project_name = (master or {}).get("project_name") or ""
resolved_project_type = resolve_project_type(project_code, (master or {}).get("project_type") or "")
related_projects = build_related_projects(conn, project_code, project_name)
detail = build_lifecycle_account_detail(
conn,
related_projects,
project_code,
resolved_project_type,
bucket_label,
account_code,
)
if not detail:
self._send(404, {"ok": False, "message": "lifecycle account detail not found"})
return
self._send(200, detail)
return
if parsed.path == "/api/management-excluded-account-detail":
account_code = params.get("account_code", [""])[0].strip()
date_from = params.get("date_from", [""])[0].strip()
@@ -3462,6 +4203,17 @@ class Handler(BaseHTTPRequestHandler):
summary_dict["start_date"] = ""
summary_dict["end_date"] = ""
summary_dict["note"] = ""
related_projects = build_related_projects(
conn,
project_code,
summary_dict.get("project_name") if summary_dict else "",
)
lifecycle_cost = build_project_lifecycle_cost(
conn,
related_projects,
summary_dict["project_type"] if summary_dict else "",
project_code,
)
account_issues = get_project_account_issues(
conn,
project_code,
@@ -3479,6 +4231,8 @@ class Handler(BaseHTTPRequestHandler):
"accounts": rows_to_dicts(account_rows),
"account_issues": account_issues,
"transactions": rows_to_dicts(transaction_rows),
"related_projects": related_projects,
"lifecycle_cost": lifecycle_cost,
},
)
return