import os import sys import re import asyncio import pymysql from datetime import datetime from pydantic import BaseModel from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse, FileResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from analyze import analyze_file_content from crawler_service import run_crawler_service, crawl_stop_event from sql_queries import InquiryQueries, DashboardQueries # --- 환경 설정 --- os.environ["PYTHONIOENCODING"] = "utf-8" # Tesseract 경로는 환경에 따라 다를 수 있으므로 환경변수 우선 사용 권장 TESSDATA_PREFIX = os.getenv("TESSDATA_PREFIX", r"C:\Users\User\AppData\Local\Programs\Tesseract-OCR\tessdata") os.environ["TESSDATA_PREFIX"] = TESSDATA_PREFIX app = FastAPI(title="Project Master Overseas API") templates = Jinja2Templates(directory="templates") # 정적 파일 마운트 app.mount("/style", StaticFiles(directory="style"), name="style") app.mount("/js", StaticFiles(directory="js"), name="js") app.mount("/sample_files", StaticFiles(directory="sample"), name="sample_files") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=False, allow_methods=["*"], allow_headers=["*"], ) # --- 데이터 모델 --- class AuthRequest(BaseModel): user_id: str password: str # --- 유틸리티 함수 --- def get_db_connection(): """MySQL 데이터베이스 연결을 반환 (환경변수 기반)""" return pymysql.connect( host=os.getenv('DB_HOST', 'localhost'), user=os.getenv('DB_USER', 'root'), password=os.getenv('DB_PASSWORD', '45278434'), database=os.getenv('DB_NAME', 'PM_proto'), charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor ) async def run_in_threadpool(func, *args): """동기 함수를 비차단 방식으로 실행""" loop = asyncio.get_event_loop() return await loop.run_in_executor(None, func, *args) # --- HTML 라우팅 --- @app.get("/") async def root(request: Request): return templates.TemplateResponse("index.html", {"request": request}) @app.get("/dashboard") async def get_dashboard(request: Request): return templates.TemplateResponse("dashboard.html", {"request": request}) @app.get("/mailTest") async def get_mail_test(request: Request): return templates.TemplateResponse("mailTest.html", {"request": request}) @app.get("/inquiries") async def get_inquiries_page(request: Request): return templates.TemplateResponse("inquiries.html", {"request": request}) @app.get("/analysis") async def get_analysis_page(request: Request): return templates.TemplateResponse("analysis.html", {"request": request}) class InquiryReplyRequest(BaseModel): reply: str status: str handler: str # --- 문의사항 API --- @app.get("/api/inquiries") async def get_inquiries(pm_type: str = None, category: str = None, status: str = None, keyword: str = None): # ... (existing code) try: with get_db_connection() as conn: with conn.cursor() as cursor: sql = InquiryQueries.SELECT_BASE params = [] if pm_type: sql += " AND pm_type = %s" params.append(pm_type) if category: sql += " AND category = %s" params.append(category) if status: sql += " AND status = %s" params.append(status) if keyword: sql += " AND (content LIKE %s OR author LIKE %s OR project_nm LIKE %s)" params.extend([f"%{keyword}%", f"%{keyword}%", f"%{keyword}%"]) sql += f" {InquiryQueries.ORDER_BY_DESC}" cursor.execute(sql, params) return cursor.fetchall() except Exception as e: return {"error": str(e)} @app.get("/api/inquiries/{id}") async def get_inquiry_detail(id: int): try: with get_db_connection() as conn: with conn.cursor() as cursor: cursor.execute(InquiryQueries.SELECT_BY_ID, (id,)) return cursor.fetchone() except Exception as e: return {"error": str(e)} @app.post("/api/inquiries/{id}/reply") async def update_inquiry_reply(id: int, req: InquiryReplyRequest): try: with get_db_connection() as conn: with conn.cursor() as cursor: handled_date = datetime.now().strftime("%Y.%m.%d") cursor.execute(InquiryQueries.UPDATE_REPLY, (req.reply, req.status, req.handler, handled_date, id)) conn.commit() return {"success": True} except Exception as e: return {"error": str(e)} @app.delete("/api/inquiries/{id}/reply") async def delete_inquiry_reply(id: int): try: with get_db_connection() as conn: with conn.cursor() as cursor: cursor.execute(InquiryQueries.DELETE_REPLY, (id,)) conn.commit() return {"success": True} except Exception as e: return {"error": str(e)} # --- 분석 및 수집 API --- @app.get("/available-dates") async def get_available_dates(): """히스토리 날짜 목록 반환""" try: with get_db_connection() as conn: with conn.cursor() as cursor: cursor.execute(DashboardQueries.GET_AVAILABLE_DATES) rows = cursor.fetchall() return [row['crawl_date'].strftime("%Y.%m.%d") for row in rows if row['crawl_date']] except Exception as e: return {"error": str(e)} @app.get("/project-data") async def get_project_data(date: str = None): """특정 날짜의 프로젝트 정보 JOIN 반환""" try: target_date = date.replace(".", "-") if date and date != "-" else None with get_db_connection() as conn: with conn.cursor() as cursor: if not target_date: cursor.execute(DashboardQueries.GET_LAST_CRAWL_DATE) res = cursor.fetchone() target_date = res['last_date'] if not target_date: return {"projects": []} cursor.execute(DashboardQueries.GET_PROJECT_LIST, (target_date,)) rows = cursor.fetchall() projects = [] for r in rows: name = r['short_nm'] if r['short_nm'] and r['short_nm'].strip() else r['project_nm'] projects.append([name, r['department'], r['master'], r['recent_log'], r['file_count'], r['continent'], r['country']]) return {"projects": projects} except Exception as e: return {"error": str(e)} @app.get("/project-activity") async def get_project_activity(date: str = None): """활성도 분석 API""" try: with get_db_connection() as conn: with conn.cursor() as cursor: if not date or date == "-": cursor.execute(DashboardQueries.GET_LAST_CRAWL_DATE) res = cursor.fetchone() target_date_val = res['last_date'] if res['last_date'] else datetime.now().date() else: target_date_val = datetime.strptime(date.replace(".", "-"), "%Y-%m-%d").date() target_date_dt = datetime.combine(target_date_val, datetime.min.time()) # 아코디언 리스트와 동일하게 마스터의 모든 프로젝트를 가져오되, 해당 날짜의 히스토리를 매칭 cursor.execute(DashboardQueries.GET_PROJECT_LIST_FOR_ANALYSIS, (target_date_val,)) rows = cursor.fetchall() analysis = {"summary": {"active": 0, "warning": 0, "stale": 0, "unknown": 0}, "details": []} for r in rows: log, files = r['recent_log'], r['file_count'] status, days = "unknown", 999 # 파일 수 정수 변환 (데이터가 없거나 0이면 0) file_val = int(files) if files else 0 has_log = log and log != "데이터 없음" and log != "X" if file_val == 0: # [핵심] 파일이 0개면 무조건 "데이터 없음" status = "unknown" elif has_log: if "폴더자동삭제" in log.replace(" ", ""): # [추가] 폴더 자동 삭제인 경우 날짜 상관없이 무조건 "방치" status = "stale" days = 999 else: # 로그 날짜가 있는 경우 정밀 분석 match = re.search(r'(\d{4})\.(\d{2})\.(\d{2})', log) if match: diff = (target_date_dt - datetime.strptime(match.group(0), "%Y.%m.%d")).days status = "active" if diff <= 7 else "warning" if diff <= 14 else "stale" days = diff else: status = "stale" else: # 파일은 있지만 로그가 없는 경우 status = "stale" analysis["summary"][status] += 1 analysis["details"].append({"name": r['short_nm'] or r['project_nm'], "status": status, "days_ago": days}) return analysis except Exception as e: return {"error": str(e)} @app.post("/auth/crawl") async def auth_crawl(req: AuthRequest): """크롤링 인증""" if req.user_id == os.getenv("PM_USER_ID") and req.password == os.getenv("PM_PASSWORD"): return {"success": True} return {"success": False, "message": "크롤링을 할 수 없습니다."} @app.get("/sync") async def sync_data(): return StreamingResponse(run_crawler_service(), media_type="text_event-stream") @app.get("/stop-sync") async def stop_sync(): crawl_stop_event.set() return {"success": True} @app.get("/api/analysis/p-war") async def get_p_war_analysis(): """P-WAR(Project Performance Above Replacement) 분석 API - 실제 평균 기반""" try: with get_db_connection() as conn: with conn.cursor() as cursor: cursor.execute(DashboardQueries.GET_LAST_CRAWL_DATE) last_date = cursor.fetchone()['last_date'] cursor.execute(DashboardQueries.GET_PROJECT_LIST, (last_date,)) projects = cursor.fetchall() cursor.execute("SELECT project_nm, COUNT(*) as cnt FROM inquiries WHERE status != '완료' GROUP BY project_nm") inquiry_risks = {row['project_nm']: row['cnt'] for row in cursor.fetchall()} import math temp_data = [] total_files = 0 total_stagnant = 0 total_risk = 0 count = len(projects) if count == 0: return [] # 1. 1차 순회: 전체 합계 계산 (평균 산출용) for p in projects: file_count = int(p['file_count']) if p['file_count'] else 0 log = p['recent_log'] days_stagnant = 10 if log and log != "데이터 없음": match = re.search(r'(\d{4})\.(\d{2})\.(\d{2})', log) if match: log_date = datetime.strptime(match.group(0), "%Y.%m.%d").date() days_stagnant = (last_date - log_date).days risk_count = inquiry_risks.get(p['project_nm'], 0) total_files += file_count total_stagnant += days_stagnant total_risk += risk_count temp_data.append((p, file_count, days_stagnant, risk_count)) # 2. 시스템 실제 평균(Mean) 산출 avg_files = total_files / count avg_stagnant = 5 # 사용자 요청에 따라 방치 기준을 5일로 강제 고정 (엄격한 판정) avg_risk = total_risk / count # 3. 평균 수준의 프로젝트 가치(V_avg) 정의 v_rep = ( (1 / (1 + avg_stagnant)) * math.log10(avg_files + 1) ) - (avg_risk * 0.5) results = [] # 4. 2차 순회: P-WAR 산출 (개별 가치 - 평균 가치) for p, f_cnt, d_stg, r_cnt in temp_data: name = p['short_nm'] or p['project_nm'] log = p['recent_log'] or "" is_auto_delete = "폴더자동삭제" in log.replace(" ", "") activity_factor = 1 / (1 + d_stg) scale_factor = math.log10(f_cnt + 1) v_project = (activity_factor * scale_factor) - (r_cnt * 0.5) # [추가] 폴더 자동 삭제 페널티 부여 (실질적 관리 부재) if is_auto_delete: v_project -= 1.5 p_war = v_project - v_rep results.append({ "project_nm": name, "file_count": f_cnt, "days_stagnant": d_stg, "risk_count": r_cnt, "p_war": round(p_war, 3), "is_auto_delete": is_auto_delete, "master": p['master'], "dept": p['department'], "avg_info": { "avg_files": round(avg_files, 1), "avg_stagnant": round(avg_stagnant, 1), "avg_risk": round(avg_risk, 2) } }) results.sort(key=lambda x: x['p_war']) return results except Exception as e: return {"error": str(e)} @app.get("/attachments") async def get_attachments(): path = "sample" if not os.path.exists(path): os.makedirs(path) return [{"name": f, "size": f"{os.path.getsize(os.path.join(path, f))/1024:.1f} KB"} for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))] @app.get("/analyze-file") async def analyze_file(filename: str): return await run_in_threadpool(analyze_file_content, filename) @app.get("/sample.png") async def get_sample_img(): return FileResponse("sample.png")