-
Notifications
You must be signed in to change notification settings - Fork 0
/
message.py
92 lines (82 loc) · 4.01 KB
/
message.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import logging
from datetime import datetime
import disnake
from streamer import Streamer
from aiohttp import ClientSession
from typing import TYPE_CHECKING
from modactions import ModAction
if TYPE_CHECKING:
from messageparser import Parser
class Message:
    """A parsed moderation event that can be relayed to a streamer's webhooks.

    The instance bundles the raw event payload with the pre-rendered output
    (an embed and a plain-text equivalent) and knows how to deliver it via
    ``send``. Whether the embed or the text is delivered is decided by the
    parser's ``use_embeds`` flag at send time.
    """

    def __init__(self, parser, raw, streamer, mod_action, ignore, embed, embed_text, **kwargs):
        """Store the event data; no I/O happens here.

        Args:
            parser: The parser that produced this message. ``send`` reads its
                ``use_embeds`` flag and its ``automod_cache`` mapping.
            raw: The raw event payload. ``send`` reads ``raw["data"]["status"]``
                and ``raw["data"]["message"]["id"]`` for automod events.
            streamer: The streamer the event belongs to (supplies
                ``webhook_urls``, ``icon`` and ``username``).
            mod_action: The moderation action this message represents.
            ignore: Flag exposed through the ``ignore`` property.
            embed: The rendered embed, delivered when the parser uses embeds.
            embed_text: Plain-text rendering, delivered otherwise.
            **kwargs: Accepted for call-site compatibility and ignored.
        """
        self._parser: Parser = parser
        self.__raw_message: dict = raw
        self.logging = logging.getLogger("Twitch Pubsub Logging")
        self.__streamer: Streamer = streamer
        self.__mod_action: str = mod_action
        self.__ignore_message: bool = ignore
        self.__embed: disnake.Embed = embed
        self.__embed_text: str = embed_text
        # Naive UTC timestamp taken when the object is constructed.
        self.__created_at = datetime.utcnow()
        # Footer text applied to the embed right before it is sent.
        self.footer_message: str = "Mew"

    @property
    def streamer(self):
        """The streamer this message belongs to."""
        return self.__streamer

    @property
    def mod_action(self):
        """The moderation action this message represents."""
        return self.__mod_action

    @property
    def created_at(self):
        """Naive UTC datetime recorded when this object was created."""
        return self.__created_at

    @property
    def embed(self):
        """The embed rendering of this message."""
        return self.__embed

    @property
    def embed_text(self):
        """The plain-text rendering of this message."""
        return self.__embed_text

    @property
    def ignore(self):
        """The ``ignore`` flag passed to the constructor."""
        return self.__ignore_message

    def _payload(self):
        """Build the keyword arguments shared by every webhook send/edit call."""
        if self._parser.use_embeds:
            payload = {"embed": self.__embed}
        else:
            payload = {"content": self.__embed_text}
        # Never ping anyone from relayed moderation events.
        payload["allowed_mentions"] = disnake.AllowedMentions.none()
        return payload

    async def _deliver(self, webhook):
        """Send this message through one webhook, or edit its cached predecessor.

        Automod events arrive first as ``PENDING`` and later as ``DENIED`` or
        ``ALLOWED``. The pending message is remembered in the parser's
        ``automod_cache`` so the resolution can edit it in place.

        NOTE(review): the cache holds a single entry per automod message id, so
        when a streamer has several webhooks only the most recently sent
        message is retained, and the first webhook handled on resolution
        consumes the entry. Confirm whether multi-webhook setups need a
        per-webhook cache.
        """
        is_automod = self.mod_action == ModAction.automod_caught_message

        if is_automod and self.__raw_message["data"]["status"] in ("DENIED", "ALLOWED"):
            cache = self._parser.automod_cache
            automod_id = self.__raw_message["data"]["message"]["id"]
            existing = cache.get(automod_id)
            if not existing:
                # Not in the cache for some reason: just send it as normal.
                await webhook.send(**self._payload())
                return
            # Found the older message in the cache: update it in place.
            try:
                await existing["message"].edit(**self._payload())
            except disnake.NotFound:
                # The earlier message no longer exists; nothing left to edit.
                pass
            # pop() instead of del: the entry may have been removed by another
            # task while the edit above was awaited.
            cache.pop(automod_id, None)
            return

        # wait=True makes the webhook return the created message so it can be
        # cached for a later edit.
        w_message = await webhook.send(**self._payload(), wait=True)
        if is_automod and self.__raw_message["data"]["status"] == "PENDING":
            automod_id = self.__raw_message["data"]["message"]["id"]
            self._parser.automod_cache[automod_id] = {"object": self, "message": w_message}

    async def send(self, session=None):
        """Deliver this message to every webhook configured for the streamer.

        Args:
            session: Optional HTTP session used for the webhook requests. When
                omitted, a ``ClientSession`` is created for this call and
                closed before returning; a caller-supplied session is left
                open.

        ``disnake.NotFound`` and ``disnake.HTTPException`` raised for an
        individual webhook are logged and do not stop delivery to the
        remaining webhooks. Any other exception propagates to the caller.
        """
        owns_session = session is None
        if owns_session:
            session = ClientSession()
        try:
            webhooks = [
                disnake.Webhook.from_url(url, session=session)
                for url in self.streamer.webhook_urls
            ]
            # Text-only messages may carry no embed; skip the footer then.
            if self.__embed is not None:
                self.__embed.set_footer(text=self.footer_message, icon_url=self.__streamer.icon)
            for webhook in webhooks:
                try:
                    await self._deliver(webhook)
                except disnake.NotFound:
                    self.logging.warning(
                        f"Webhook not found for {self.streamer.username}")
                except disnake.HTTPException as e:
                    self.logging.error(f"HTTP Exception sending webhook: {e}")
        finally:
            # Close only a session created here, and do so even when an
            # unexpected exception escapes, so the connection is not leaked.
            if owns_session:
                await session.close()