First commit

This commit is contained in:
kyy
2025-09-03 11:20:46 +09:00
commit 86f6a6f1fe
20 changed files with 873 additions and 0 deletions

0
src/services/__init__.py Normal file
View File

View File

@@ -0,0 +1,70 @@
from typing import Any, Dict
from clients.loki import format_loki_matrix_block, is_loki, summarize_loki_streams_by_count
from clients.prom import is_prometheus, summarize_matrix_result
def summarize_target_block(
ds: Dict[str, Any],
ds_id: int,
legend: str,
expr: str,
start_epoch: int,
end_epoch: int,
step_sec: int,
prom_query_range_fn,
loki_query_range_fn,
) -> str:
label = legend if legend else "(no legend)"
# Prometheus
if is_prometheus(ds):
resp = prom_query_range_fn(ds_id, expr, start_epoch, end_epoch, step_sec)
if resp.get("status") != "success":
return f" - `{label}`: 조회 실패"
data = resp.get("data", {})
rtype = data.get("resultType")
if rtype != "matrix":
if rtype == "vector":
return f" - `{label}`: resultType=vector (범위 결과 없음)"
return f" - `{label}`: 지원하지 않는 resultType: {rtype}"
merged = summarize_matrix_result(resp)
if not merged:
return f" - `{label}`: 데이터 없음"
def fmt(x):
return "n/a" if x is None else f"{x:.4g}"
return f" - `{label}` → avg **{fmt(merged['avg'])}**, p95 **{fmt(merged['p95'])}**, max **{fmt(merged['max'])}**"
# Loki
if is_loki(ds):
resp = loki_query_range_fn(ds_id, expr, start_epoch, end_epoch, step_sec)
if resp.get("status") != "success":
return f" - `{label}`: 조회 실패"
data = resp.get("data", {})
rtype = data.get("resultType")
if rtype == "matrix":
merged = summarize_matrix_result(resp)
return format_loki_matrix_block(label, merged)
if rtype == "streams":
agg, err = summarize_loki_streams_by_count(
expr, loki_query_range_fn, ds_id, start_epoch, end_epoch, step_sec
)
if err:
return f" - `{label}` (loki): {err}"
def fmt(x):
return "n/a" if x is None else f"{x:.4g}"
return (
f" - `{label}` (loki) → total **{fmt(agg['total'])}**, "
f"avg_rps **{fmt(agg['avg_rps'])}**, "
f"max_{agg['bucket_range']} **{fmt(agg['max_bucket'])}**"
)
return f" - `{label}`: 지원하지 않는 resultType: {rtype}"
return f" - `{label}`: 미지원 데이터소스 {ds.get('type')}"

View File

