Files
test-mcp/analysis_service.py

271 lines
12 KiB
Python

import re
import math
import statistics
from datetime import datetime, timedelta
from sql_queries import DashboardQueries
from prediction_service import SOIPredictionService
class AnalysisService:
"""프로젝트 통계 및 활동성 분석 전문 서비스"""
@staticmethod
def calculate_operational_consistency(history_rows, days_stagnant):
"""운영 일관성 지수(OCI) 산출 로직 (자산 규모 및 장기 정체 패널티 포함)
최근 30일간 활동 리듬 분석 + 현재 방치 기간에 따른 강력한 감쇄
"""
if not history_rows or len(history_rows) < 2:
return 0.0
# [추가] 최신 상태 확인: 현재 로그가 '폴더자동삭제'면 점수 즉시 0점 (일수는 실제 일수 유지)
latest_log = history_rows[-1].get('recent_log', '') or ''
if latest_log and "폴더자동삭제" in latest_log.replace(" ", ""):
return 0.0
# 1. 최근 30일 이력 기반 Base Score 산출
now = datetime.now().date()
recent_30 = [h for h in history_rows if (now - h['crawl_date']).days <= 30]
if not recent_30:
return 0.0
# [추가] 자산 규모 확인: 파일이 0개면 운영 일관성 산출 자체가 무의미함
max_files = max([int(h['file_count'] or 0) for h in recent_30])
if max_files == 0:
return 0.0
# 주차별 활동 여부 (4주) - 파일이 1개 이상 존재할 때만 유효 활동으로 인정
weeks_active = [False, False, False, False]
for h in recent_30:
if int(h['file_count'] or 0) > 0:
days_ago = (now - h['crawl_date']).days
week_idx = min(3, days_ago // 7)
weeks_active[week_idx] = True
base_consistency = (sum(weeks_active) / 4) * 70
# 활동 밀도 (변화 발생일 비율)
effort_days = 0
for i in range(1, len(recent_30)):
# '폴더자동삭제' 로그가 포함된 날의 변화는 관리 노력으로 인정하지 않음
log_content = recent_30[i].get('recent_log', '') or ''
if "폴더자동삭제" in log_content.replace(" ", ""):
continue
if recent_30[i]['file_count'] != recent_30[i-1]['file_count']:
effort_days += 1
density_score = (effort_days / max(1, len(recent_30))) * 30
base_oci = base_consistency + density_score
# 2. [핵심] 패널티 엔진 적용
# A. 장기 정체 패널티: 방치일이 100일 이상이면 0점으로 수렴
stagnation_factor = max(0, (100 - days_stagnant) / 100.0)
# B. 자산 부족 패널티 (Existence Confidence): 파일이 너무 적으면 관리 신뢰도 하락
# 10개 미만은 50%만 인정, 그 이상은 점진적으로 100%까지 회복
asset_confidence = 1.0
if max_files < 10:
asset_confidence = 0.5
elif max_files < 30:
asset_confidence = 0.8
final_oci = base_oci * stagnation_factor * asset_confidence
return round(final_oci, 1)
@staticmethod
def calculate_activity_status(target_date_dt, log, file_count):
"""개별 프로젝트의 활동 상태 및 방치일 산출 (현재 시각 기준 실질 방치일 산출)"""
status, days = "unknown", 999
file_val = int(file_count) if file_count else 0
has_log = log and log != "데이터 없음" and log != "X"
# 실질적인 오늘 날짜를 기준으로 정체일 산출 (사용자 직관성 강화)
now_dt = datetime.now()
if file_val == 0:
status = "unknown"
elif has_log:
is_auto = "폴더자동삭제" in log.replace(" ", "")
# 2자리 또는 4자리 연도 지원 정규식
match = re.search(r'(\d{2,4})\.(\d{2})\.(\d{2})', log)
if match:
y, m, d = match.groups()
# 2자리 연도 보정
if len(y) == 2: y = "20" + y
log_date = datetime.strptime(f"{y}.{m}.{d}", "%Y.%m.%d")
# 수집일(target_date_dt)이 아닌 현재 시점(now_dt) 기준으로 차이 계산
diff = (now_dt - log_date).days
days = diff
# 상태 판정은 수집 시점의 target_date_dt를 기준으로 할지 검토 필요하나,
# 사용자 요청에 따라 '이상한 계산'을 바로잡기 위해 현재 시점 기준 판정 적용
status = "stale" if is_auto or diff > 14 else "warning" if diff > 7 else "active"
else:
status = "stale"
days = 999
else:
status = "stale"
return status, days
@staticmethod
def get_project_activity_logic(cursor, date_str):
"""활동도 분석 리포트 생성 로직"""
if not date_str or date_str == "-":
cursor.execute(DashboardQueries.GET_LAST_CRAWL_DATE)
res = cursor.fetchone()
target_date_val = res['last_date'] if res['last_date'] else datetime.now().date()
else:
target_date_val = datetime.strptime(date_str.replace(".", "-"), "%Y-%m-%d").date()
target_date_dt = datetime.combine(target_date_val, datetime.min.time())
cursor.execute(DashboardQueries.GET_PROJECT_LIST_FOR_ANALYSIS, (target_date_val,))
rows = cursor.fetchall()
analysis = {"summary": {"active": 0, "warning": 0, "stale": 0, "unknown": 0}, "details": []}
for r in rows:
status, days = AnalysisService.calculate_activity_status(target_date_dt, r['recent_log'], r['file_count'])
analysis["summary"][status] += 1
analysis["details"].append({"name": r['short_nm'] or r['project_nm'], "status": status, "days_ago": days})
return analysis
@staticmethod
def get_p_zsr_analysis_logic(cursor):
"""절대적 방치 실태 고발 및 운영 일관성(OCI) 분석 로직"""
cursor.execute(DashboardQueries.GET_LAST_CRAWL_DATE)
res_date = cursor.fetchone()
if not res_date or not res_date['last_date']:
return []
last_date = res_date['last_date']
# 특정 날짜(last_date) 이하의 각 프로젝트별 최신 데이터를 조인하도록 수정
cursor.execute("""
SELECT m.project_id, m.project_nm, m.short_nm, m.department, m.master,
h.recent_log, h.file_count, m.continent, m.country
FROM projects_master m
LEFT JOIN projects_history h ON h.project_id = m.project_id AND h.crawl_date = (
SELECT MAX(crawl_date)
FROM projects_history
WHERE project_id = m.project_id AND crawl_date <= %s
)
ORDER BY m.project_id ASC
""", (last_date,))
projects = cursor.fetchall()
if not projects: return []
results = []
total_avi = 0
total_files = 0
project_data_list = []
# 1차 Pass: 개별 AVI 산출 및 전체 합계 집계
now_dt = datetime.now()
for p in projects:
file_count = int(p['file_count']) if p['file_count'] else 0
log = p['recent_log']
# 방치일 계산 (현재 시각 기준 동기화)
days_stagnant = 14
is_auto_delete = log and "폴더자동삭제" in log.replace(" ", "")
if log and log != "데이터 없음":
match = re.search(r'(\d{2,4})\.(\d{2})\.(\d{2})', log)
if match:
y, m, d = match.groups()
if len(y) == 2: y = "20" + y
log_date = datetime.strptime(f"{y}.{m}.{d}", "%Y.%m.%d")
days_stagnant = (now_dt - log_date).days
elif is_auto_delete:
days_stagnant = 999
# AI-Hazard 추론 로직 (Dynamic Lambda)
scale_impact = min(0.04, math.log10(file_count + 1) * 0.008) if file_count > 0 else 0
ai_lambda = 0.04 + scale_impact
# 지수 감쇄 적용
avi_score = math.exp(-ai_lambda * days_stagnant) * 100
# ECV 패널티
existence_confidence = 1.0
if file_count == 0: existence_confidence = 0.05
elif file_count < 10: existence_confidence = 0.4
# Log Quality Scoring (SWVW 모델 적용)
from log_scorer import LogScorer
log_quality_factor = LogScorer.get_score(log)
avi_score = avi_score * existence_confidence * log_quality_factor
if is_auto_delete: avi_score = 0.1
total_avi += avi_score
total_files += file_count
project_data_list.append({
"p": p,
"avi_score": avi_score,
"file_count": file_count,
"days_stagnant": days_stagnant,
"is_auto_delete": is_auto_delete,
"log_quality": log_quality_factor,
"ai_lambda": ai_lambda
})
# 2차 Pass: 평균 기반 가치기여도(WAR) 산출
num_projects = len(projects) if projects else 1
avg_avi = total_avi / num_projects
avg_files = total_files / num_projects
for item in project_data_list:
p = item['p']
avi_score = item['avi_score']
file_count = item['file_count']
# [운영 일관성 분석 (OCI)]
history_rows = SOIPredictionService.get_historical_avi(cursor, p['project_id'])
oci_score = AnalysisService.calculate_operational_consistency(history_rows, item['days_stagnant'])
# 실무 투입 에너지 계산
effort_days = 0
if len(history_rows) > 1:
for i in range(1, len(history_rows)):
if history_rows[i]['file_count'] != history_rows[i-1]['file_count']:
effort_days += 1
work_effort_rate = round((effort_days / max(1, len(history_rows))) * 100, 1)
# [VCI 산출 - 로그 기반 상대 가중치 모델 (수정)]
# 1. 파일 규모 가중치를 로그(log10) 기반으로 변경하여 선형 폭주 방지
# 2. 평균 파일 수 대비 상대적 규모를 반영하되, 최대 가중치를 2.5로 캡핑(Capping)
if avg_files > 0:
relative_size = math.log10(file_count + 1) / math.log10(avg_files + 1)
else:
relative_size = 1.0
asset_weight = min(2.5, max(0.2, relative_size))
p_war_score = (avi_score - avg_avi) * asset_weight
results.append({
"project_nm": p['short_nm'] or p['project_nm'],
"file_count": file_count,
"days_stagnant": item['days_stagnant'],
"risk_count": round(p_war_score, 2), # WAR 기반 가치기여도 (평균 0)
"p_war": round(avi_score, 1),
"oci_score": oci_score,
"is_auto_delete": item['is_auto_delete'],
"master": p['master'],
"dept": p['department'],
"ai_lambda": round(item['ai_lambda'], 4),
"log_quality": item['log_quality'],
"work_effort": work_effort_rate,
"avg_info": {
"avg_files": round(avg_files, 1),
"avg_stagnant": 0,
"avg_risk": round(avg_avi, 1)
}
})
results.sort(key=lambda x: x['p_war'])
return results