import csv from elasticsearch import Elasticsearch from elasticsearch.helpers import bulk # --- 설정 --- ELASTICSEARCH_HOSTS = ["http://localhost:9200"] # Elasticsearch 주소 (필요시 수정) INDEX_NAME = "my-user-index" # 사용할 인덱스 이름 CSV_FILE_PATH = "dummy_data2.csv" # CSV 파일 경로 # --- 설정 끝 --- # Elasticsearch 클라이언트 생성 try: es = Elasticsearch(ELASTICSEARCH_HOSTS) # 연결 테스트 (선택 사항) if not es.ping(): raise ValueError("Elasticsearch 연결 실패!") print("Elasticsearch 연결 성공!") except Exception as e: print(f"Elasticsearch 연결 중 오류 발생: {e}") exit() def generate_actions(csv_filepath, index_name): """CSV 파일을 읽어 Elasticsearch bulk API 액션을 생성합니다.""" try: with open(csv_filepath, "r", encoding="utf-8") as f: reader = csv.reader(f) header = next(reader) # 헤더 읽기 for row in reader: if not row: # 빈 줄 건너뛰기 continue doc = {} try: # 헤더와 데이터 매핑 doc = dict(zip(header, row)) # 데이터 타입 변환 (age를 정수로) if "age" in doc and doc["age"]: # 값이 있을 때만 변환 시도 try: doc["age"] = int(doc["age"]) except ValueError: print( f"경고: 'age' 필드 정수 변환 실패 (값: {doc['age']}). 행: {row}" ) # age 변환 실패 시 처리: None으로 설정하거나, 로그만 남기고 진행하거나 선택 doc["age"] = None # 예시: None으로 설정 elif "age" in doc and not doc["age"]: doc["age"] = None # 빈 문자열이면 None으로 설정 # created_at은 Elasticsearch가 자동으로 date 타입으로 인식할 수 있음 # (인덱스 매핑 설정에 따라 다를 수 있음) # Bulk API 형식에 맞게 생성 yield { "_index": index_name, "_id": doc.get("id"), # CSV의 id를 Elasticsearch 문서 ID로 사용 "_source": doc, } except ValueError as ve: print(f"데이터 변환 오류 (행 건너뜀): {row} - {ve}") continue except Exception as ex: print(f"행 처리 중 오류 발생 (행 건너뜀): {row} - {ex}") continue except FileNotFoundError: print(f"오류: CSV 파일 '{csv_filepath}'를 찾을 수 없습니다.") exit() except Exception as e: print(f"CSV 파일 읽기 중 오류 발생: {e}") exit() # --- 실행 --- print( f"'{CSV_FILE_PATH}' 파일에서 데이터를 읽어 '{INDEX_NAME}' 인덱스로 벌크 삽입을 시작합니다..." ) try: # bulk 함수를 사용하여 데이터 삽입 success, failed_items = bulk( es, generate_actions(CSV_FILE_PATH, INDEX_NAME), chunk_size=500, request_timeout=60, raise_on_error=False, raise_on_exception=False, ) # 에러 발생 시 중단하지 않도록 설정 추가 # 실패 건수 계산 (failed_items 리스트의 길이) num_failed = len(failed_items) print(f"벌크 삽입 완료: 성공={success}, 실패={num_failed}") # *** 수정된 부분 *** # failed_items 리스트가 비어있지 않은지 확인 (즉, 실패한 항목이 있는지 확인) if failed_items: # 또는 if num_failed > 0: print( f"실패한 항목 {num_failed}개가 있습니다. 상세 내용은 아래와 같습니다 (일부만 표시될 수 있음):" ) # 실패 상세 내용 출력 (예: 처음 5개만) for i, item in enumerate(failed_items): if i < 5: print(f" - {item}") else: print(f" ... (총 {num_failed}개 중 {i}개 표시)") break # 필요하다면 모든 실패 항목을 로그 파일 등에 기록할 수 있습니다. else: # 인덱스 새로고침 (데이터가 검색 가능하도록) - 실패가 없을 때만 수행하거나 항상 수행하도록 선택 가능 try: es.indices.refresh(index=INDEX_NAME) print(f"인덱스 '{INDEX_NAME}' 새로고침 완료.") except Exception as refresh_err: print(f"인덱스 새로고침 중 오류 발생: {refresh_err}") except Exception as e: # bulk 함수 자체에서 예외가 발생한 경우 (예: 연결 문제) print(f"벌크 삽입 중 예상치 못한 오류 발생: {e}")