From 86f6a6f1fe3d7660d10dc646e065cd0e5914a1af Mon Sep 17 00:00:00 2001 From: kyy Date: Wed, 3 Sep 2025 11:20:46 +0900 Subject: [PATCH] First commit --- .env | 4 + .gitattributes | 32 +++++ .gitignore | 11 ++ gitea-172.16.10.175.crt | 0 run_main.sh | 26 +++++ src/__init__.py | 0 src/clients/__init__.py | 0 src/clients/grafana_client.py | 50 ++++++++ src/clients/loki.py | 64 ++++++++++ src/clients/prom.py | 57 +++++++++ src/main.py | 123 +++++++++++++++++++ src/services/__init__.py | 0 src/services/backup/summarizer_avg.py | 70 +++++++++++ src/services/backup/summarizer_prom.py | 156 +++++++++++++++++++++++++ src/services/dashboard.py | 75 ++++++++++++ src/services/reporter.py | 27 +++++ src/services/summarizer.py | 93 +++++++++++++++ src/setting/__init__.py | 0 src/setting/config.py | 29 +++++ src/utils/timeutils.py | 56 +++++++++ 20 files changed, 873 insertions(+) create mode 100644 .env create mode 100644 .gitattributes create mode 100644 .gitignore create mode 100644 gitea-172.16.10.175.crt create mode 100644 run_main.sh create mode 100644 src/__init__.py create mode 100644 src/clients/__init__.py create mode 100644 src/clients/grafana_client.py create mode 100644 src/clients/loki.py create mode 100644 src/clients/prom.py create mode 100644 src/main.py create mode 100644 src/services/__init__.py create mode 100644 src/services/backup/summarizer_avg.py create mode 100644 src/services/backup/summarizer_prom.py create mode 100644 src/services/dashboard.py create mode 100644 src/services/reporter.py create mode 100644 src/services/summarizer.py create mode 100644 src/setting/__init__.py create mode 100644 src/setting/config.py create mode 100644 src/utils/timeutils.py diff --git a/.env b/.env new file mode 100644 index 0000000..9d5c0e9 --- /dev/null +++ b/.env @@ -0,0 +1,4 @@ +GRAFANA_URL=https://grafana.hmac.kr +GRAFANA_API_KEY=glsa_pYc0nl8UhGk8Hk8W8q1fH8pKLh7n5fl7_fe30a7ce +MATTERMOST_WEBHOOK=https://mm.hmac.kr/hooks/87rouf6cwif3u84x4jxfbmmf8o 
+GRAFANA_DASHBOARD_UID=f52ae37b-037b-4110-adc0-2171ec068704 \ No newline at end of file diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..b60060f --- /dev/null +++ b/.gitattributes @@ -0,0 +1,32 @@ +*.7z filter=lfs diff=lfs merge=lfs -text +*.arrow filter=lfs diff=lfs merge=lfs -text +*.bin filter=lfs diff=lfs merge=lfs -text +*.bz2 filter=lfs diff=lfs merge=lfs -text +*.ftz filter=lfs diff=lfs merge=lfs -text +*.gz filter=lfs diff=lfs merge=lfs -text +*.h5 filter=lfs diff=lfs merge=lfs -text +*.joblib filter=lfs diff=lfs merge=lfs -text +*.lfs.* filter=lfs diff=lfs merge=lfs -text +*.model filter=lfs diff=lfs merge=lfs -text +*.msgpack filter=lfs diff=lfs merge=lfs -text +*.npy filter=lfs diff=lfs merge=lfs -text +*.npz filter=lfs diff=lfs merge=lfs -text +*.onnx filter=lfs diff=lfs merge=lfs -text +*.ot filter=lfs diff=lfs merge=lfs -text +*.parquet filter=lfs diff=lfs merge=lfs -text +*.pickle filter=lfs diff=lfs merge=lfs -text +*.pkl filter=lfs diff=lfs merge=lfs -text +*.pb filter=lfs diff=lfs merge=lfs -text +*.pt filter=lfs diff=lfs merge=lfs -text +*.pth filter=lfs diff=lfs merge=lfs -text +*.rar filter=lfs diff=lfs merge=lfs -text +saved_model/**/* filter=lfs diff=lfs merge=lfs -text +*.tar.* filter=lfs diff=lfs merge=lfs -text +*.tflite filter=lfs diff=lfs merge=lfs -text +*.tgz filter=lfs diff=lfs merge=lfs -text +*.wasm filter=lfs diff=lfs merge=lfs -text +*.xz filter=lfs diff=lfs merge=lfs -text +*.zip filter=lfs diff=lfs merge=lfs -text +*.zst filter=lfs diff=lfs merge=lfs -text +*tfevents* filter=lfs diff=lfs merge=lfs -text +*.pdf filter=lfs diff=lfs merge=lfs -text diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..88bd011 --- /dev/null +++ b/.gitignore @@ -0,0 +1,11 @@ +# 캐시 및 임시 파일 무시 +__pycache__/ +**/__pycache__/ +**/**/__pycache__/ +*.py[cod] +.ruff_cache/ +.pytest_cache/ + +# 로그/업로드 디렉토리 무시 +logs/ +cached/ \ No newline at end of file diff --git a/gitea-172.16.10.175.crt 
class GrafanaClient:
    """Minimal Grafana REST client.

    Wraps the dashboard/datasource endpoints plus the datasource proxy for
    Prometheus and Loki range queries. Every request carries a Bearer token.
    """

    def __init__(self, base_url: str, api_key: str):
        # Drop any trailing slash once so the f-string URL joins stay clean.
        self.base_url = base_url.rstrip("/")
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        }

    # ---- Dashboards ----
    def get_dashboard_by_uid(self, uid: str) -> Dict[str, Any]:
        """Return the dashboard JSON model for *uid* (raises on HTTP errors)."""
        response = requests.get(
            f"{self.base_url}/api/dashboards/uid/{uid}",
            headers=self.headers,
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["dashboard"]

    # ---- Datasources ----
    def get_datasource_by_uid(self, uid: str) -> Optional[Dict[str, Any]]:
        """Return the datasource JSON for *uid*, or None when lookup fails."""
        response = requests.get(
            f"{self.base_url}/api/datasources/uid/{uid}",
            headers=self.headers,
            timeout=30,
        )
        # Unlike the other calls this one is best-effort: non-200 → None.
        return response.json() if response.status_code == 200 else None

    def list_datasources(self) -> Dict[str, Any]:
        """Return every datasource visible to the API key."""
        response = requests.get(
            f"{self.base_url}/api/datasources",
            headers=self.headers,
            timeout=30,
        )
        response.raise_for_status()
        return response.json()

    # ---- Proxy calls ----
    def prom_query_range(self, ds_id: int, expr: str, start_epoch: int, end_epoch: int, step_sec: int) -> Dict[str, Any]:
        """Run a Prometheus range query through the datasource proxy (epoch seconds)."""
        response = requests.get(
            f"{self.base_url}/api/datasources/proxy/{ds_id}/api/v1/query_range",
            headers=self.headers,
            params={"query": expr, "start": start_epoch, "end": end_epoch, "step": step_sec},
            timeout=60,
        )
        response.raise_for_status()
        return response.json()

    def loki_query_range(self, ds_id: int, query: str, start_epoch: int, end_epoch: int, step_sec: int) -> Dict[str, Any]:
        """Run a Loki range query through the proxy; Loki expects nanosecond timestamps."""
        response = requests.get(
            f"{self.base_url}/api/datasources/proxy/{ds_id}/loki/api/v1/query_range",
            headers=self.headers,
            params={
                "query": query,
                "start": start_epoch * 1000000000,
                "end": end_epoch * 1000000000,
                "step": step_sec,
            },
            timeout=90,
        )
        response.raise_for_status()
        return response.json()
def p95(values: List[float]) -> Optional[float]:
    """Nearest-rank 95th percentile of *values*; None for an empty input."""
    if not values:
        return None
    ordered = sorted(values)
    # Nearest-rank: ceil(0.95 * n) as a 1-based rank, clamped into range.
    rank = int(math.ceil(0.95 * len(ordered))) - 1
    rank = min(max(rank, 0), len(ordered) - 1)
    return ordered[rank]
def summarize_matrix_result(
    resp: Dict[str, Any],
) -> Optional[Dict[str, Optional[float]]]:
    """Collapse a Prometheus matrix response into merged avg/p95/max stats.

    Stats are computed per series first, then combined: avg and p95 are the
    mean of the per-series values, max is the max over series. Returns None
    when the response contains no series at all.
    """
    series_list = resp.get("data", {}).get("result", [])
    if not series_list:
        return None

    per_series = []
    for entry in series_list:
        parsed = []
        for ts, raw in entry.get("values", []):
            try:
                value = float(raw)
            except Exception:
                value = None  # non-numeric sample: kept as None, filtered later
            parsed.append((int(float(ts)), value))
        per_series.append(summarize_series(parsed))

    def _mean_or_none(items):
        present = [item for item in items if item is not None]
        return sum(present) / len(present) if present else None

    maxima = [stats["max"] for stats in per_series if stats["max"] is not None]
    return {
        "avg": _mean_or_none([stats["avg"] for stats in per_series]),
        "p95": _mean_or_none([stats["p95"] for stats in per_series]),
        "max": max(maxima, default=None),
    }
def main():
    """Pull the configured dashboard, total every Loki panel's series, post to Mattermost."""
    args = parse_args()
    cfg = AppConfig.load()  # URL, API key, dashboard UID and webhook from env
    gf = GrafanaClient(
        cfg.grafana_url, cfg.grafana_api_key
    )  # Grafana REST client

    # Resolve the reporting window (KST).
    now = now_kst()
    if args.range in ("24h", "1d"):
        start, range_label = now - timedelta(days=1), "지난 24시간"
    else:
        start, range_label = now - timedelta(days=7), "지난 7일"

    start_epoch, end_epoch = to_epoch(start), to_epoch(now)
    # step = step_for_range(end_epoch - start_epoch)
    step = 21600  # fixed 6-hour buckets

    # Pull the dashboard and flatten its panel tree.
    dash = gf.get_dashboard_by_uid(cfg.grafana_dashboard_uid)
    panels = flatten_panels(dash)

    lines = [
        f"**LLM Gateway Unified Monitoring 요약** \n기간: {range_label} ({start.strftime('%Y-%m-%d %H:%M')} ~ {now.strftime('%Y-%m-%d %H:%M')} KST)",
        "",
    ]

    skipped = []
    counted = 0
    all_ds = gf.list_datasources()

    for panel in panels:
        title = panel.get("title") or "(제목 없음)"
        # Resolve the datasource attached to this panel.
        ds = panel_datasource_resolver(panel, gf.get_datasource_by_uid, lambda: all_ds)

        if not ds or ds.get("id") is None:
            skipped.append(f"- `{title}` (데이터소스 없음)")
            continue

        # Only Loki-backed panels are totalled; Prometheus panels are skipped.
        if not is_loki(ds):
            continue

        # Queries (expr + legend) defined on the panel.
        targets = extract_targets(panel)
        if not targets:
            skipped.append(f"- `{title}` (쿼리 없음)")
            continue

        block = [f"**• {title}**"]
        any_series = False

        for target in targets:
            expr, legend = target["expr"], target["legend"] or ""
            try:
                # Execute the Loki query and compute the total for this target.
                line = compute_total_for_target(
                    ds=ds,
                    ds_id=ds["id"],
                    legend=legend,
                    expr=expr,
                    start_epoch=start_epoch,
                    end_epoch=end_epoch,
                    step_sec=step,
                    # prom_query_range_fn=gf.prom_query_range,
                    loki_query_range_fn=gf.loki_query_range,
                )
                if line is None:
                    continue
                block.append(line)
                if "total **" in line:
                    any_series = True
            except Exception as e:
                block.append(f" - `{legend}`: 오류 - {e}")

        if any_series:
            counted += 1
            lines.append("\n".join(block))
            lines.append("")

    if skipped:
        # NOTE(review): this message was garbled in the original patch text;
        # reconstructed minimally — confirm the intended Markdown wrapper.
        lines.append("건너뛴 패널\n\n" + "\n".join(skipped) + "\n\n")

    post_mattermost(cfg.mattermost_webhook, lines)
    print(f"[OK] Sent summary for {counted} panels.")
summarize_loki_streams_by_count( + expr, loki_query_range_fn, ds_id, start_epoch, end_epoch, step_sec + ) + if err: + return f" - `{label}` (loki): {err}" + + def fmt(x): + return "n/a" if x is None else f"{x:.4g}" + + return ( + f" - `{label}` (loki) → total **{fmt(agg['total'])}**, " + f"avg_rps **{fmt(agg['avg_rps'])}**, " + f"max_{agg['bucket_range']} **{fmt(agg['max_bucket'])}**" + ) + return f" - `{label}`: 지원하지 않는 resultType: {rtype}" + + return f" - `{label}`: 미지원 데이터소스 {ds.get('type')}" diff --git a/src/services/backup/summarizer_prom.py b/src/services/backup/summarizer_prom.py new file mode 100644 index 0000000..d761a51 --- /dev/null +++ b/src/services/backup/summarizer_prom.py @@ -0,0 +1,156 @@ +from typing import Any, Dict, Optional + +from clients.loki import is_loki +from clients.prom import is_prometheus +from utils.timeutils import step_to_logql_range + + +def _sum_values_from_matrix(resp: Dict[str, Any]) -> float: + """matrix 결과의 모든 시계열 샘플 값을 합산""" + total = 0.0 + for s in resp.get("data", {}).get("result", []): + # 각 시계열 안의 values = [(timestamp, value), ...] + for ts, val in s.get("values", []): + try: + total += float(val) # 숫자로 변환 후 합계 + except Exception: + continue + return total + + +def _expr_looks_like_rate(expr: str) -> bool: + """쿼리식이 rate/irate/increase/sum_over_time 계열인지 판별""" + e = expr.replace(" ", "").lower() + return ( + ("rate(" in e) + or ("irate(" in e) + or ("increase(" in e) + or ("sum_over_time(" in e) + ) + + +def _prom_total_via_wrapper( + expr: str, + prom_query_range_fn, + ds_id: int, + start_epoch: int, + end_epoch: int, + step_sec: int, +) -> Optional[float]: + """ + 1) sum_over_time(()[STEP]) 로 감싸서 non-overlap 버킷 합계를 만든 뒤 합산. + - 가 rate()/increase() 등 insta​nt vector여도 동작 + - STEP=쿼리 step과 동일 → 겹치지 않는 버킷 + 실패 시 None 반환. 
+ """ + win = step_to_logql_range(step_sec) + wrapped = f"sum_over_time(( {expr} )[{win}])" + try: + resp = prom_query_range_fn(ds_id, wrapped, start_epoch, end_epoch, step_sec) + if resp.get("status") != "success": + return None + if resp.get("data", {}).get("resultType") != "matrix": + return None + return _sum_values_from_matrix(resp) + except Exception: + return None + + +def _prom_total_fallback( + expr: str, resp: Dict[str, Any], step_sec: int +) -> Optional[float]: + """ + fallback: matrix 값을 그대로 합산(=버킷 합) + - 만약 expr가 rate()/irate() 라면 '값*step'로 적분 후 합산 + - 이미 increase()/sum_over_time() 결과면 값 자체가 버킷 카운트이므로 그대로 합산 + """ + result = resp.get("data", {}).get("result", []) + if not result: + return 0.0 + + looks_rate = _expr_looks_like_rate(expr) + total = 0.0 + for s in result: + for ts, val in s.get("values", []): + try: + v = float(val) + except Exception: + continue + if looks_rate and ( + "rate(" in expr.replace(" ", "").lower() + or "irate(" in expr.replace(" ", "").lower() + ): + total += v * step_sec # rate → 적분 + else: + total += v # increase()/sum_over_time() 또는 이미 카운트 버킷 + return total + + +def compute_total_for_target( + ds: Dict[str, Any], + ds_id: int, + legend: str, + expr: str, + start_epoch: int, + end_epoch: int, + step_sec: int, + prom_query_range_fn, + loki_query_range_fn, +) -> str: + label = legend if legend else "(no legend)" + + # --- Prometheus --- + if is_prometheus(ds): + # 1) 래퍼로 비겹치는 버킷 합 구하기 시도 + wrapped_total = _prom_total_via_wrapper( + expr, prom_query_range_fn, ds_id, start_epoch, end_epoch, step_sec + ) + if wrapped_total is not None: + return f" - `{label}` → total **{wrapped_total:.4g}**" + + # 2) 실패 시 원 쿼리로 range 실행 후 합계 추정 + try: + resp = prom_query_range_fn(ds_id, expr, start_epoch, end_epoch, step_sec) + if resp.get("status") != "success": + return f" - `{label}`: 조회 실패" + if resp.get("data", {}).get("resultType") != "matrix": + return f" - `{label}`: 지원하지 않는 resultType: {resp.get('data', {}).get('resultType')}" + total = 
def flatten_panels(dashboard: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten a dashboard's panel tree into a single list.

    Row panels are expanded recursively in document order; every non-row
    panel is collected as-is.
    """
    def _collect(panels: List[Dict[str, Any]], acc: List[Dict[str, Any]]) -> None:
        for panel in panels:
            children = panel.get("panels")
            if panel.get("type") == "row" and children:
                logger.info(f"[FLATTEN ROW] 포함된 패널 수={len(children)}")
                _collect(children, acc)
            else:
                logger.info(f"[ADD PANEL] title={panel.get('title')}, type={panel.get('type')}")
                acc.append(panel)

    top_level = dashboard.get("panels", [])
    logger.info(f"[DASHBOARD PANELS] count={len(top_level)}")
    flattened: List[Dict[str, Any]] = []
    _collect(top_level, flattened)
    logger.info(f"[FLATTEN RESULT] 총 패널 수={len(flattened)}")
    return flattened
def post_mattermost(webhook: str, lines: List[str]) -> None:
    """Post the report *lines* to a Mattermost incoming webhook as one message.

    Raises on any transport or HTTP error after logging it.
    """
    body = {
        "username": "Grafana Reporter",
        "icon_url": "https://grafana.com/static/assets/img/fav32.png",
        "text": "\n".join(lines),
    }
    logger.info(f"[MATTERMOST POST] url={webhook}")
    logger.info(f"[MATTERMOST PAYLOAD] {body}")

    try:
        response = requests.post(webhook, json=body, timeout=30)
        response.raise_for_status()
        logger.info(f"[MATTERMOST STATUS] {response.status_code}")
    except Exception as e:
        # Log with traceback, then propagate so the caller sees the failure.
        logger.exception(f"[MATTERMOST ERROR] {e}")
        raise
def compute_total_for_target(
    ds: Dict[str, Any],
    ds_id: int,
    legend: str,
    expr: str,
    start_epoch: int,
    end_epoch: int,
    step_sec: int,
    # prom_query_range_fn,
    loki_query_range_fn,
) -> str:
    """Run one Loki panel target over [start_epoch, end_epoch] and render a total line.

    matrix results are summed directly; raw log streams are re-queried wrapped
    in ``sum by () (count_over_time(...))`` so buckets do not overlap, then
    summed. Always returns a Markdown bullet string (errors included).

    Fix: the original body shadowed the ``step_sec`` parameter with a
    hard-coded ``step_sec = 21600``, silently ignoring the caller's step.
    The parameter is now honored (main.py currently passes 21600, so the
    current pipeline's behavior is unchanged).
    """
    label = legend if legend else "(no legend)"

    # --- Loki ---
    if is_loki(ds):
        try:
            logger.info(f"[TARGET START] legend={label}, expr={expr}")

            # 1) Run the expression as-is first.
            resp = loki_query_range_fn(ds_id, expr, start_epoch, end_epoch, step_sec)
            logger.info(f"[RAW RESPONSE STATUS] {resp.get('status')}")
            if resp.get("status") != "success":
                return f" - `{label}`: 조회 실패"

            rtype = resp.get("data", {}).get("resultType")
            logger.info(f"[RESULT TYPE] {rtype}")

            # (A) Already a numeric series (count_over_time, rate, sum, ...) → plain sum.
            if rtype == "matrix":
                total = _sum_values_from_matrix(resp)
                return f" - `{label}` → total **{total:.4g}**"

            # (B) Raw log streams → convert to non-overlapping bucket counts, then sum.
            if rtype == "streams":
                win = step_to_logql_range(step_sec)

                # sum by () drops labels so we sum a single merged series.
                wrapped = f"sum by () (count_over_time(({expr})[{win}]))"
                logger.info(f"[WRAPPED EXPR] {wrapped}")

                r2 = loki_query_range_fn(
                    ds_id, wrapped, start_epoch, end_epoch, step_sec
                )
                logger.info(f"[WRAPPED RESPONSE STATUS] {r2.get('status')}")
                if r2.get("status") != "success":
                    return f" - `{label}`: 조회 실패"
                if r2.get("data", {}).get("resultType") != "matrix":
                    return f" - `{label}`: 예상 외 resultType: {r2.get('data', {}).get('resultType')}"
                total = _sum_values_from_matrix(r2)
                logger.info(f"[STREAMS TOTAL AFTER COUNT] {total}")
                return f" - `{label}` → total **{total:.4g}**"

            return f" - `{label}`: 지원하지 않는 resultType: {rtype}"

        except Exception as e:
            logger.exception(f"[ERROR] {e}")
            return f" - `{label}`: 오류 - {e}"

    return f" - `{label}`: 미지원 데이터소스 {ds.get('type')}"
def step_to_logql_range(step_sec: int) -> str:
    """Render a step in seconds as a LogQL range window string.

    e.g. 300 → '5m', 900 → '15m', 3600 → '1h'; non-round values fall back
    to seconds ('90s'). Largest exact unit wins (hours, then minutes).
    """
    if step_sec % 3600 == 0:
        window = f"{step_sec // 3600}h"
    elif step_sec % 60 == 0:
        window = f"{step_sec // 60}m"
    else:
        window = f"{step_sec}s"

    logger.info(f"[STEP TO LOGQL RANGE] step_sec={step_sec}, window={window}")
    return window