@@ -0,0 +1,156 @@
from typing import Any, Dict, Optional
from clients.loki import is_loki
from clients.prom import is_prometheus
from utils.timeutils import step_to_logql_range
def _sum_values_from_matrix(resp: Dict[str, Any]) -> float:
"""matrix 결과의 모든 시계열 샘플 값을 합산"""
total = 0.0
for s in resp.get("data", {}).get("result", []):
# 각 시계열 안의 values = [(timestamp, value), ...]
for ts, val in s.get("values", []):
try:
total += float(val) # 숫자로 변환 후 합계
except Exception:
continue
return total
def _expr_looks_like_rate(expr: str) -> bool:
"""쿼리식이 rate/irate/increase/sum_over_time 계열인지 판별"""
e = expr.replace(" ", "").lower()
return (
("rate(" in e)
or ("irate(" in e)
or ("increase(" in e)
or ("sum_over_time(" in e)
)
def _prom_total_via_wrapper(
expr: str,
prom_query_range_fn,
ds_id: int,
start_epoch: int,
end_epoch: int,
step_sec: int,
) -> Optional[float]:
"""
1) sum_over_time((<expr>)[STEP]) 로 감싸서 non-overlap 버킷 합계를 만든 뒤 합산.
- <expr>가 rate()/increase() 등 instant vector여도 동작
- STEP=쿼리 step과 동일 → 겹치지 않는 버킷
실패 시 None 반환.
"""
win = step_to_logql_range(step_sec)
wrapped = f"sum_over_time(( {expr} )[{win}])"
try:
resp = prom_query_range_fn(ds_id, wrapped, start_epoch, end_epoch, step_sec)
if resp.get("status") != "success":
return None
if resp.get("data", {}).get("resultType") != "matrix":
return None
return _sum_values_from_matrix(resp)
except Exception:
return None
def _prom_total_fallback(
expr: str, resp: Dict[str, Any], step_sec: int
) -> Optional[float]:
"""
fallback: matrix 값을 그대로 합산(=버킷 합)
- 만약 expr가 rate()/irate() 라면 '값*step'로 적분 후 합산
- 이미 increase()/sum_over_time() 결과면 값 자체가 버킷 카운트이므로 그대로 합산
"""
result = resp.get("data", {}).get("result", [])
if not result:
return 0.0
looks_rate = _expr_looks_like_rate(expr)
total = 0.0
for s in result:
for ts, val in s.get("values", []):
try:
v = float(val)
except Exception:
continue
if looks_rate and (
"rate(" in expr.replace(" ", "").lower()
or "irate(" in expr.replace(" ", "").lower()
):
total += v * step_sec # rate → 적분
else:
total += v # increase()/sum_over_time() 또는 이미 카운트 버킷
return total
def compute_total_for_target(
ds: Dict[str, Any],
ds_id: int,
legend: str,
expr: str,
start_epoch: int,
end_epoch: int,
step_sec: int,
prom_query_range_fn,
loki_query_range_fn,
) -> str:
label = legend if legend else "(no legend)"
# --- Prometheus ---
if is_prometheus(ds):
# 1) 래퍼로 비겹치는 버킷 합 구하기 시도
wrapped_total = _prom_total_via_wrapper(
expr, prom_query_range_fn, ds_id, start_epoch, end_epoch, step_sec
)
if wrapped_total is not None:
return f" - `{label}` → total **{wrapped_total:.4g}**"
# 2) 실패 시 원 쿼리로 range 실행 후 합계 추정
try:
resp = prom_query_range_fn(ds_id, expr, start_epoch, end_epoch, step_sec)
if resp.get("status") != "success":
return f" - `{label}`: 조회 실패"
if resp.get("data", {}).get("resultType") != "matrix":
return f" - `{label}`: 지원하지 않는 resultType: {resp.get('data', {}).get('resultType')}"
total = _prom_total_fallback(expr, resp, step_sec)
return f" - `{label}` → total **{(total or 0.0):.4g}**"
except Exception as e:
return f" - `{label}`: 오류 - {e}"
# --- Loki ---
if is_loki(ds):
try:
# 1) 그대로 실행
resp = loki_query_range_fn(ds_id, expr, start_epoch, end_epoch, step_sec)
if resp.get("status") != "success":
return f" - `{label}`: 조회 실패"
rtype = resp.get("data", {}).get("resultType")
if rtype == "matrix":
# 이미 숫자 시계열(카운트/비율 등)이면 합계
total = _sum_values_from_matrix(resp)
return f" - `{label}` (loki) → total **{total:.4g}**"
if rtype == "streams":
# streams면 count_over_time로 버킷 카운트화(비겹침) 후 합계
win = step_to_logql_range(step_sec)
wrapped = f"sum by () (count_over_time(({expr})[{win}]))"
r2 = loki_query_range_fn(
ds_id, wrapped, start_epoch, end_epoch, step_sec
)
if r2.get("status") != "success":
return f" - `{label}` (loki): 조회 실패"
if r2.get("data", {}).get("resultType") != "matrix":
return f" - `{label}` (loki): 예상 외 resultType: {r2.get('data', {}).get('resultType')}"
total = _sum_values_from_matrix(r2)
return f" - `{label}` (loki) → total **{total:.4g}**"
return f" - `{label}`: 지원하지 않는 resultType: {rtype}"
except Exception as e:
return f" - `{label}`: 오류 - {e}"
return f" - `{label}`: 미지원 데이터소스 {ds.get('type')}"

75
src/services/dashboard.py Normal file
View File

