From 4ced8db541ddbbfb56101d18bfa0a6c330fb387c Mon Sep 17 00:00:00 2001 From: kyy Date: Wed, 8 Jan 2025 15:06:58 +0900 Subject: [PATCH] Update commit --- ADR-0.md | 1 - Dockerfile | 6 ++ README.md | 10 +- docker-compose.yml | 2 + workspace/setting.json => setting.json | 0 workspace/main.py | 132 +++++++++++-------------- workspace/template.py | 10 +- workspace/tests/example.py | 10 +- workspace/worker.py | 2 +- 9 files changed, 78 insertions(+), 95 deletions(-) rename workspace/setting.json => setting.json (100%) mode change 100755 => 100644 diff --git a/ADR-0.md b/ADR-0.md index 8de09dc..5ef2605 100755 --- a/ADR-0.md +++ b/ADR-0.md @@ -13,7 +13,6 @@ FastAPI를 이용해 클라이언트가 RESTful API를 통해 서비스와 상 - 비동기 요청을 처리하고 데이터 유효성 검사를 수행. - **주요 엔드포인트**: - `/start-inference/`: CSV 및 모델 리스트 파일 업로드 후 추론 작업 시작. - - `/merge-results/`: 배치별로 나뉜 결과를 단일 파일로 결합 - `/download-latest/`: 가장 최근에 완료된 결과 파일 다운로드. ### (2) 작업 처리 계층 diff --git a/Dockerfile b/Dockerfile index 98cd4f7..e6939e7 100755 --- a/Dockerfile +++ b/Dockerfile @@ -9,6 +9,12 @@ WORKDIR /opt/workspace # Use a faster mirror for apt-get RUN sed -i 's/archive.ubuntu.com/mirror.kakao.com/g' /etc/apt/sources.list +# Install tzdata and set the timezone to Asia/Seoul +RUN apt-get update && apt-get install -y tzdata && \ + ln -sf /usr/share/zoneinfo/Asia/Seoul /etc/localtime && \ + echo "Asia/Seoul" > /etc/timezone && \ + dpkg-reconfigure -f noninteractive tzdata + # Install system dependencies (including dependencies for Hugging Face login) RUN apt-get update && apt-get install -y \ git \ diff --git a/README.md b/README.md index 32f54b5..c4d9166 100755 --- a/README.md +++ b/README.md @@ -47,14 +47,6 @@ http://localhost:8000/docs - CSV 데이터와 모델 리스트를 업로드하여 추론 작업 시작 - 결과 파일은 자동으로 저장됨 -2. **결과 병합** (`GET /merge-results/`): - - `processed` 디렉토리에 저장된 모든 `_result.csv` 파일을 병합하여 하나의 CSV 파일로 통합합니다. - - 병합된 최종 결과는 `processed/final_result.csv`로 저장되며, 이 파일의 경로가 응답으로 반환됩니다. - - **사용 사례**: - - 다중 모델 추론 결과를 한곳에 통합하여 분석하려는 경우. - - 배치별로 나뉜 결과를 단일 파일로 결합하려는 경우. - -3. **최신 결과 다운로드** (`GET /download-latest`): +2. **최신 결과 다운로드** (`GET /download-latest`): - 가장 최근에 완료된 추론 결과를 다운로드 - --- \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index b11e75a..497a43c 100755 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,6 +9,7 @@ services: volumes: - ./workspace:/opt/workspace/ - ./cache:/root/.cache/ + - ../model:/opt/model/ environment: PYTORCH_CUDA_ALLOC_CONF: expandable_segments:True PYTHONPATH: /opt/workspace/ @@ -47,6 +48,7 @@ services: volumes: - ./workspace:/opt/workspace/ - ./cache:/root/.cache/ + - ../model:/opt/model/ environment: PYTORCH_CUDA_ALLOC_CONF: expandable_segments:True PYTHONPATH: /opt/workspace/ diff --git a/workspace/setting.json b/setting.json old mode 100755 new mode 100644 similarity index 100% rename from workspace/setting.json rename to setting.json diff --git a/workspace/main.py b/workspace/main.py index 96e6f01..6462c59 100755 --- a/workspace/main.py +++ b/workspace/main.py @@ -11,7 +11,7 @@ import torch from tqdm import tqdm import sys -sys.path.append("/opt/workspace/") +sys.path.append("/workspace/LLM_asyncio") from template import LLMInference app = FastAPI() @@ -27,11 +27,13 @@ logger = logging.getLogger(__name__) # FastAPI 엔드포인트: CSV 파일 및 모델 리스트 업로드 처리 @app.post("/start-inference/") async def process_csv(input_csv: UploadFile, model_list_txt: UploadFile, background_tasks: BackgroundTasks): - logger.info(f"file_name: {input_csv},model_list_file: {model_list_txt}") - # 파일 형식 확인 및 저장 - if not input_csv.filename.endswith(".csv") or not model_list_txt.filename.endswith(".txt"): - return JSONResponse(content={"error": "Invalid file format."}, status_code=400) + # 파일 형식 확인 + if not input_csv.filename.endswith(".csv"): + return JSONResponse(content={"error": "Uploaded file is not a CSV."}, status_code=400) + if not model_list_txt.filename.endswith(".txt"): + return JSONResponse(content={"error": "Uploaded model list is not a TXT file."}, status_code=400) + # 파일 저장 file_path = f"uploaded/{input_csv.filename}" model_list_path = f"uploaded/{model_list_txt.filename}" os.makedirs("uploaded", exist_ok=True) @@ -42,41 +44,32 @@ async def process_csv(input_csv: UploadFile, model_list_txt: UploadFile, backgro with open(model_list_path, "wb") as f: f.write(await model_list_txt.read()) - df = pd.read_csv(file_path, encoding="euc-kr") - batch_size = 10 - job_ids = [] + logger.info(f"Files uploaded: {file_path}, {model_list_path}") - # 데이터를 batch_size로 나누어 작업 큐에 추가 - for i in range(0, len(df), batch_size): - batch_file_path = file_path.replace(".csv", f"_batch_{i}_{i+batch_size}.csv") - df.iloc[i:i+batch_size].to_csv(batch_file_path, index=False, encoding="utf-8") - job = queue.enqueue(run_inference, batch_file_path, model_list_path, job_timeout=1800) - job_ids.append(job.id) - - logger.info(f"Jobs enqueued: {job_ids}") - return {"job_ids": job_ids, "status": "queued"} + # 작업 큐에 추가 + job = queue.enqueue(run_inference, file_path, model_list_path, job_timeout=1800) + logger.info(f"Job enqueued: {job.id}") + return {"job_id": job.id, "status": "queued"} def chat_formating(input_sentence: str, model_name: str): + try: + if "llama" in model_name: + hidden_prompt = LLMInference.llama_template() + if "gemma" in model_name: + hidden_prompt = LLMInference.gemma_template() + if "exaone" in model_name: + hidden_prompt = LLMInference.exaone_template() - if "llama" in model_name: - hidden_prompt = LLMInference.llama_template() - elif "gemma" in model_name: - hidden_prompt = LLMInference.gemma_template() - elif "exaone" in model_name: - hidden_prompt = LLMInference.exaone_template() - else: - raise ValueError("Unknown model name: " + model_name) - - formated_sentence = hidden_prompt.format(input_sent=input_sentence) - logger.info(f"Sentence: {formated_sentence}") - return formated_sentence + formated_sentence = hidden_prompt.format(input_sent=input_sentence) + return formated_sentence + except Exception as e: + logger.error(f"Not formatting input sentence: {e}") + return input_sentence # 모델 추론 함수 -def run_inference(batch_file_path: str, model_list_path: str): +def run_inference(file_path: str, model_list_path: str, batch_size: int = 32): try: - # 워커 ID 확인 - worker_id = os.environ.get("HOSTNAME", "Unknown Worker") - logger.info(f"Worker {worker_id} started inference for batch file: {batch_file_path}") + logger.info(f"Starting inference for file: {file_path} using models from {model_list_path}") # 모델 리스트 읽기 with open(model_list_path, "r") as f: @@ -85,59 +78,62 @@ def run_inference(batch_file_path: str, model_list_path: str): if not model_list: raise ValueError("The model list file is empty.") - # 배치 데이터 읽기 - df = pd.read_csv(batch_file_path, encoding="utf-8") + # CSV 읽기 + df = pd.read_csv(file_path, encoding="euc-kr") if "input" not in df.columns: raise ValueError("The input CSV must contain a column named 'input'.") - + # 에러 발생한 행 저장용 DataFrame 초기화 error_rows = pd.DataFrame(columns=df.columns) - # 추론 수행 + # 각 모델로 추론 for model in model_list: - logger.info(f"Worker {worker_id} loading model: {model}") + model_name = model.split("/")[-1] try: + logger.info(f"Loading model: {model}") llm = LLM(model) torch.cuda.empty_cache() - logger.info(f"Worker {worker_id} loaded model {model} successfully.") + logger.info(f"Model {model} loaded successfully.") except Exception as e: - logger.error(f"Worker {worker_id} error loading model {model}: {e}") + logger.error(f"Error loading model {model}: {e}") continue sampling_params = SamplingParams(max_tokens=50, temperature=0.7, top_p=0.9, top_k=50) - responses = [] - # tqdm 추가: 워커별 모델 진행 상태 표시 - with tqdm(total=len(df), desc=f"[{worker_id}] Model: {model}") as pbar: - model_name = model.split("/")[-1] - for _, row in df.iterrows(): + # 추론 수행 + responses = [] + for i in tqdm(range(0, len(df), batch_size), desc=f"Processing {model}"): + batch = df.iloc[i:i+batch_size] + batch_responses = [] + for _, row in batch.iterrows(): try: - input_text = chat_formating(input_sentence=row["input"], model_name=model_name) - response = llm.generate(input_text, sampling_params)[0].outputs[0].text.strip() - logger.info(f"Model: {model}, Input: {input_text}, Output: {response}") - responses.append(response) + original_input = row["input"] + formating_input = chat_formating(input_sentence=row["input"], model_name=model_name.lower()) + response = llm.generate(formating_input, sampling_params)[0].outputs[0].text.strip() + logger.info(f"Model: {model}, Input: {original_input}, Output: {response}") + batch_responses.append(response) except Exception as e: - logger.error(f"Worker {worker_id} error during inference for model {model}, row {row.name}: {e}") + logger.error(f"Error during inference for model {model}, row {row.name}: {e}") error_rows = pd.concat([error_rows, pd.DataFrame([row])], ignore_index=True) - responses.append(None) - finally: - pbar.update(1) + batch_responses.append(None) + responses.extend(batch_responses) # 결과 추가 df[model_name] = responses + del llm torch.cuda.empty_cache() gc.collect() - # 배치 결과 저장 - output_path = batch_file_path.replace("uploaded", "processed").replace(".csv", "_result.csv") + # 결과 저장 + output_path = file_path.replace("uploaded", "processed").replace(".csv", "_result.csv") os.makedirs("processed", exist_ok=True) df.to_csv(output_path, index=False, encoding="utf-8") - logger.info(f"Worker {worker_id} inference completed for batch. Result saved to: {output_path}") + logger.info(f"Inference completed. Result saved to: {output_path}") # 에러 행 저장 if not error_rows.empty: - error_path = batch_file_path.replace("uploaded", "errors").replace(".csv", "_errors.csv") + error_path = file_path.replace("uploaded", "errors").replace(".csv", "_errors.csv") os.makedirs("errors", exist_ok=True) error_rows.to_csv(error_path, index=False, encoding="utf-8") logger.info(f"Error rows saved to: {error_path}") @@ -145,24 +141,8 @@ def run_inference(batch_file_path: str, model_list_path: str): return output_path except Exception as e: - logger.error(f"Worker {worker_id} error during inference: {e}") + logger.error(f"Error during inference: {e}") raise - -@app.get("/merge-results/") -def merge_results(): - try: - processed_dir = "processed" - all_files = [os.path.join(processed_dir, f) for f in os.listdir(processed_dir) if f.endswith("_result.csv")] - combined_df = pd.concat([pd.read_csv(f, encoding="utf-8") for f in all_files], ignore_index=True) - - final_output_path = os.path.join(processed_dir, "final_result.csv") - combined_df.to_csv(final_output_path, index=False, encoding="utf-8") - - logger.info(f"Final merged result saved to: {final_output_path}") - return {"final_result_path": final_output_path} - except Exception as e: - logger.error(f"Error during merging results: {e}") - return JSONResponse(content={"error": "Failed to merge results."}, status_code=500) # 결과 파일 다운로드 @app.get("/download-latest", response_class=FileResponse) @@ -170,7 +150,6 @@ def download_latest_file(): try: # processed 디렉토리 경로 directory = "processed" - csv_files = [os.path.join(directory, f) for f in os.listdir(directory) if f.endswith(".csv")] if not csv_files: @@ -179,8 +158,7 @@ def download_latest_file(): latest_file = max(csv_files, key=os.path.getctime) logger.info(f"Downloading latest file: {latest_file}") - return FileResponse(latest_file, media_type="application/csv", filename=os.path.basename(latest_file)) except Exception as e: logger.error(f"Error during file download: {e}") - return JSONResponse(content={"error": "Failed to download the latest file."}, status_code=500) \ No newline at end of file + return JSONResponse(content={"error": "Failed to download the latest file."}, status_code=500) diff --git a/workspace/template.py b/workspace/template.py index c8d573f..60936c4 100755 --- a/workspace/template.py +++ b/workspace/template.py @@ -2,17 +2,17 @@ class LLMInference: def __init__(self): pass - @staticmethod + @staticmethod def llama_template(): hidden_prompt = "<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\n친절한 건설안전전문가로서 반드시 [규칙]을 바탕으로 상대방의 요청에 답변을 생성해주세요.\n[규칙]\n1. 한 문장으로 핵심만 요약해서 답변을 생성합니다.\n2. 모든 답변은 반드시 한 문장의 한국어(Korean)으로 작성합니다.\n3. 생성된 답변의 신뢰성은 1(낮음)~5(높음)으로 평가합니다.4. 답변 형식은 다음과 같습니다. 답변.\n(신뢰성:3)<|eot_id|>\n<|start_header_id|>user<|end_header_id|>\n\n{input_sent}<|eot_id|><|start_header_id|>assistant<|end_header_id|>" return hidden_prompt - - @staticmethod + + @staticmethod def gemma_template(): hidden_prompt = "user\n친절한 건설안전전문가로서 반드시 [규칙]을 바탕으로 상대방의 요청에 답변을 생성해주세요.\n[규칙]\n1. 한 문장으로 핵심만 요약해서 답변을 생성합니다.\n2. 모든 답변은 반드시 한 문장의 한국어(Korean)으로 작성합니다.\n3. 생성된 답변의 신뢰성은 1(낮음)~5(높음)으로 평가합니다.4. 답변 형식은 다음과 같습니다. 답변.\n(신뢰성:3)\n\n{input_sent}\nmodel" return hidden_prompt - + @staticmethod def exaone_template(): hidden_prompt = "[|system|]친절한 건설안전전문가로서 반드시 [규칙]을 바탕으로 상대방의 요청에 답변을 생성해주세요.\n[규칙]\n1. 한 문장으로 핵심만 요약해서 답변을 생성합니다.\n2. 모든 답변은 반드시 한 문장의 한국어(Korean)으로 작성합니다.\n3. 생성된 답변의 신뢰성은 1(낮음)~5(높음)으로 평가합니다.4. 답변 형식은 다음과 같습니다. 답변.\n(신뢰성:3)[|endofturn|]\n\n[|user|]{input_sent}[|endofturn|]\n[|assistant|]" - return hidden_prompt \ No newline at end of file + return hidden_prompt diff --git a/workspace/tests/example.py b/workspace/tests/example.py index 6c4a131..eb89b8b 100755 --- a/workspace/tests/example.py +++ b/workspace/tests/example.py @@ -6,7 +6,13 @@ from vllm import LLM 이렇게 올라온 모델을 사용해 이제 텍스트를 생성하는 text generate를 실행해보겠습니다. 다음과 같이 실행하면 됩니다. """ -llm = LLM(model="yanolja/EEVE-Korean-Instruct-2.8B-v1.0", max_model_len=2048, tensor_parallel_size=1) # 모델로드 +llm = LLM( + model="yanolja/EEVE-Korean-Instruct-2.8B-v1.0", + max_model_len=2048, + tensor_parallel_size=1, +) # 모델로드 -requestoutput = llm.generate("안녕하십니까. 기상 캐스터 어시스턴트입니다. 오늘의 날씨는") # 입력문장 +requestoutput = llm.generate( + "안녕하십니까. 기상 캐스터 어시스턴트입니다. 오늘의 날씨는" +) # 입력문장 print(requestoutput) diff --git a/workspace/worker.py b/workspace/worker.py index 4ed9b5d..16f0b23 100755 --- a/workspace/worker.py +++ b/workspace/worker.py @@ -9,4 +9,4 @@ queue = Queue("model_tasks", connection=redis_conn) if __name__ == "__main__": worker = Worker([queue], connection=redis_conn) - worker.work() \ No newline at end of file + worker.work()