mariadb_to_elasticsearch.py
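"""Bulk-index posts from MariaDB into Elasticsearch.

Reads rows from the `post` table in batches, joins writer details from `member`
and a per-post comment count from `comment`, bulk-indexes each batch into the
Elasticsearch `post` index, and prints a PrettyTable summary of the elapsed time.
"""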
from datetime import datetime
from elasticsearch import Elasticsearch, helpers
from decouple import config
from db_connector import get_db_connection
import time
from prettytable import PrettyTable
# Elasticsearch client configuration
es = Elasticsearch(
    hosts=[{
        "host": config('ES_HOST'),
        "port": config('ES_PORT', default=9200, cast=int),
        "scheme": "http"
    }],
    http_auth=(config('ES_USERNAME'), config('ES_PASSWORD'))
)
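# Connection settings are read via python-decouple from a .env file or the
# environment; example values only (placeholders, not taken from this repo):
#   ES_HOST=localhost
#   ES_PORT=9200
#   ES_USERNAME=elastic
#   ES_PASSWORD=changeme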
# MariaDB connection
mydb = get_db_connection()
cursor = mydb.cursor(dictionary=True)
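# cursor(dictionary=True) returns each row as a dict keyed by column name
# (a mysql-connector-python style connection is assumed from db_connector).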
# Elasticsearch index name
es_index = 'post'
# Number of records to process per batch
batch_size = 10000
# Dictionary for measuring how long each task takes
time_dict = {
    'Indexing data into Elasticsearch': 0,
}
try:
    # Get the total number of records
    cursor.execute("SELECT COUNT(*) FROM post")
    total_count = cursor.fetchone()['COUNT(*)']
    print(f"Total records to process: {total_count}")
    # Calculate the total number of pages to process
    pages = total_count // batch_size + (1 if total_count % batch_size else 0)
    indexing_start_time = time.time()
    for page in range(pages):
        offset = page * batch_size
        # Use a JOIN query to fetch the post, writer, and comment-count fields together
        query = """
            SELECT p.*, m.nickname AS writerNickname, m.profile_image AS writerProfileImage,
                   (SELECT COUNT(*) FROM comment c WHERE c.post_id = p.post_id) AS commentCount
            FROM post p
            JOIN member m ON p.member_id = m.member_id
            LIMIT %s OFFSET %s
        """
        cursor.execute(query, (batch_size, offset))
        rows = cursor.fetchall()
        # Build a bulk indexing action for each record
        actions = [
            {
                "_index": es_index,
                "_source": {
                    'postId': row['post_id'],
                    'memberId': row['member_id'],
                    'writerNickname': row['writerNickname'],
                    'writerProfileImage': row['writerProfileImage'],
                    'commentCount': row['commentCount'],
                    'topic': row['topic'],
                    'title': row['title'],
                    'content': row['content'],
                    'likes': row['likes'],
                    'clicks': row['clicks'],
                    'bookmarks': row['bookmarks'],
                    'status': row['status'],
                    # created_at/modified_at are converted to epoch milliseconds
                    'createdAt': int(row['created_at'].timestamp() * 1000),
                    'modifiedAt': int(row['modified_at'].timestamp() * 1000),
                    '@timestamp': datetime.utcnow().isoformat()
                }
            }
            for row in rows
        ]
        # Run bulk indexing
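        # helpers.bulk raises a BulkIndexError if any document fails to index
        # (raise_on_error is True by default); the except block below catches it.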
        helpers.bulk(es, actions)
        print(f"Batch {page + 1}/{pages} indexed.")
    indexing_end_time = time.time()
    time_dict['Indexing data into Elasticsearch'] = indexing_end_time - indexing_start_time
except Exception as e:
    print(f"Error: {e}")
finally:
    if mydb.is_connected():
        cursor.close()
        mydb.close()
# Calculate the total elapsed time and add it
total_time = sum(time_dict.values())
time_dict['Total'] = total_time
# Print the elapsed-time measurement table
table = PrettyTable()
table.field_names = ["Task", "Elapsed time (s)"]
for key, value in time_dict.items():
    table.add_row([key, f"{value:.2f}"])
print(table)
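
# Run as a one-off batch job (assumes the settings above plus reachable MariaDB
# and Elasticsearch instances):
#   python mariadb_to_elasticsearch.py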