363 lines
12 KiB
Python
363 lines
12 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
Aptabase 훅 공통 유틸.
|
||
|
||
상태 파일: <git-root>/.claude/state/aptabase-accum.json
|
||
- accum : 누적 토큰 (input/cache_creation/cache_read/output)
|
||
- offsets : 트랜스크립트별 읽은 byte offset
|
||
- last_sent_commit : 마지막으로 Aptabase 로 전송한 HEAD 커밋 해시
|
||
"""
|
||
import json
|
||
import os
|
||
import platform
|
||
import random
|
||
import re
|
||
import socket
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
import urllib.error
|
||
import urllib.request
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
|
||
HOOK_DIR = Path(__file__).resolve().parent
|
||
CONFIG_PATH = HOOK_DIR / "aptabase.json"
|
||
# OAuth 정보는 ~/.claude.json (top-level) 에 저장됨. ~/.claude/config.json 은 API 키 캐시용.
|
||
CLAUDE_OAUTH_CONFIG = Path.home() / ".claude.json"
|
||
CLAUDE_API_CONFIG = Path.home() / ".claude" / "config.json"
|
||
SDK_VERSION = "claude-hook-aptabase@0.3.0"
|
||
|
||
# Aptabase 이벤트 이름은 고정 — 커밋 단위 flush 하나뿐
|
||
EVENT_NAME = "claude_commit"
|
||
|
||
EMPTY_TOTALS = {
|
||
"input_tokens": 0,
|
||
"cache_creation_tokens": 0,
|
||
"cache_read_tokens": 0,
|
||
"output_tokens": 0,
|
||
}
|
||
|
||
# ─── 쿼터 차감 가중치 ─────────────────────────────────────────────────────
|
||
# Anthropic 은 각 토큰 타입을 다른 요율로 쿼터에서 차감한다.
|
||
# "input 등가 토큰(Input-Equivalent Tokens)" = 실제 구독 한도에서 차감되는 양.
|
||
#
|
||
# 가중치 (모든 모델 공통, 2026-04 기준):
|
||
# input × 1.00 — 기본 기준
|
||
# cache_creation × 1.25 — 캐시 쓰기 프리미엄
|
||
# cache_read × 0.10 — 캐시 읽기 90% 할인
|
||
# output × 5.00 — 출력이 입력의 5배
|
||
TOKEN_WEIGHTS = {
|
||
"input_tokens": 1.00,
|
||
"cache_creation_tokens": 1.25,
|
||
"cache_read_tokens": 0.10,
|
||
"output_tokens": 5.00,
|
||
}
|
||
|
||
# 커밋 메시지에서 이슈 번호 추출 — 가장 구체적인 패턴부터
|
||
ISSUE_PATTERNS = [
|
||
r"(?:FEAT|BUG|FIX|TASK|PROJ|ISSUE)[-\s](\d+)",
|
||
r"(?:GH|gh|issue|close[sd]?|fix(?:e[sd])?|resolve[sd]?)[\s#-]+(\d+)",
|
||
r"#(\d+)",
|
||
]
|
||
|
||
|
||
# ─── 설정 ─────────────────────────────────────────────────────────────────
|
||
|
||
def load_config() -> dict | None:
|
||
try:
|
||
cfg = json.loads(CONFIG_PATH.read_text(encoding="utf-8"))
|
||
except Exception:
|
||
return None
|
||
if not cfg.get("enabled", True):
|
||
return None
|
||
if not cfg.get("app_key") or not cfg.get("aptabase_host"):
|
||
return None
|
||
return cfg
|
||
|
||
|
||
def _read_json(path: Path) -> dict:
|
||
try:
|
||
return json.loads(path.read_text(encoding="utf-8"))
|
||
except Exception:
|
||
return {}
|
||
|
||
|
||
def get_claude_oauth_config() -> dict:
|
||
"""~/.claude.json — OAuth 로그인 / 구독 정보가 들어 있음."""
|
||
return _read_json(CLAUDE_OAUTH_CONFIG)
|
||
|
||
|
||
def get_claude_api_config() -> dict:
|
||
"""~/.claude/config.json — API 키 캐시 등 레거시 필드."""
|
||
return _read_json(CLAUDE_API_CONFIG)
|
||
|
||
|
||
# ─── Claude / User 식별 ────────────────────────────────────────────────────
|
||
|
||
def get_claude_oauth_id() -> str:
|
||
"""OAuth 로그인 이메일. 없으면 anonymous."""
|
||
oauth = get_claude_oauth_config().get("oauthAccount") or {}
|
||
return oauth.get("emailAddress") or oauth.get("email") or "anonymous"
|
||
|
||
|
||
def get_plan() -> str:
|
||
"""구독 플랜 식별자.
|
||
|
||
우선순위:
|
||
1. oauthAccount.subscriptionType (max/pro/team/enterprise)
|
||
2. oauthAccount.billingType (stripe_subscription / apple_subscription / ...)
|
||
3. apikey (primaryApiKey 만 있는 경우)
|
||
4. unknown
|
||
"""
|
||
oauth = get_claude_oauth_config().get("oauthAccount") or {}
|
||
plan = (
|
||
oauth.get("subscriptionType")
|
||
or oauth.get("plan")
|
||
or oauth.get("billingType")
|
||
)
|
||
if plan:
|
||
return str(plan)
|
||
if get_claude_api_config().get("primaryApiKey"):
|
||
return "apikey"
|
||
return "unknown"
|
||
|
||
|
||
# ─── 네트워크 ──────────────────────────────────────────────────────────────
|
||
|
||
def get_local_ip() -> str:
|
||
try:
|
||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
try:
|
||
s.connect(("8.8.8.8", 80))
|
||
return s.getsockname()[0]
|
||
finally:
|
||
s.close()
|
||
except Exception:
|
||
try:
|
||
return socket.gethostbyname(socket.gethostname())
|
||
except Exception:
|
||
return "unknown"
|
||
|
||
|
||
def get_public_ip(timeout: float = 2.0) -> str:
|
||
for url in ("https://api.ipify.org", "https://ifconfig.me/ip"):
|
||
try:
|
||
req = urllib.request.Request(url, headers={"User-Agent": "claude-hook"})
|
||
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||
ip = resp.read().decode().strip()
|
||
if ip:
|
||
return ip
|
||
except Exception:
|
||
continue
|
||
return "unknown"
|
||
|
||
|
||
# ─── Git ──────────────────────────────────────────────────────────────────
|
||
|
||
def run_git(args: list, cwd: str | None = None) -> str:
|
||
try:
|
||
result = subprocess.run(
|
||
["git"] + args,
|
||
cwd=cwd,
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=5,
|
||
encoding="utf-8",
|
||
errors="replace",
|
||
)
|
||
if result.returncode == 0:
|
||
return result.stdout.strip()
|
||
except Exception:
|
||
pass
|
||
return ""
|
||
|
||
|
||
def get_git_root(cwd: str | None = None) -> Path | None:
|
||
root = run_git(["rev-parse", "--show-toplevel"], cwd=cwd)
|
||
return Path(root) if root else None
|
||
|
||
|
||
def get_repository_info(cwd: str | None = None) -> dict:
|
||
"""저장소 식별 정보.
|
||
|
||
- remote.origin.url 이 있으면: 그 URL + owner/repo 추출
|
||
- 없으면 (로컬 저장소): git root 의 절대 경로를 URL 자리에 넣고, 디렉터리명을 name 으로
|
||
"""
|
||
url = run_git(["config", "--get", "remote.origin.url"], cwd=cwd)
|
||
root = get_git_root(cwd)
|
||
|
||
name = ""
|
||
if url:
|
||
m = re.search(r"[:/]([^/:]+/[^/]+?)(?:\.git)?/?$", url)
|
||
if m:
|
||
name = m.group(1)
|
||
if not name and root:
|
||
name = root.name
|
||
|
||
# 로컬 저장소 (remote 미설정): 경로를 URL 대용으로 기록
|
||
if not url and root:
|
||
url = str(root).replace("\\", "/")
|
||
|
||
return {"name": name or "unknown", "url": url or ""}
|
||
|
||
|
||
def extract_issue_number(commit_message: str) -> str | None:
|
||
if not commit_message:
|
||
return None
|
||
for pattern in ISSUE_PATTERNS:
|
||
m = re.search(pattern, commit_message)
|
||
if m:
|
||
return m.group(1)
|
||
return None
|
||
|
||
|
||
# ─── 상태 파일 ────────────────────────────────────────────────────────────
|
||
|
||
def get_state_path(cwd: str | None = None) -> Path:
|
||
root = get_git_root(cwd)
|
||
base = root if root else Path(cwd or os.getcwd())
|
||
return base / ".claude" / "state" / "aptabase-accum.json"
|
||
|
||
|
||
def load_state(cwd: str | None = None) -> dict:
|
||
path = get_state_path(cwd)
|
||
try:
|
||
state = json.loads(path.read_text(encoding="utf-8"))
|
||
except Exception:
|
||
state = {}
|
||
state.setdefault("accum", dict(EMPTY_TOTALS))
|
||
for k in EMPTY_TOTALS:
|
||
state["accum"].setdefault(k, 0)
|
||
state.setdefault("offsets", {})
|
||
state.setdefault("last_sent_commit", "")
|
||
# 최근 본 트랜스크립트 경로 — git post-commit 훅이 stdin 없이 호출될 때
|
||
# fallback 으로 사용해서 턴 중간 커밋에서도 최신 토큰을 flush 할 수 있게 함
|
||
state.setdefault("last_transcript", "")
|
||
return state
|
||
|
||
|
||
def save_state(state: dict, cwd: str | None = None) -> None:
|
||
path = get_state_path(cwd)
|
||
path.parent.mkdir(parents=True, exist_ok=True)
|
||
path.write_text(json.dumps(state, indent=2), encoding="utf-8")
|
||
|
||
|
||
def total_tokens(accum: dict) -> int:
|
||
"""원시 토큰 합계 (단순 합, 참고용)."""
|
||
return sum(int(accum.get(k, 0) or 0) for k in EMPTY_TOTALS)
|
||
|
||
|
||
def consumed_tokens(accum: dict) -> int:
|
||
"""쿼터에서 실제 차감되는 "input 등가 토큰" 수.
|
||
|
||
각 토큰 타입에 가중치를 곱해 합산한다. 이 값이 구독 한도 소모를
|
||
가장 정확히 반영한다.
|
||
"""
|
||
return int(round(sum(
|
||
int(accum.get(k, 0) or 0) * w
|
||
for k, w in TOKEN_WEIGHTS.items()
|
||
)))
|
||
|
||
|
||
# ─── 트랜스크립트 파싱 ────────────────────────────────────────────────────
|
||
|
||
def accumulate_from_transcript(state: dict, transcript_path: str) -> None:
|
||
"""트랜스크립트의 새 영역(offset 이후)만 파싱해 state.accum 에 추가.
|
||
|
||
append-only JSONL 이라 byte offset 기반이 안전하다.
|
||
"""
|
||
if not transcript_path or not os.path.exists(transcript_path):
|
||
return
|
||
|
||
normalized = str(Path(transcript_path))
|
||
offset = int(state["offsets"].get(normalized, 0))
|
||
try:
|
||
size = os.path.getsize(transcript_path)
|
||
except OSError:
|
||
return
|
||
if size < offset:
|
||
# 파일이 교체됐을 수 있음 — 처음부터 다시 읽는다
|
||
offset = 0
|
||
if size == offset:
|
||
return
|
||
|
||
accum = state["accum"]
|
||
try:
|
||
with open(transcript_path, "rb") as f:
|
||
f.seek(offset)
|
||
data = f.read(size - offset)
|
||
new_offset = f.tell()
|
||
except Exception:
|
||
return
|
||
|
||
for line in data.splitlines():
|
||
line = line.strip()
|
||
if not line:
|
||
continue
|
||
try:
|
||
entry = json.loads(line)
|
||
except Exception:
|
||
continue
|
||
msg = entry.get("message")
|
||
if not isinstance(msg, dict) or msg.get("role") != "assistant":
|
||
continue
|
||
usage = msg.get("usage")
|
||
if not isinstance(usage, dict):
|
||
continue
|
||
accum["input_tokens"] += int(usage.get("input_tokens", 0) or 0)
|
||
accum["cache_creation_tokens"] += int(
|
||
usage.get("cache_creation_input_tokens", 0) or 0
|
||
)
|
||
accum["cache_read_tokens"] += int(
|
||
usage.get("cache_read_input_tokens", 0) or 0
|
||
)
|
||
accum["output_tokens"] += int(usage.get("output_tokens", 0) or 0)
|
||
|
||
state["offsets"][normalized] = new_offset
|
||
# git post-commit 훅 fallback 용으로 경로 저장
|
||
state["last_transcript"] = normalized
|
||
|
||
|
||
# ─── Aptabase 전송 ────────────────────────────────────────────────────────
|
||
|
||
def make_aptabase_session_id() -> str:
|
||
"""epoch_seconds + 8 random digits (Aptabase 권장 형식)."""
|
||
return f"{int(time.time())}{random.randint(0, 99999999):08d}"
|
||
|
||
|
||
def build_system_props() -> dict:
|
||
return {
|
||
"isDebug": False,
|
||
"locale": os.environ.get("LANG", "en-US").split(".")[0].replace("_", "-"),
|
||
"osName": platform.system(),
|
||
"osVersion": platform.release(),
|
||
"appVersion": "1.0.0",
|
||
"sdkVersion": SDK_VERSION,
|
||
}
|
||
|
||
|
||
def now_iso() -> str:
|
||
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
|
||
|
||
|
||
def post_event(host: str, app_key: str, payload: dict) -> bool:
|
||
url = host.rstrip("/") + "/api/v0/event"
|
||
body = json.dumps(payload).encode("utf-8")
|
||
req = urllib.request.Request(
|
||
url,
|
||
data=body,
|
||
method="POST",
|
||
headers={
|
||
"Content-Type": "application/json",
|
||
"App-Key": app_key,
|
||
"User-Agent": f"ClaudeCodeHook/{SDK_VERSION}",
|
||
},
|
||
)
|
||
try:
|
||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||
resp.read()
|
||
return True
|
||
except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError, OSError):
|
||
return False
|