@@ -0,0 +1,75 @@
import logging
from typing import Any, Dict, List
logger = logging.getLogger(__name__)
def flatten_panels(dashboard: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
대시보드 JSON 구조 안의 패널들을 flatten 해서 리스트로 반환.
- row 패널 안의 panels도 재귀적으로 펼침.
"""
out: List[Dict[str, Any]] = []
def walk(panels):
for p in panels:
if p.get("type") == "row" and p.get("panels"):
logger.info(f"[FLATTEN ROW] 포함된 패널 수={len(p['panels'])}")
walk(p["panels"])
else:
logger.info(f"[ADD PANEL] title={p.get('title')}, type={p.get('type')}")
out.append(p)
panels = dashboard.get("panels", [])
logger.info(f"[DASHBOARD PANELS] count={len(panels)}")
walk(panels)
logger.info(f"[FLATTEN RESULT] 총 패널 수={len(out)}")
return out
def panel_datasource_resolver(
panel: Dict[str, Any], get_ds_by_uid, list_ds
) -> Dict[str, Any] | None:
"""
패널에 연결된 데이터소스를 찾아 반환.
- dict(uid) 타입이면 get_ds_by_uid 사용
- str 타입이면 list_ds() 안에서 name으로 검색
"""
ds = panel.get("datasource")
if not ds:
logger.info(f"[DATASOURCE RESOLVE] panel '{panel.get('title')}' → 없음")
return None
if isinstance(ds, dict) and "uid" in ds:
resolved = get_ds_by_uid(ds["uid"])
logger.info(f"[DATASOURCE RESOLVE] uid={ds['uid']}{resolved}")
return resolved
if isinstance(ds, str):
for item in list_ds():
if item.get("name") == ds:
logger.info(f"[DATASOURCE RESOLVE] name={ds}{item}")
return item
logger.info(
f"[DATASOURCE RESOLVE FAIL] panel '{panel.get('title')}' datasource={ds}"
)
return None
def extract_targets(panel: Dict[str, Any]) -> List[Dict[str, str]]:
"""
패널 안의 target(expr, legend)을 추출해서 리스트로 반환.
"""
outs = []
for t in panel.get("targets", []):
expr = t.get("expr") or t.get("query") or t.get("expression")
if expr:
legend = t.get("legendFormat") or t.get("refId") or ""
outs.append({"expr": expr, "legend": legend})
logger.info(
f"[EXTRACT TARGET] panel='{panel.get('title')}', expr={expr}, legend={legend}"
)
logger.info(
f"[EXTRACT TARGETS DONE] panel='{panel.get('title')}', count={len(outs)}"
)
return outs

27
src/services/reporter.py Normal file
View File

@@ -0,0 +1,27 @@
import logging
from typing import List
import requests
logger = logging.getLogger(__name__)
def post_mattermost(webhook: str, lines: List[str]) -> None:
"""
Mattermost Webhook으로 메시지 전송.
"""
payload = {
"username": "Grafana Reporter",
"icon_url": "https://grafana.com/static/assets/img/fav32.png",
"text": "\n".join(lines),
}
logger.info(f"[MATTERMOST POST] url={webhook}")
logger.info(f"[MATTERMOST PAYLOAD] {payload}")
try:
r = requests.post(webhook, json=payload, timeout=30)
r.raise_for_status()
logger.info(f"[MATTERMOST STATUS] {r.status_code}")
except Exception as e:
logger.exception(f"[MATTERMOST ERROR] {e}")
raise

View File

@@ -0,0 +1,93 @@
import logging
from typing import Any, Dict
from clients.loki import is_loki
from utils.timeutils import step_to_logql_range
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger(__name__)
def _sum_values_from_matrix(resp: Dict[str, Any]) -> float:
"""
Loki range API resultType == "matrix" → 모든 values를 합산
"""
total = 0.0
results = resp.get("data", {}).get("result", [])
logger.info(f"[MATRIX RESULT COUNT] {len(results)} series")
for s in results:
values = s.get("values", [])
logger.info(f"[SERIES LENGTH] {len(values)} samples")
for ts, val in values:
try:
v = float(val)
total += v
except Exception:
logger.info(f"[SKIP VALUE] {val}")
continue
logger.info(f"[MATRIX TOTAL] {total}")
return total
def compute_total_for_target(
ds: Dict[str, Any],
ds_id: int,
legend: str,
expr: str,
start_epoch: int,
end_epoch: int,
step_sec: int,
# prom_query_range_fn,
loki_query_range_fn,
) -> str:
label = legend if legend else "(no legend)"
# --- Loki ---
if is_loki(ds):
try:
step_sec = 21600
logger.info(f"[TARGET START] legend={label}, expr={expr}")
# 1) 그대로 실행
resp = loki_query_range_fn(ds_id, expr, start_epoch, end_epoch, step_sec)
logger.info(f"[RAW RESPONSE STATUS] {resp.get('status')}")
if resp.get("status") != "success":
return f" - `{label}`: 조회 실패"
rtype = resp.get("data", {}).get("resultType")
logger.info(f"[RESULT TYPE] {rtype}")
# (A) 이미 숫자 시계열(예: count_over_time, rate, sum 등) → 단순 합계
if rtype == "matrix":
total = _sum_values_from_matrix(resp)
return f" - `{label}` → total **{total:.4g}**"
# (B) 로그 라인 스트림 → count_over_time으로 '버킷 카운트' 시계열로 변환 후 합계
if rtype == "streams":
win = step_to_logql_range(step_sec)
# sum by () 로 라벨 제거(단일 시계열로 합산)
wrapped = f"sum by () (count_over_time(({expr})[{win}]))"
logger.info(f"[WRAPPED EXPR] {wrapped}")
# 변환된 쿼리 실행
r2 = loki_query_range_fn(
ds_id, wrapped, start_epoch, end_epoch, step_sec
)
logger.info(f"[WRAPPED RESPONSE STATUS] {r2.get('status')}")
if r2.get("status") != "success":
return f" - `{label}`: 조회 실패"
if r2.get("data", {}).get("resultType") != "matrix":
return f" - `{label}`: 예상 외 resultType: {r2.get('data', {}).get('resultType')}"
total = _sum_values_from_matrix(r2)
logger.info(f"[STREAMS TOTAL AFTER COUNT] {total}")
return f" - `{label}` → total **{total:.4g}**"
return f" - `{label}`: 지원하지 않는 resultType: {rtype}"
except Exception as e:
logger.exception(f"[ERROR] {e}")
return f" - `{label}`: 오류 - {e}"
return f" - `{label}`: 미지원 데이터소스 {ds.get('type')}"