-
Notifications
You must be signed in to change notification settings - Fork 49
/
log_delivery.py
155 lines (132 loc) · 5.64 KB
/
log_delivery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import logging
import time
import uuid
from typing import Any, Optional
from .boto3_proxy import SessionProxy
from .utils import HandlerRequest, HookInvocationRequest
class ProviderFilter(logging.Filter):
    """Logging filter that rejects records coming from a provider's loggers.

    A record is dropped when the name of the logger that produced it starts
    with the configured ``provider`` prefix; every other record is kept.
    """

    def __init__(self, provider: str):
        """Remember the logger-name prefix whose records must be rejected."""
        super().__init__()
        self.provider = provider

    def filter(self, record: logging.LogRecord) -> bool:
        """Return ``False`` for provider records and ``True`` for all others."""
        logger_name = record.name
        if logger_name.startswith(self.provider):
            return False
        return True
class ProviderLogHandler(logging.Handler):
    """Logging handler that writes each record to a CloudWatch Logs stream.

    The handler is installed on the root logger by :meth:`setup`. Every record
    is sent with its own ``put_log_events`` call; a missing log group or log
    stream is created on demand by :meth:`emit`.
    """

    # Upper bound on put_log_events attempts for one record when the service
    # keeps rejecting the sequence token, so a persistent rejection surfaces as
    # the service exception instead of recursing/looping without end.
    _MAX_PUT_ATTEMPTS = 5

    def __init__(
        self, group: str, stream: str, session: SessionProxy, *args: Any, **kwargs: Any
    ):
        """Create a handler bound to one log group and one log stream.

        Args:
            group: Name of the log group to write to.
            stream: Name of the log stream; every ``:`` is replaced by ``__``.
            session: Session object used to build the ``logs`` client.
            *args: Forwarded to ``logging.Handler``.
            **kwargs: Forwarded to ``logging.Handler``.
        """
        super().__init__(*args, **kwargs)
        self.group = group
        # CloudWatch Logs does not accept ":" in log stream names.
        self.stream = stream.replace(":", "__")
        self.client = session.client("logs")
        # Empty means "send no sequenceToken with the next request".
        self.sequence_token = ""  # nosec

    @classmethod
    def _get_existing_logger(cls) -> Optional["ProviderLogHandler"]:
        """Return the first root-logger handler that is an instance of ``cls``.

        Returns ``None`` when no such handler is installed.
        """
        for handler in logging.getLogger().handlers:
            if isinstance(handler, cls):
                return handler
        return None

    @classmethod
    def setup(
        cls,
        request: HandlerRequest,
        provider_sess: Optional[SessionProxy],
        log_format: Optional[logging.Formatter] = None,
    ) -> None:
        """Install (or refresh) the provider log handler on the root logger.

        Nothing is done unless a provider session, a provider log group name
        and a resource type are all available.

        Args:
            request: Incoming request; supplies the log group name, the values
                used to build the stream name, and the resource type.
            provider_sess: Session used to create the ``logs`` client.
            log_format: Optional formatter applied to a newly created handler.
        """
        log_group = request.requestData.providerLogGroupName
        if request.stackId and request.requestData.logicalResourceId:
            stream_name = f"{request.stackId}/{request.requestData.logicalResourceId}"
        else:
            stream_name = f"{request.awsAccountId}-{request.region}"

        log_handler = cls._get_existing_logger()
        if not (provider_sess and log_group and request.resourceType):
            return

        if log_handler:
            # This is a re-used lambda container, log handler is already setup, so
            # we just refresh the client with new creds
            log_handler.client = provider_sess.client("logs")
            return

        root_logger = logging.getLogger()
        # Snapshot the handlers that exist *before* ours is added: the filter
        # below must land on a pre-existing (platform) handler and never on the
        # provider handler itself, which is what handlers[0] would be if the
        # root logger started out without any handler.
        platform_handlers = list(root_logger.handlers)

        provider = request.resourceType.replace("::", "_").lower()
        log_handler = cls(group=log_group, stream=stream_name, session=provider_sess)
        if log_format:
            log_handler.setFormatter(log_format)
        # add log handler to root, so that provider gets plugin logs too
        root_logger.addHandler(log_handler)

        # filter provider messages from platform
        if platform_handlers:
            platform_handlers[0].addFilter(ProviderFilter(provider))

    def _create_log_group(self) -> None:
        """Create the log group, ignoring the error if it already exists."""
        try:
            self.client.create_log_group(logGroupName=self.group)
        except self.client.exceptions.ResourceAlreadyExistsException:
            pass

    def _create_log_stream(self) -> None:
        """Create the log stream, ignoring the error if it already exists."""
        try:
            self.client.create_log_stream(
                logGroupName=self.group, logStreamName=self.stream
            )
        except self.client.exceptions.ResourceAlreadyExistsException:
            pass

    def _put_log_event(self, msg: logging.LogRecord) -> None:
        """Send one formatted record to the log stream.

        When the service rejects the request with a sequence-token error, the
        token is taken from the last word of the error message and the request
        is retried, up to ``_MAX_PUT_ATTEMPTS`` attempts in total.

        Raises:
            The last sequence-token exception once all attempts are used up;
            any other client exception is propagated immediately.
        """
        base_kwargs = {
            "logGroupName": self.group,
            "logStreamName": self.stream,
            "logEvents": [
                {"timestamp": round(time.time() * 1000), "message": self.format(msg)}
            ],
        }
        for attempt in range(1, self._MAX_PUT_ATTEMPTS + 1):
            kwargs = dict(base_kwargs)
            if self.sequence_token:
                kwargs["sequenceToken"] = self.sequence_token
            try:
                response = self.client.put_log_events(**kwargs)
            except (
                self.client.exceptions.DataAlreadyAcceptedException,
                self.client.exceptions.InvalidSequenceTokenException,
            ) as e:
                # The expected token is the last whitespace-separated word of
                # the error message.
                self.sequence_token = str(e).rsplit(" ", maxsplit=1)[-1]
                if attempt == self._MAX_PUT_ATTEMPTS:
                    raise
                continue
            # The token field may be missing from the response; fall back to
            # "no token" instead of failing with KeyError.
            self.sequence_token = response.get("nextSequenceToken") or ""
            return

    def emit(self, record: logging.LogRecord) -> None:
        """Deliver ``record``, creating the log group/stream if they are missing.

        Only ``ResourceNotFoundException`` is handled here; any other exception
        raised by the client propagates to the caller.
        """
        try:
            self._put_log_event(record)
        except self.client.exceptions.ResourceNotFoundException as e:
            # The group is created only when the error says it is missing; the
            # stream is (re)created in either case before the single retry.
            if "log group does not exist" in str(e):
                self._create_log_group()
            self._create_log_stream()
            self._put_log_event(record)
