124 lines
4.9 KiB
Python
124 lines
4.9 KiB
Python
import csv
|
|
|
|
from elasticsearch import Elasticsearch
|
|
from elasticsearch.helpers import bulk
|
|
|
|
# --- 설정 ---
|
|
ELASTICSEARCH_HOSTS = ["http://localhost:9200"] # Elasticsearch 주소 (필요시 수정)
|
|
INDEX_NAME = "my-user-index" # 사용할 인덱스 이름
|
|
CSV_FILE_PATH = "dummy_data2.csv" # CSV 파일 경로
|
|
# --- 설정 끝 ---
|
|
|
|
# Elasticsearch 클라이언트 생성
|
|
try:
|
|
es = Elasticsearch(ELASTICSEARCH_HOSTS)
|
|
# 연결 테스트 (선택 사항)
|
|
if not es.ping():
|
|
raise ValueError("Elasticsearch 연결 실패!")
|
|
print("Elasticsearch 연결 성공!")
|
|
except Exception as e:
|
|
print(f"Elasticsearch 연결 중 오류 발생: {e}")
|
|
exit()
|
|
|
|
|
|
def generate_actions(csv_filepath, index_name):
|
|
"""CSV 파일을 읽어 Elasticsearch bulk API 액션을 생성합니다."""
|
|
try:
|
|
with open(csv_filepath, "r", encoding="utf-8") as f:
|
|
reader = csv.reader(f)
|
|
header = next(reader) # 헤더 읽기
|
|
|
|
for row in reader:
|
|
if not row: # 빈 줄 건너뛰기
|
|
continue
|
|
|
|
doc = {}
|
|
try:
|
|
# 헤더와 데이터 매핑
|
|
doc = dict(zip(header, row))
|
|
|
|
# 데이터 타입 변환 (age를 정수로)
|
|
if "age" in doc and doc["age"]: # 값이 있을 때만 변환 시도
|
|
try:
|
|
doc["age"] = int(doc["age"])
|
|
except ValueError:
|
|
print(
|
|
f"경고: 'age' 필드 정수 변환 실패 (값: {doc['age']}). 행: {row}"
|
|
)
|
|
# age 변환 실패 시 처리: None으로 설정하거나, 로그만 남기고 진행하거나 선택
|
|
doc["age"] = None # 예시: None으로 설정
|
|
elif "age" in doc and not doc["age"]:
|
|
doc["age"] = None # 빈 문자열이면 None으로 설정
|
|
|
|
# created_at은 Elasticsearch가 자동으로 date 타입으로 인식할 수 있음
|
|
# (인덱스 매핑 설정에 따라 다를 수 있음)
|
|
|
|
# Bulk API 형식에 맞게 생성
|
|
yield {
|
|
"_index": index_name,
|
|
"_id": doc.get("id"), # CSV의 id를 Elasticsearch 문서 ID로 사용
|
|
"_source": doc,
|
|
}
|
|
except ValueError as ve:
|
|
print(f"데이터 변환 오류 (행 건너뜀): {row} - {ve}")
|
|
continue
|
|
except Exception as ex:
|
|
print(f"행 처리 중 오류 발생 (행 건너뜀): {row} - {ex}")
|
|
continue
|
|
|
|
except FileNotFoundError:
|
|
print(f"오류: CSV 파일 '{csv_filepath}'를 찾을 수 없습니다.")
|
|
exit()
|
|
except Exception as e:
|
|
print(f"CSV 파일 읽기 중 오류 발생: {e}")
|
|
exit()
|
|
|
|
|
|
# --- 실행 ---
|
|
print(
|
|
f"'{CSV_FILE_PATH}' 파일에서 데이터를 읽어 '{INDEX_NAME}' 인덱스로 벌크 삽입을 시작합니다..."
|
|
)
|
|
|
|
try:
|
|
# bulk 함수를 사용하여 데이터 삽입
|
|
success, failed_items = bulk(
|
|
es,
|
|
generate_actions(CSV_FILE_PATH, INDEX_NAME),
|
|
chunk_size=500,
|
|
request_timeout=60,
|
|
raise_on_error=False,
|
|
raise_on_exception=False,
|
|
) # 에러 발생 시 중단하지 않도록 설정 추가
|
|
|
|
# 실패 건수 계산 (failed_items 리스트의 길이)
|
|
num_failed = len(failed_items)
|
|
|
|
print(f"벌크 삽입 완료: 성공={success}, 실패={num_failed}")
|
|
|
|
# *** 수정된 부분 ***
|
|
# failed_items 리스트가 비어있지 않은지 확인 (즉, 실패한 항목이 있는지 확인)
|
|
if failed_items: # 또는 if num_failed > 0:
|
|
print(
|
|
f"실패한 항목 {num_failed}개가 있습니다. 상세 내용은 아래와 같습니다 (일부만 표시될 수 있음):"
|
|
)
|
|
# 실패 상세 내용 출력 (예: 처음 5개만)
|
|
for i, item in enumerate(failed_items):
|
|
if i < 5:
|
|
print(f" - {item}")
|
|
else:
|
|
print(f" ... (총 {num_failed}개 중 {i}개 표시)")
|
|
break
|
|
# 필요하다면 모든 실패 항목을 로그 파일 등에 기록할 수 있습니다.
|
|
else:
|
|
# 인덱스 새로고침 (데이터가 검색 가능하도록) - 실패가 없을 때만 수행하거나 항상 수행하도록 선택 가능
|
|
try:
|
|
es.indices.refresh(index=INDEX_NAME)
|
|
print(f"인덱스 '{INDEX_NAME}' 새로고침 완료.")
|
|
except Exception as refresh_err:
|
|
print(f"인덱스 새로고침 중 오류 발생: {refresh_err}")
|
|
|
|
|
|
except Exception as e:
|
|
# bulk 함수 자체에서 예외가 발생한 경우 (예: 연결 문제)
|
|
print(f"벌크 삽입 중 예상치 못한 오류 발생: {e}")
|