-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdiscord_test.py
468 lines (412 loc) · 17.4 KB
/
discord_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
import pytz
from selenium import webdriver
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver import Keys, ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from bs4 import BeautifulSoup as bs
from selenium.webdriver.chrome.options import Options
from kafka import KafkaConsumer, KafkaProducer
from collections import deque
import json
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
import logging
import re
import random
import time
import concurrent.futures
from datetime import datetime
import psycopg2
from psycopg2 import sql
from dotenv import load_dotenv
import os
import base64
import requests
from scanner import PAGES, SITES
import discord
from discord import app_commands
from discord.ext import commands
import asyncio
import threading
from collections import defaultdict
load_dotenv()
DISCORD_TOKEN = os.environ.get("DISCORD_TOKEN")
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL")
SLACK_TOKEN = os.environ.get("SLACK_TOKEN")
SLACK_CHANNEL_ID = os.environ.get("SLACK_CHANNEL_ID")
DISCORD_WEBHOOK = os.environ.get("DISCORD_WEBHOOK")
DB_NAME = os.environ.get("DB_NAME")
DB_USER = os.environ.get("DB_USER")
DB_PASSWORD = os.environ.get("DB_PASSWORD")
DB_HOST = os.environ.get("DB_HOST")
DB_PORT = os.environ.get("DB_PORT")
# Logging 설정
logging.basicConfig(filename='discord.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Discord bot 초기화
class ChannelManager:
def __init__(self):
self.init_db()
def get_connection(self):
"""데이터베이스 연결 생성"""
return psycopg2.connect(
dbname=DB_NAME,
user=DB_USER,
password=DB_PASSWORD,
host=DB_HOST,
port=DB_PORT
)
def init_db(self):
"""데이터베이스 초기화"""
conn = self.get_connection()
try:
with conn.cursor() as cur:
# 채널 테이블 생성
cur.execute('''
CREATE TABLE IF NOT EXISTS channels (
channel_id BIGINT PRIMARY KEY,
guild_id BIGINT NOT NULL,
channel_name TEXT NOT NULL,
guild_name TEXT NOT NULL,
category TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_message_at TIMESTAMP,
message_count INTEGER DEFAULT 0,
is_active BOOLEAN DEFAULT TRUE
)
''')
# 메시지 로그 테이블 생성
cur.execute('''
CREATE TABLE IF NOT EXISTS message_logs (
id SERIAL PRIMARY KEY,
channel_id BIGINT,
message TEXT NOT NULL,
sent_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
status TEXT,
error_message TEXT,
FOREIGN KEY (channel_id) REFERENCES channels (channel_id)
)
''')
conn.commit()
finally:
conn.close()
def add_channel(self, channel_id, guild_id, channel_name, guild_name, category=None):
"""채널 추가"""
conn = self.get_connection()
try:
with conn.cursor() as cur:
cur.execute('''
INSERT INTO channels
(channel_id, guild_id, channel_name, guild_name, category)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (channel_id)
DO UPDATE SET
guild_id = EXCLUDED.guild_id,
channel_name = EXCLUDED.channel_name,
guild_name = EXCLUDED.guild_name,
category = EXCLUDED.category,
is_active = TRUE
''', (channel_id, guild_id, channel_name, guild_name, category))
conn.commit()
logging.info(f"채널 추가 완료 {guild_name} / {channel_name} channel id:{channel_id}")
return True
except Exception as e:
logging.error(f"채널 추가 중 오류 발생: {e}")
return False
finally:
conn.close()
def remove_channel(self, channel_id):
"""채널 제거 (비활성화)"""
conn = self.get_connection()
try:
with conn.cursor() as cur:
cur.execute('''
UPDATE channels
SET is_active = FALSE
WHERE channel_id = %s
''', (channel_id,))
conn.commit()
logging.info(f"채널 삭제 완료 channel id:{channel_id}")
return True
except Exception as e:
logging.error(f"채널 제거 중 오류 발생: {e}")
return False
finally:
conn.close()
def get_active_channels(self):
"""활성화된 채널 목록 조회"""
conn = self.get_connection()
try:
with conn.cursor() as cur:
cur.execute('SELECT channel_id FROM channels WHERE is_active = TRUE')
return [row[0] for row in cur.fetchall()]
finally:
conn.close()
def log_message(self, channel_id, message, status, error_message=None):
"""메시지 전송 로그 기록"""
conn = self.get_connection()
try:
with conn.cursor() as cur:
cur.execute('''
INSERT INTO message_logs (channel_id, message, status, error_message)
VALUES (%s, %s, %s, %s)
''', (channel_id, str(message), status, error_message))
if status == 'success':
cur.execute('''
UPDATE channels
SET message_count = message_count + 1,
last_message_at = CURRENT_TIMESTAMP
WHERE channel_id = %s
''', (channel_id,))
conn.commit()
except Exception as e:
logging.error(f"메시지 로깅 중 오류 발생: {e}")
finally:
conn.close()
def get_channel_stats(self, channel_id):
"""채널 통계 조회"""
conn = self.get_connection()
try:
with conn.cursor() as cur:
cur.execute('''
SELECT channel_name, guild_name, message_count, last_message_at
FROM channels
WHERE channel_id = %s
''', (channel_id,))
return cur.fetchone()
finally:
conn.close()
def add_keyword(self, channel_id, user_id, keyword):
conn = self.get_connection()
try:
with conn.cursor() as cur:
cur.execute("SELECT keyword FROM channel_keyword WHERE channel_id = %s AND user_id = %s AND keyword = %s", (channel_id, user_id, keyword))
exist = cur.fetchone()
if exist:
return f"{keyword}는 이미 등록된 키워드입니다."
else:
cur.execute("INSERT INTO channel_keyword (channel_id, user_id, keyword) VALUES (%s, %s, %s)", (channel_id, user_id, keyword))
conn.commit()
return f"{keyword} 키워드 등록 완료."
except Exception as e:
logging.error(f"키워드 등록 중 오류 발생: {e}")
return (f"키워드 등록 중 오류가 발생했습니다: {e}")
finally:
conn.close()
def del_keyword(self, channel_id, user_id, keyword):
conn = self.get_connection()
try:
with conn.cursor() as cur:
cur.execute("SELECT keyword FROM channel_keyword WHERE channel_id = %s AND user_id = %s AND keyword = %s", (channel_id, user_id, keyword))
exist = cur.fetchone()
if not exist:
return f"{keyword}는 등록되지 않은 키워드입니다."
else:
cur.execute("DELETE FROM channel_keyword WHERE channel_id = %s AND user_id = %s AND keyword = %s", (channel_id, user_id, keyword))
conn.commit()
return f"{keyword} 키워드 삭제 완료."
except Exception as e:
logging.error(f"키워드 삭제 중 오류 발생: {e}")
return (f"키워드 삭제 중 오류가 발생했습니다: {e}")
finally:
conn.close()
def get_keyword(self, channel_id):
conn = self.get_connection()
try:
with conn.cursor() as cur:
cur.execute("SELECT keyword FROM channel_keyword WHERE channel_id = %s", (channel_id,))
keywords = cur.fetchall()
if not keywords:
return f"등록된 키워드가 없습니다."
else:
keyword_list = ', '.join(keyword[0] for keyword in keywords)
return f"등록된 키워드 : {keyword_list}"
except Exception as e:
logging.error(f"키워드 조회 중 오류 발생: {e}")
return (f"키워드 조회 중 오류가 발생했습니다: {e}")
finally:
conn.close()
def get_keyword_info(self, message, keywords):
conn = self.get_connection()
info_dict = defaultdict(list)
try:
for kw in keywords:
if kw in message["item_name"] or kw in message["content"]:
with conn.cursor() as cur:
cur.execute("SELECT user_id, channel_id FROM channel_keyword WHERE keyword = %s", (kw,))
info = cur.fetchall()
for user_id, channel_id in info:
info_dict[channel_id].append(user_id)
return info_dict
except Exception as e:
logging.error(f"키워드 등록 유저조회 중 오류 발생: {e}")
return []
finally:
conn.close()
def get_keyword_list(self):
conn = self.get_connection()
try:
with conn.cursor() as cur:
cur.execute("SELECT keyword FROM channel_keyword")
keywords = cur.fetchall()
return [i[0] for i in keywords]
except Exception as e:
logging.error(f"키워드 조회 중 오류 발생: {e}")
return []
finally:
conn.close()
# Discord bot 초기화
intents = discord.Intents.default()
intents.message_content = True
intents.guilds = True
intents.members = True
bot = commands.Bot(command_prefix='/', intents=intents)
bot.remove_command("help")
channel_manager = ChannelManager()
@bot.event
async def on_ready():
logging.info(f'{bot.user} 로 로그인했습니다!')
# Kafka consumer 시작
threading.Thread(target=run_kafka_consumer, daemon=True).start()
@bot.event
async def on_guild_join(guild):
# 봇이 새로운 서버에 초대되었을 때
channel = guild.system_channel # 시스템 채널을 가져옵니다.
# 시스템 채널이 존재하는 경우 안내 메시지를 보냅니다.
if channel is not None:
await channel.send(
f"안녕하세요! */help*"
)
@bot.command(usage = "/add_keyword 키워드")
async def add_keyword(ctx, *, keyword):
"""알람 keyword 등록"""
result = channel_manager.add_keyword(ctx.channel.id, ctx.author.id, keyword)
await ctx.send(result)
@bot.command(usage = "/del_keyword 키워드")
async def del_keyword(ctx, *, keyword):
"""알람 keyword 삭제"""
result = channel_manager.del_keyword(ctx.channel.id, ctx.author.id, keyword)
await ctx.send(result)
@bot.command(usage = "/get_keyword")
async def get_keyword(ctx):
"""등록된 keyword 가져오기 """
result = channel_manager.get_keyword(ctx.channel.id)
await ctx.send(result)
@bot.command()
async def help(ctx):
"""help message 전송"""
help_message=f"""
- 채널 등록: /register
- 채널 해제: /unregister
- 키워드 등록: /add_keyword 키워드
- 키워드 해제: /del_keyword 키워드
- 등록한 키워드 보기: /get_keyword
키워드 등록 시 키워드가 포함된 글은 {ctx.author.mention} 멘션이 갑니다!
"""
await ctx.send(help_message)
@bot.command(name="register", description="이 채널에 핫딜정보를 출력합니다.")
async def register(ctx):
"""현재 채널을 등록"""
channel = ctx.channel
category_name = channel.category.name if channel.category else None
success = channel_manager.add_channel(
channel.id,
ctx.guild.id,
channel.name,
ctx.guild.name,
category_name
)
if success:
await ctx.send(f'채널이 등록되었습니다: {channel.name}')
else:
await ctx.send('채널 등록에 실패했습니다.')
### thread에 keyword 작성 -> emoji 반응한 사람들만 알림가게
# threads = channel.threads
# thread_exists = any(thread.name == "keyword" for thread in threads)
# if not thread_exists:
# new_thread = await channel.create_thread(
# name = "keyword",
# auto_archive_duration = 0
# )
# await new_thread.send(f"keyword thread 생성")
# else:
# await ctx.send(f"keyword thread가 이미 존재합니다.")
@bot.command()
async def unregister(ctx):
"""현재 채널을 등록 해제"""
success = channel_manager.remove_channel(ctx.channel.id)
if success:
await ctx.send(f'채널이 등록 해제되었습니다: {ctx.channel.name}')
else:
await ctx.send('채널 등록 해제에 실패했습니다.')
# @bot.command()
# async def stats(ctx):
# """채널 통계 조회"""
# stats = channel_manager.get_channel_stats(ctx.channel.id)
# if stats:
# channel_name, guild_name, msg_count, last_msg = stats
# await ctx.send(f'''
# 채널 통계:
# - 채널명: {channel_name}
# - 서버명: {guild_name}
# - 전송된 메시지 수: {msg_count}
# - 마지막 메시지 전송: {last_msg}
# ''')
# else:
# await ctx.send('채널 통계를 찾을 수 없습니다.')
def transform_message(message):
content = message["content"]
content = re.sub(r"\n+", "\n", content.strip())
embed = discord.Embed(title=f"{message['item_name']}", description=f"{message['site']}", timestamp=datetime.now(pytz.timezone("UTC")))
embed.add_field(name="원문 링크", value=message["item_link"], inline=True)
embed.add_field(name="구매 링크", value=message["shopping_mall_link"], inline=True)
embed.add_field(name="본문", value=content[:1024], inline=False)
return embed
async def send_message_to_channels(message_content):
"""등록된 채널에 메시지 전송"""
channels = channel_manager.get_active_channels()
keywords = channel_manager.get_keyword_list()
info_dict = channel_manager.get_keyword_info(message_content, keywords)
embed = transform_message(message_content)
for channel_id in channels:
channel = bot.get_channel(channel_id)
if channel:
try:
mention_user = info_dict[channel_id]
await channel.send(embed=embed)
if mention_user:
for user_id in set(mention_user):
user = channel.guild.get_member(user_id)
await channel.send(f"{user.mention} {embed.title}")
channel_manager.log_message(channel_id, message_content, 'success')
logging.info(f'메시지 전송 성공: {channel.guild.name}/{channel.name}')
except Exception as e:
error_msg = str(e)
channel_manager.log_message(channel_id, message_content, 'failed', error_msg)
logging.error(f'채널 {channel.guild.name}/{channel.name}에 메시지 전송 실패: {error_msg}')
def run_kafka_consumer():
"""Kafka consumer 실행"""
logging.info("Kafka consumer 시작")
consumer = KafkaConsumer(
'transformed_message', # 토픽명
bootstrap_servers=['localhost:29092'], # 카프카 브로커 주소 리스트
auto_offset_reset='earliest', # 오프셋 위치(earliest:가장 처음, latest: 가장 최근)
enable_auto_commit=True, # 오프셋 자동 커밋 여부
group_id = "discord_bot_test",
value_deserializer=lambda x: json.loads(x.decode('utf-8')), # 메시지의 값 역직렬화,
key_deserializer=lambda x: json.loads(x.decode('utf-8')), # 키의 값 역직렬화
)
for message in consumer:
status = message.key
discord_message = message.value
logging.info(f"Kafka 메시지 수신: {discord_message}")
if status == "success":
asyncio.run_coroutine_threadsafe(
send_message_to_channels(discord_message),
bot.loop
)
def main():
"""메인 함수"""
bot.run(DISCORD_TOKEN)
if __name__ == "__main__":
main()