Files
elasticsearch/dumy_insert.py
2025-03-27 16:11:09 +09:00

124 lines
4.9 KiB
Python

import csv
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
# --- 설정 ---
ELASTICSEARCH_HOSTS = ["http://localhost:9200"] # Elasticsearch 주소 (필요시 수정)
INDEX_NAME = "my-user-index" # 사용할 인덱스 이름
CSV_FILE_PATH = "dummy_data2.csv" # CSV 파일 경로
# --- 설정 끝 ---
# Elasticsearch 클라이언트 생성
try:
es = Elasticsearch(ELASTICSEARCH_HOSTS)
# 연결 테스트 (선택 사항)
if not es.ping():
raise ValueError("Elasticsearch 연결 실패!")
print("Elasticsearch 연결 성공!")
except Exception as e:
print(f"Elasticsearch 연결 중 오류 발생: {e}")
exit()
def generate_actions(csv_filepath, index_name):
"""CSV 파일을 읽어 Elasticsearch bulk API 액션을 생성합니다."""
try:
with open(csv_filepath, "r", encoding="utf-8") as f:
reader = csv.reader(f)
header = next(reader) # 헤더 읽기
for row in reader:
if not row: # 빈 줄 건너뛰기
continue
doc = {}
try:
# 헤더와 데이터 매핑
doc = dict(zip(header, row))
# 데이터 타입 변환 (age를 정수로)
if "age" in doc and doc["age"]: # 값이 있을 때만 변환 시도
try:
doc["age"] = int(doc["age"])
except ValueError:
print(
f"경고: 'age' 필드 정수 변환 실패 (값: {doc['age']}). 행: {row}"
)
# age 변환 실패 시 처리: None으로 설정하거나, 로그만 남기고 진행하거나 선택
doc["age"] = None # 예시: None으로 설정
elif "age" in doc and not doc["age"]:
doc["age"] = None # 빈 문자열이면 None으로 설정
# created_at은 Elasticsearch가 자동으로 date 타입으로 인식할 수 있음
# (인덱스 매핑 설정에 따라 다를 수 있음)
# Bulk API 형식에 맞게 생성
yield {
"_index": index_name,
"_id": doc.get("id"), # CSV의 id를 Elasticsearch 문서 ID로 사용
"_source": doc,
}
except ValueError as ve:
print(f"데이터 변환 오류 (행 건너뜀): {row} - {ve}")
continue
except Exception as ex:
print(f"행 처리 중 오류 발생 (행 건너뜀): {row} - {ex}")
continue
except FileNotFoundError:
print(f"오류: CSV 파일 '{csv_filepath}'를 찾을 수 없습니다.")
exit()
except Exception as e:
print(f"CSV 파일 읽기 중 오류 발생: {e}")
exit()
# --- 실행 ---
print(
f"'{CSV_FILE_PATH}' 파일에서 데이터를 읽어 '{INDEX_NAME}' 인덱스로 벌크 삽입을 시작합니다..."
)
try:
# bulk 함수를 사용하여 데이터 삽입
success, failed_items = bulk(
es,
generate_actions(CSV_FILE_PATH, INDEX_NAME),
chunk_size=500,
request_timeout=60,
raise_on_error=False,
raise_on_exception=False,
) # 에러 발생 시 중단하지 않도록 설정 추가
# 실패 건수 계산 (failed_items 리스트의 길이)
num_failed = len(failed_items)
print(f"벌크 삽입 완료: 성공={success}, 실패={num_failed}")
# *** 수정된 부분 ***
# failed_items 리스트가 비어있지 않은지 확인 (즉, 실패한 항목이 있는지 확인)
if failed_items: # 또는 if num_failed > 0:
print(
f"실패한 항목 {num_failed}개가 있습니다. 상세 내용은 아래와 같습니다 (일부만 표시될 수 있음):"
)
# 실패 상세 내용 출력 (예: 처음 5개만)
for i, item in enumerate(failed_items):
if i < 5:
print(f" - {item}")
else:
print(f" ... (총 {num_failed}개 중 {i}개 표시)")
break
# 필요하다면 모든 실패 항목을 로그 파일 등에 기록할 수 있습니다.
else:
# 인덱스 새로고침 (데이터가 검색 가능하도록) - 실패가 없을 때만 수행하거나 항상 수행하도록 선택 가능
try:
es.indices.refresh(index=INDEX_NAME)
print(f"인덱스 '{INDEX_NAME}' 새로고침 완료.")
except Exception as refresh_err:
print(f"인덱스 새로고침 중 오류 발생: {refresh_err}")
except Exception as e:
# bulk 함수 자체에서 예외가 발생한 경우 (예: 연결 문제)
print(f"벌크 삽입 중 예상치 못한 오류 발생: {e}")