"""Logging helpers for the inference API.

Provides a per-client CSV request log plus a JSON log line, prompt usage
logging, pipeline status entries pushed to Redis, Ollama inference statistics
logging, and a logging filter that hides successful health-check access logs.
"""

import csv
import json
import logging
import time
from datetime import datetime
from pathlib import Path
from typing import Optional

import redis
from config.setting import PGN_REDIS_DB, PGN_REDIS_HOST, PGN_REDIS_PORT
from fastapi import Depends, Request
from utils.checking_keys import get_api_key
from utils.request_utils import get_client_ip, get_swagger_port

logger = logging.getLogger(__name__)

# Shared Redis connection used by log_pipeline_status; responses are decoded
# to ``str`` instead of ``bytes``.
redis_client = redis.Redis(
    host=PGN_REDIS_HOST, port=PGN_REDIS_PORT, db=PGN_REDIS_DB, decode_responses=True
)


def log_user_request(
    request_info: Request,
    endpoint: str,
    input_filename: str,
    model: str,
    prompt_filename: str,
    context_length: int,
    api_key: str,
):
    """Record one inference request in a per-client CSV file and as a JSON log.

    Args:
        request_info: Incoming request object; it is handed to
            ``get_client_ip`` and ``get_swagger_port``.
        endpoint: Path of the endpoint that was called.
        input_filename: Name of the input file used for the request.
        model: Name of the model used for the request.
        prompt_filename: Name of the prompt file used for the request.
        context_length: Context length used for the request.
        api_key: API key that authenticated the request.
    """
    client_ip = get_client_ip(request_info)
    swagger_port = get_swagger_port(request_info)

    # 1. Append the request to ./logs/<client_ip>.csv
    # NOTE(review): the file name is derived from the client IP; if
    # get_client_ip can return a client-controlled header value, it should be
    # validated before being used as a path component - confirm.
    logs_dir = Path("./logs")
    logs_dir.mkdir(exist_ok=True)
    csv_path = logs_dir / f"{client_ip}.csv"

    # The header row is only written when the file is created by this call.
    new_file = not csv_path.exists()
    with open(csv_path, mode="a", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        if new_file:
            writer.writerow(
                [
                    "timestamp",
                    "swagger_port",
                    "endpoint",
                    "input_filename",
                    "prompt_filename",
                    "model",
                    "context_length",
                    "api_key",
                ]
            )
        # NOTE(review): the API key is stored in plain text here and in the
        # JSON log below; consider masking it if these logs are shared.
        writer.writerow(
            [
                time.strftime("%Y-%m-%d %H:%M:%S"),
                swagger_port,
                endpoint,
                input_filename,
                prompt_filename,
                model,
                context_length,
                api_key,
            ]
        )

    # 2. Emit the same information as a JSON log line for Loki
    logger.info(
        json.dumps(
            {
                "ip": client_ip,
                "swagger_port": swagger_port,
                "endpoint": endpoint,
                "input_filename": input_filename,
                "prompt_filename": prompt_filename,
                "model": model,
                "context_length": context_length,
                "api_key": api_key,
                "event": "inference_log",
            }
        )
    )


def log_generation_info(custom_mode: bool, user_prompt: str = ""):
    """Log whether a user-defined prompt was used and, if so, its text.

    Args:
        custom_mode: True when a user-defined prompt is in use.
        user_prompt: The user-defined prompt; only logged when ``custom_mode``
            is true.
    """
    logger.info(f"[GENERATE-PROMPT-USED] 사용자 정의 프롬프트 사용유무: {custom_mode}")
    if custom_mode:
        logger.info(f"[GENERATE-USER-PROMPT]\n{user_prompt}")
    else:
        logger.info("[GENERATE-DEFAULT-PROMPT] Default_prompt")


def log_pipeline_status(
    request_id: str, status_message: str, step_info: Optional[dict] = None
):
    """Append a status entry to the Redis list ``pipeline_status:<request_id>``.

    Args:
        request_id: Identifier used to build the Redis key.
        status_message: Status text stored under ``"status"``.
        step_info: Optional extra details stored under ``"step_info"``; must be
            JSON-serializable.
    """
    log_entry = {
        "status": status_message,
        "timestamp": datetime.now().isoformat(),
        "step_info": step_info,
    }
    redis_client.rpush(f"pipeline_status:{request_id}", json.dumps(log_entry))


def ns_to_sec(ns: Optional[int]) -> Optional[float]:
    """Convert nanoseconds to seconds, rounded to 3 decimal places.

    Args:
        ns: Duration in nanoseconds, or ``None`` when the value is unknown.

    Returns:
        The duration in seconds, or ``None`` when ``ns`` is ``None``.
    """
    if ns is None:
        # Missing measurements stay missing instead of raising TypeError.
        return None
    return round(ns / 1e9, 3)


def log_ollama_stats(res: dict):
    """Log the inference statistics of an Ollama response as one JSON line.

    Durations are reported both as the raw nanosecond values and converted to
    seconds. A tokens-per-second figure is added when both ``eval_count`` and
    ``eval_duration`` are present and non-zero. Fields missing from ``res`` are
    logged as ``null`` rather than causing an error.

    Args:
        res: Response dictionary returned by Ollama.
    """
    # Raw values (may be None when the response does not carry them)
    total_duration = res.get("total_duration")
    load_duration = res.get("load_duration")
    prompt_eval_count = res.get("prompt_eval_count")
    prompt_eval_duration = res.get("prompt_eval_duration")
    eval_count = res.get("eval_count")
    eval_duration = res.get("eval_duration")

    # Add second-based values next to the nanosecond originals
    stats = {
        "model": res.get("model"),
        "total_duration_ns": total_duration,
        "total_duration_sec": ns_to_sec(total_duration),
        "load_duration_ns": load_duration,
        "load_duration_sec": ns_to_sec(load_duration),
        "prompt_eval_count": prompt_eval_count,
        "prompt_eval_duration_ns": prompt_eval_duration,
        "prompt_eval_duration_sec": ns_to_sec(prompt_eval_duration),
        "eval_count": eval_count,
        "eval_duration_ns": eval_duration,
        "eval_duration_sec": ns_to_sec(eval_duration),
    }

    # Tokens per second; skipped when either value is missing or zero
    if eval_count and eval_duration:
        stats["generation_speed_tok_per_sec"] = round(
            eval_count / (eval_duration / 1e9), 2
        )

    logger.info("[OLLAMA-STATS] " + json.dumps(stats, ensure_ascii=False))


class EndpointLogger:
    """Request-scoped helper that logs endpoint usage via ``log_user_request``.

    The constructor declares ``Depends(get_api_key)``, so the class is meant to
    be instantiated by FastAPI's dependency injection.
    """

    def __init__(self, request: Request, api_key: str = Depends(get_api_key)):
        self.request = request
        self.api_key = api_key

    def log(
        self,
        model: str,
        input_filename: str = "N/A",
        prompt_filename: str = "N/A",
        context_length: int = 0,
    ):
        """Log the current request; logging failures never propagate.

        Args:
            model: Name of the model used for the request.
            input_filename: Name of the input file, ``"N/A"`` if none.
            prompt_filename: Name of the prompt file, ``"N/A"`` if none.
            context_length: Context length used for the request.
        """
        try:
            log_user_request(
                request_info=self.request,
                endpoint=self.request.url.path,
                input_filename=input_filename,
                model=model,
                prompt_filename=prompt_filename,
                context_length=context_length,
                api_key=self.api_key,
            )
        except Exception as e:
            # Best effort: a logging problem must not fail the endpoint itself.
            logger.warning(
                f"Failed to log request for endpoint {self.request.url.path}: {e}"
            )


class HealthCheckFilter(logging.Filter):
    """Suppress access-log records of successful ``GET /health...`` requests."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Return False for successful health-check records, True otherwise."""
        # The access log record for uvicorn has the data in `args`.
        # record.args = (client_addr, method, path, http_version, status_code)
        # e.g. ('127.0.0.1:37894', 'GET', '/health/API', '1.1', 200)
        if isinstance(record.args, tuple) and len(record.args) == 5:
            method = record.args[1]
            path = record.args[2]
            status_code = record.args[4]
            if (
                method == 'GET'
                and isinstance(path, str)
                and path.startswith('/health')
                and status_code == 200
            ):
                return False
        return True