361 lines
15 KiB
Python
361 lines
15 KiB
Python
import os
|
|
import sys
|
|
import re
|
|
import asyncio
|
|
import pymysql
|
|
from datetime import datetime
|
|
from pydantic import BaseModel
|
|
from fastapi import FastAPI, Request
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import StreamingResponse, FileResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
|
|
from analyze import analyze_file_content
|
|
from crawler_service import run_crawler_service, crawl_stop_event
|
|
from sql_queries import InquiryQueries, DashboardQueries
|
|
|
|
# --- 환경 설정 ---
|
|
os.environ["PYTHONIOENCODING"] = "utf-8"
|
|
# Tesseract 경로는 환경에 따라 다를 수 있으므로 환경변수 우선 사용 권장
|
|
TESSDATA_PREFIX = os.getenv("TESSDATA_PREFIX", r"C:\Users\User\AppData\Local\Programs\Tesseract-OCR\tessdata")
|
|
os.environ["TESSDATA_PREFIX"] = TESSDATA_PREFIX
|
|
|
|
app = FastAPI(title="Project Master Overseas API")
|
|
templates = Jinja2Templates(directory="templates")
|
|
|
|
# 정적 파일 마운트
|
|
app.mount("/style", StaticFiles(directory="style"), name="style")
|
|
app.mount("/js", StaticFiles(directory="js"), name="js")
|
|
app.mount("/sample_files", StaticFiles(directory="sample"), name="sample_files")
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_credentials=False,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
# --- 데이터 모델 ---
|
|
class AuthRequest(BaseModel):
|
|
user_id: str
|
|
password: str
|
|
|
|
# --- 유틸리티 함수 ---
|
|
def get_db_connection():
|
|
"""MySQL 데이터베이스 연결을 반환 (환경변수 기반)"""
|
|
return pymysql.connect(
|
|
host=os.getenv('DB_HOST', 'localhost'),
|
|
user=os.getenv('DB_USER', 'root'),
|
|
password=os.getenv('DB_PASSWORD', '45278434'),
|
|
database=os.getenv('DB_NAME', 'PM_proto'),
|
|
charset='utf8mb4',
|
|
cursorclass=pymysql.cursors.DictCursor
|
|
)
|
|
|
|
async def run_in_threadpool(func, *args):
|
|
"""동기 함수를 비차단 방식으로 실행"""
|
|
loop = asyncio.get_event_loop()
|
|
return await loop.run_in_executor(None, func, *args)
|
|
|
|
# --- HTML 라우팅 ---
|
|
@app.get("/")
|
|
async def root(request: Request):
|
|
return templates.TemplateResponse("index.html", {"request": request})
|
|
|
|
@app.get("/dashboard")
|
|
async def get_dashboard(request: Request):
|
|
return templates.TemplateResponse("dashboard.html", {"request": request})
|
|
|
|
@app.get("/mailTest")
|
|
async def get_mail_test(request: Request):
|
|
return templates.TemplateResponse("mailTest.html", {"request": request})
|
|
|
|
@app.get("/inquiries")
|
|
async def get_inquiries_page(request: Request):
|
|
return templates.TemplateResponse("inquiries.html", {"request": request})
|
|
|
|
@app.get("/analysis")
|
|
async def get_analysis_page(request: Request):
|
|
return templates.TemplateResponse("analysis.html", {"request": request})
|
|
|
|
class InquiryReplyRequest(BaseModel):
|
|
reply: str
|
|
status: str
|
|
handler: str
|
|
|
|
# --- 문의사항 API ---
|
|
@app.get("/api/inquiries")
|
|
async def get_inquiries(pm_type: str = None, category: str = None, status: str = None, keyword: str = None):
|
|
# ... (existing code)
|
|
try:
|
|
with get_db_connection() as conn:
|
|
with conn.cursor() as cursor:
|
|
sql = InquiryQueries.SELECT_BASE
|
|
params = []
|
|
if pm_type:
|
|
sql += " AND pm_type = %s"
|
|
params.append(pm_type)
|
|
if category:
|
|
sql += " AND category = %s"
|
|
params.append(category)
|
|
if status:
|
|
sql += " AND status = %s"
|
|
params.append(status)
|
|
if keyword:
|
|
sql += " AND (content LIKE %s OR author LIKE %s OR project_nm LIKE %s)"
|
|
params.extend([f"%{keyword}%", f"%{keyword}%", f"%{keyword}%"])
|
|
|
|
sql += f" {InquiryQueries.ORDER_BY_DESC}"
|
|
cursor.execute(sql, params)
|
|
return cursor.fetchall()
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
@app.get("/api/inquiries/{id}")
|
|
async def get_inquiry_detail(id: int):
|
|
try:
|
|
with get_db_connection() as conn:
|
|
with conn.cursor() as cursor:
|
|
cursor.execute(InquiryQueries.SELECT_BY_ID, (id,))
|
|
return cursor.fetchone()
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
@app.post("/api/inquiries/{id}/reply")
|
|
async def update_inquiry_reply(id: int, req: InquiryReplyRequest):
|
|
try:
|
|
with get_db_connection() as conn:
|
|
with conn.cursor() as cursor:
|
|
handled_date = datetime.now().strftime("%Y.%m.%d")
|
|
cursor.execute(InquiryQueries.UPDATE_REPLY, (req.reply, req.status, req.handler, handled_date, id))
|
|
conn.commit()
|
|
return {"success": True}
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
@app.delete("/api/inquiries/{id}/reply")
|
|
async def delete_inquiry_reply(id: int):
|
|
try:
|
|
with get_db_connection() as conn:
|
|
with conn.cursor() as cursor:
|
|
cursor.execute(InquiryQueries.DELETE_REPLY, (id,))
|
|
conn.commit()
|
|
return {"success": True}
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
# --- 분석 및 수집 API ---
|
|
@app.get("/available-dates")
|
|
async def get_available_dates():
|
|
"""히스토리 날짜 목록 반환"""
|
|
try:
|
|
with get_db_connection() as conn:
|
|
with conn.cursor() as cursor:
|
|
cursor.execute(DashboardQueries.GET_AVAILABLE_DATES)
|
|
rows = cursor.fetchall()
|
|
return [row['crawl_date'].strftime("%Y.%m.%d") for row in rows if row['crawl_date']]
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
@app.get("/project-data")
|
|
async def get_project_data(date: str = None):
|
|
"""특정 날짜의 프로젝트 정보 JOIN 반환"""
|
|
try:
|
|
target_date = date.replace(".", "-") if date and date != "-" else None
|
|
with get_db_connection() as conn:
|
|
with conn.cursor() as cursor:
|
|
if not target_date:
|
|
cursor.execute(DashboardQueries.GET_LAST_CRAWL_DATE)
|
|
res = cursor.fetchone()
|
|
target_date = res['last_date']
|
|
|
|
if not target_date: return {"projects": []}
|
|
|
|
cursor.execute(DashboardQueries.GET_PROJECT_LIST, (target_date,))
|
|
rows = cursor.fetchall()
|
|
|
|
projects = []
|
|
for r in rows:
|
|
name = r['short_nm'] if r['short_nm'] and r['short_nm'].strip() else r['project_nm']
|
|
projects.append([name, r['department'], r['master'], r['recent_log'], r['file_count'], r['continent'], r['country']])
|
|
return {"projects": projects}
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
@app.get("/project-activity")
|
|
async def get_project_activity(date: str = None):
|
|
"""활성도 분석 API"""
|
|
try:
|
|
with get_db_connection() as conn:
|
|
with conn.cursor() as cursor:
|
|
if not date or date == "-":
|
|
cursor.execute(DashboardQueries.GET_LAST_CRAWL_DATE)
|
|
res = cursor.fetchone()
|
|
target_date_val = res['last_date'] if res['last_date'] else datetime.now().date()
|
|
else:
|
|
target_date_val = datetime.strptime(date.replace(".", "-"), "%Y-%m-%d").date()
|
|
|
|
target_date_dt = datetime.combine(target_date_val, datetime.min.time())
|
|
|
|
# 아코디언 리스트와 동일하게 마스터의 모든 프로젝트를 가져오되, 해당 날짜의 히스토리를 매칭
|
|
cursor.execute(DashboardQueries.GET_PROJECT_LIST_FOR_ANALYSIS, (target_date_val,))
|
|
rows = cursor.fetchall()
|
|
|
|
analysis = {"summary": {"active": 0, "warning": 0, "stale": 0, "unknown": 0}, "details": []}
|
|
for r in rows:
|
|
log, files = r['recent_log'], r['file_count']
|
|
status, days = "unknown", 999
|
|
|
|
# 파일 수 정수 변환 (데이터가 없거나 0이면 0)
|
|
file_val = int(files) if files else 0
|
|
has_log = log and log != "데이터 없음" and log != "X"
|
|
|
|
if file_val == 0:
|
|
# [핵심] 파일이 0개면 무조건 "데이터 없음"
|
|
status = "unknown"
|
|
elif has_log:
|
|
if "폴더자동삭제" in log.replace(" ", ""):
|
|
# [추가] 폴더 자동 삭제인 경우 날짜 상관없이 무조건 "방치"
|
|
status = "stale"
|
|
days = 999
|
|
else:
|
|
# 로그 날짜가 있는 경우 정밀 분석
|
|
match = re.search(r'(\d{4})\.(\d{2})\.(\d{2})', log)
|
|
if match:
|
|
diff = (target_date_dt - datetime.strptime(match.group(0), "%Y.%m.%d")).days
|
|
status = "active" if diff <= 7 else "warning" if diff <= 14 else "stale"
|
|
days = diff
|
|
else:
|
|
status = "stale"
|
|
else:
|
|
# 파일은 있지만 로그가 없는 경우
|
|
status = "stale"
|
|
|
|
analysis["summary"][status] += 1
|
|
analysis["details"].append({"name": r['short_nm'] or r['project_nm'], "status": status, "days_ago": days})
|
|
return analysis
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
@app.post("/auth/crawl")
|
|
async def auth_crawl(req: AuthRequest):
|
|
"""크롤링 인증"""
|
|
if req.user_id == os.getenv("PM_USER_ID") and req.password == os.getenv("PM_PASSWORD"):
|
|
return {"success": True}
|
|
return {"success": False, "message": "크롤링을 할 수 없습니다."}
|
|
|
|
@app.get("/sync")
|
|
async def sync_data():
|
|
return StreamingResponse(run_crawler_service(), media_type="text_event-stream")
|
|
|
|
@app.get("/stop-sync")
|
|
async def stop_sync():
|
|
crawl_stop_event.set()
|
|
return {"success": True}
|
|
|
|
@app.get("/api/analysis/p-war")
|
|
async def get_p_war_analysis():
|
|
"""P-WAR(Project Performance Above Replacement) 분석 API - 실제 평균 기반"""
|
|
try:
|
|
with get_db_connection() as conn:
|
|
with conn.cursor() as cursor:
|
|
cursor.execute(DashboardQueries.GET_LAST_CRAWL_DATE)
|
|
last_date = cursor.fetchone()['last_date']
|
|
|
|
cursor.execute(DashboardQueries.GET_PROJECT_LIST, (last_date,))
|
|
projects = cursor.fetchall()
|
|
|
|
cursor.execute("SELECT project_nm, COUNT(*) as cnt FROM inquiries WHERE status != '완료' GROUP BY project_nm")
|
|
inquiry_risks = {row['project_nm']: row['cnt'] for row in cursor.fetchall()}
|
|
|
|
import math
|
|
temp_data = []
|
|
total_files = 0
|
|
total_stagnant = 0
|
|
total_risk = 0
|
|
count = len(projects)
|
|
|
|
if count == 0: return []
|
|
|
|
# 1. 1차 순회: 전체 합계 계산 (평균 산출용)
|
|
for p in projects:
|
|
file_count = int(p['file_count']) if p['file_count'] else 0
|
|
log = p['recent_log']
|
|
|
|
days_stagnant = 10
|
|
if log and log != "데이터 없음":
|
|
match = re.search(r'(\d{4})\.(\d{2})\.(\d{2})', log)
|
|
if match:
|
|
log_date = datetime.strptime(match.group(0), "%Y.%m.%d").date()
|
|
days_stagnant = (last_date - log_date).days
|
|
|
|
risk_count = inquiry_risks.get(p['project_nm'], 0)
|
|
|
|
total_files += file_count
|
|
total_stagnant += days_stagnant
|
|
total_risk += risk_count
|
|
temp_data.append((p, file_count, days_stagnant, risk_count))
|
|
|
|
# 2. 시스템 실제 평균(Mean) 산출
|
|
avg_files = total_files / count
|
|
avg_stagnant = 5 # 사용자 요청에 따라 방치 기준을 5일로 강제 고정 (엄격한 판정)
|
|
avg_risk = total_risk / count
|
|
|
|
# 3. 평균 수준의 프로젝트 가치(V_avg) 정의
|
|
v_rep = ( (1 / (1 + avg_stagnant)) * math.log10(avg_files + 1) ) - (avg_risk * 0.5)
|
|
|
|
results = []
|
|
# 4. 2차 순회: P-WAR 산출 (개별 가치 - 평균 가치)
|
|
for p, f_cnt, d_stg, r_cnt in temp_data:
|
|
name = p['short_nm'] or p['project_nm']
|
|
log = p['recent_log'] or ""
|
|
is_auto_delete = "폴더자동삭제" in log.replace(" ", "")
|
|
|
|
activity_factor = 1 / (1 + d_stg)
|
|
scale_factor = math.log10(f_cnt + 1)
|
|
v_project = (activity_factor * scale_factor) - (r_cnt * 0.5)
|
|
|
|
# [추가] 폴더 자동 삭제 페널티 부여 (실질적 관리 부재)
|
|
if is_auto_delete:
|
|
v_project -= 1.5
|
|
|
|
p_war = v_project - v_rep
|
|
|
|
results.append({
|
|
"project_nm": name,
|
|
"file_count": f_cnt,
|
|
"days_stagnant": d_stg,
|
|
"risk_count": r_cnt,
|
|
"p_war": round(p_war, 3),
|
|
"is_auto_delete": is_auto_delete,
|
|
"master": p['master'],
|
|
"dept": p['department'],
|
|
"avg_info": {
|
|
"avg_files": round(avg_files, 1),
|
|
"avg_stagnant": round(avg_stagnant, 1),
|
|
"avg_risk": round(avg_risk, 2)
|
|
}
|
|
})
|
|
|
|
results.sort(key=lambda x: x['p_war'])
|
|
return results
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
@app.get("/attachments")
|
|
async def get_attachments():
|
|
path = "sample"
|
|
if not os.path.exists(path): os.makedirs(path)
|
|
return [{"name": f, "size": f"{os.path.getsize(os.path.join(path, f))/1024:.1f} KB"}
|
|
for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
|
|
|
|
@app.get("/analyze-file")
|
|
async def analyze_file(filename: str):
|
|
return await run_in_threadpool(analyze_file_content, filename)
|
|
|
|
@app.get("/sample.png")
|
|
async def get_sample_img():
|
|
return FileResponse("sample.png")
|