class HookProviderLogHandler(ProviderLogHandler):
    """Provider log handler that is configured from a hook invocation request.

    Delivery behaviour is inherited unchanged; only the lookup of an already
    installed handler and the request-driven :meth:`setup` are specialised.
    """

    @classmethod
    def _get_existing_logger(cls) -> Optional["HookProviderLogHandler"]:
        """Return the first root-logger handler that is an instance of ``cls``.

        Returns ``None`` when no such handler is installed.
        """
        for handler in logging.getLogger().handlers:
            if isinstance(handler, cls):
                return handler
        return None

    @classmethod
    def setup(  # type: ignore
        cls,
        request: HookInvocationRequest,
        provider_sess: Optional[SessionProxy],
        log_format: Optional[logging.Formatter] = None,
    ) -> None:
        """Install (or refresh) the hook log handler on the root logger.

        Nothing is done unless a provider session, a provider log group name
        and a hook type name are all available.

        Args:
            request: Incoming hook request; supplies the log group name, the
                values used to build the stream name, and the hook type name.
            provider_sess: Session used to create the ``logs`` client.
            log_format: Optional formatter applied to a newly created handler.
        """
        log_group = request.requestData.providerLogGroupName
        if request.stackId and request.requestData.targetLogicalId:
            stream_name = f"{request.stackId}/{request.requestData.targetLogicalId}"
        else:
            # No stack/target to name the stream after: use a random suffix.
            stream_name = f"{request.awsAccountId}-{uuid.uuid4()}"

        log_handler = cls._get_existing_logger()
        if not (provider_sess and log_group and request.hookTypeName):
            return

        if log_handler:
            # This is a re-used lambda container, log handler is already setup, so
            # we just refresh the client with new creds
            log_handler.client = provider_sess.client("logs")
            return

        root_logger = logging.getLogger()
        # filter provider messages from platform
        provider = request.hookTypeName.replace("::", "_").lower()
        # Guard the lookup: with no handler installed on the root logger there
        # is nothing to filter, and indexing [0] would raise IndexError.
        if root_logger.handlers:
            root_logger.handlers[0].addFilter(ProviderFilter(provider))

        log_handler = cls(group=log_group, stream=stream_name, session=provider_sess)
        if log_format:
            log_handler.setFormatter(log_format)
        # add log handler to root, so that provider gets plugin logs too
        root_logger.addHandler(log_handler)