import re import math import statistics from datetime import datetime from sql_queries import DashboardQueries from prediction_service import SOIPredictionService class AnalysisService: """프로젝트 통계 및 활동성 분석 전문 서비스""" @staticmethod def calculate_activity_status(target_date_dt, log, file_count): """개별 프로젝트의 활동 상태 및 방치일 산출""" status, days = "unknown", 999 file_val = int(file_count) if file_count else 0 has_log = log and log != "데이터 없음" and log != "X" if file_val == 0: status = "unknown" elif has_log: if "폴더자동삭제" in log.replace(" ", ""): status = "stale" days = 999 else: match = re.search(r'(\d{4})\.(\d{2})\.(\d{2})', log) if match: log_date = datetime.strptime(match.group(0), "%Y.%m.%d") diff = (target_date_dt - log_date).days status = "active" if diff <= 7 else "warning" if diff <= 14 else "stale" days = diff else: status = "stale" else: status = "stale" return status, days @staticmethod def get_project_activity_logic(cursor, date_str): """활동도 분석 리포트 생성 로직""" if not date_str or date_str == "-": cursor.execute(DashboardQueries.GET_LAST_CRAWL_DATE) res = cursor.fetchone() target_date_val = res['last_date'] if res['last_date'] else datetime.now().date() else: target_date_val = datetime.strptime(date_str.replace(".", "-"), "%Y-%m-%d").date() target_date_dt = datetime.combine(target_date_val, datetime.min.time()) cursor.execute(DashboardQueries.GET_PROJECT_LIST_FOR_ANALYSIS, (target_date_val,)) rows = cursor.fetchall() analysis = {"summary": {"active": 0, "warning": 0, "stale": 0, "unknown": 0}, "details": []} for r in rows: status, days = AnalysisService.calculate_activity_status(target_date_dt, r['recent_log'], r['file_count']) analysis["summary"][status] += 1 analysis["details"].append({"name": r['short_nm'] or r['project_nm'], "status": status, "days_ago": days}) return analysis @staticmethod def get_p_zsr_analysis_logic(cursor): """절대적 방치 실태 고발 및 AI 위험 적응형(AAS) 분석 로직""" cursor.execute(DashboardQueries.GET_LAST_CRAWL_DATE) res_date = cursor.fetchone() if not res_date or not res_date['last_date']: return [] last_date = res_date['last_date'] cursor.execute(""" SELECT m.project_id, m.project_nm, m.short_nm, m.department, m.master, h.recent_log, h.file_count, m.continent, m.country FROM projects_master m LEFT JOIN projects_history h ON m.project_id = h.project_id AND h.crawl_date = %s ORDER BY m.project_id ASC """, (last_date,)) projects = cursor.fetchall() if not projects: return [] # [Step 1] AI 전처리: 부서별 평균 방치일 계산 (조직적 위험도 산출용) dept_stats = {} for p in projects: log = p['recent_log'] days = 14 # 기본값 if log and log != "데이터 없음": match = re.search(r'(\d{4})\.(\d{2})\.(\d{2})', log) if match: log_date = datetime.strptime(match.group(0), "%Y.%m.%d").date() days = (last_date - log_date).days dept = p['department'] or "미분류" if dept not in dept_stats: dept_stats[dept] = [] dept_stats[dept].append(days) dept_avg_risk = {d: statistics.mean(days_list) for d, days_list in dept_stats.items()} # [Step 2] AI 위험 적응형 SOI 산출 (AAS 모델) results = [] total_soi = 0 for p in projects: file_count = int(p['file_count']) if p['file_count'] else 0 log = p['recent_log'] dept = p['department'] or "미분류" # 방치일 계산 days_stagnant = 14 if log and log != "데이터 없음": match = re.search(r'(\d{4})\.(\d{2})\.(\d{2})', log) if match: log_date = datetime.strptime(match.group(0), "%Y.%m.%d").date() days_stagnant = (last_date - log_date).days is_auto_delete = log and "폴더자동삭제" in log.replace(" ", "") # AI-Hazard 추론 로직 (Dynamic Lambda) # 1. 자산 규모 리스크 (파일이 많을수록 방치 시 가치 하락 가속) scale_impact = min(0.04, math.log10(file_count + 1) * 0.008) if file_count > 0 else 0 # 2. 조직적 전염 리스크 (부서 전체가 방치 중이면 패널티 부여) dept_risk_days = dept_avg_risk.get(dept, 14) env_impact = min(0.03, (dept_risk_days / 30) * 0.01) # 최종 AI 위험 계수 산출 (기본 0.04에서 변동) ai_lambda = 0.04 + scale_impact + env_impact # 지수 감쇄 적용 (AAS Score) soi_score = math.exp(-ai_lambda * days_stagnant) * 100 # [AI 데이터 진정성 검증 로직 1 - ECV 패널티 (존재론적)] existence_confidence = 1.0 if file_count == 0: existence_confidence = 0.05 elif file_count < 10: existence_confidence = 0.4 # [AI 데이터 진정성 검증 로직 2 - Log Quality Scoring (활동의 질)] log_quality_factor = 1.0 if log and log != "데이터 없음": # 성과 중심 (High) if any(k in log for k in ["업로드", "수정", "등록", "변환", "파일", "업데이트"]): log_quality_factor = 1.0 # 구조 관리 (Mid) elif any(k in log for k in ["폴더", "생성", "삭제", "이동"]): log_quality_factor = 0.7 # 단순 행정/설정 (Low) elif any(k in log for k in ["참가자", "권한", "추가", "변경", "메일"]): log_quality_factor = 0.4 else: log_quality_factor = 0.6 # 기타 일반 로그 # 최종 점수 산출 (AAS * ECV * LogQuality) soi_score = soi_score * existence_confidence * log_quality_factor if is_auto_delete: soi_score = 0.1 # [AI 미래 예측 및 실무 투입 에너지 분석] history_rows = SOIPredictionService.get_historical_soi(cursor, p['project_id']) predicted_soi = SOIPredictionService.predict_future_soi(soi_score, history_rows, days_ahead=14) # 실무 투입 에너지 계산 (최근 30개 히스토리 기준 파일 변화일수) effort_days = 0 if len(history_rows) > 1: for i in range(1, len(history_rows)): if history_rows[i]['file_count'] != history_rows[i-1]['file_count']: effort_days += 1 work_effort_rate = round((effort_days / max(1, len(history_rows))) * 100, 1) total_soi += soi_score results.append({ "project_nm": p['short_nm'] or p['project_nm'], "file_count": file_count, "days_stagnant": days_stagnant, "risk_count": 0, "p_war": round(soi_score, 1), "predicted_soi": predicted_soi, "is_auto_delete": is_auto_delete, "master": p['master'], "dept": p['department'], "ai_lambda": round(ai_lambda, 4), "log_quality": log_quality_factor, "work_effort": work_effort_rate, # 신규 지표 추가 "avg_info": { "avg_files": 0, "avg_stagnant": 0, "avg_risk": round(total_soi / len(projects), 1) } }) results.sort(key=lambda x: x['p_war']) return results