from __future__ import annotations

import base64
import json
import warnings
from dataclasses import dataclass, field
from functools import cached_property
from typing import TYPE_CHECKING, Any, Callable, ClassVar, Iterator

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper

if TYPE_CHECKING:
    from typing_extensions import Literal

@dataclass(repr=False, order=False, frozen=True)
class KinesisFirehoseDataTransformationRecordMetadata:
    """
    Metadata in Firehose Data Transform Record.

    Parameters
    ----------
    partition_keys: dict[str, str]
        A dict of partition keys/value in string format, e.g. `{"year":"2023","month":"09"}`

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
    """

    partition_keys: dict[str, str] = field(default_factory=lambda: {})

    def asdict(self) -> dict:
        if self.partition_keys is not None:
            return {"partitionKeys": self.partition_keys}
        return {}

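# Usage sketch (illustrative, not part of the original module): attaching partition keys
# for dynamic partitioning. The key names "year" and "month" are assumptions for illustration.
#
#     metadata = KinesisFirehoseDataTransformationRecordMetadata(
#         partition_keys={"year": "2023", "month": "09"},
#     )
#     assert metadata.asdict() == {"partitionKeys": {"year": "2023", "month": "09"}}
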
@dataclass(repr=False, order=False)
class KinesisFirehoseDataTransformationRecord:
    """Record in Kinesis Data Firehose response object.

    Parameters
    ----------
    record_id: str
        uniquely identifies this record within the current batch
    result: Literal["Ok", "Dropped", "ProcessingFailed"]
        record data transformation status, whether it succeeded, should be dropped, or failed.
    data: str
        base64-encoded payload, by default empty string.
        Use `data_from_text` or `data_from_json` methods to convert data if needed.
    metadata: KinesisFirehoseDataTransformationRecordMetadata | None
        Metadata associated with this record; can contain partition keys.
        See: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
    json_serializer: Callable
        function to serialize `obj` to a JSON formatted `str`, by default json.dumps
    json_deserializer: Callable
        function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`,
        by default json.loads

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
    """

    _valid_result_types: ClassVar[tuple[str, str, str]] = ("Ok", "Dropped", "ProcessingFailed")

    record_id: str
    result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok"
    data: str = ""
    metadata: KinesisFirehoseDataTransformationRecordMetadata | None = None
    json_serializer: Callable = json.dumps
    json_deserializer: Callable = json.loads

    def asdict(self) -> dict:
        if self.result not in self._valid_result_types:
            warnings.warn(
                stacklevel=1,
                message=f'The result "{self.result}" is not valid; choose from "Ok", "Dropped", "ProcessingFailed".',
            )

        record: dict[str, Any] = {
            "recordId": self.record_id,
            "result": self.result,
            "data": self.data,
        }

        if self.metadata:
            record["metadata"] = self.metadata.asdict()

        return record

    @property
    def data_as_bytes(self) -> bytes:
        """Decoded base64-encoded data as bytes"""
        if not self.data:
            return b""
        return base64.b64decode(self.data)

    @property
    def data_as_text(self) -> str:
        """Decoded base64-encoded data as text"""
        if not self.data:
            return ""
        return self.data_as_bytes.decode("utf-8")

    @cached_property
    def data_as_json(self) -> dict:
        """Decoded base64-encoded data loaded to json"""
        if not self.data:
            return {}
        return self.json_deserializer(self.data_as_text)

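# Usage sketch (illustrative, not part of the original module): building a response record
# from a base64-encoded JSON payload and reading it back through the decode helpers. The
# record_id and payload content are assumptions for illustration.
#
#     record = KinesisFirehoseDataTransformationRecord(
#         record_id="record-1",
#         result="Ok",
#         data=base64.b64encode(json.dumps({"status": "processed"}).encode("utf-8")).decode("utf-8"),
#     )
#     assert record.data_as_json == {"status": "processed"}
#     assert record.asdict()["recordId"] == "record-1"
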
@dataclass(repr=False, order=False)
class KinesisFirehoseDataTransformationResponse:
    """Kinesis Data Firehose response object

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

    Parameters
    ----------
    records : list[KinesisFirehoseDataTransformationRecord]
        Records of the Kinesis Data Firehose response object.
        Optional at initialization; records can be added later using the `add_record` method.

    Examples
    --------

    **Transforming data records**

    ```python
    from aws_lambda_powertools.utilities.data_classes import (
        KinesisFirehoseDataTransformationRecord,
        KinesisFirehoseDataTransformationResponse,
        KinesisFirehoseEvent,
    )
    from aws_lambda_powertools.utilities.serialization import base64_from_json
    from aws_lambda_powertools.utilities.typing import LambdaContext


    def lambda_handler(event: dict, context: LambdaContext):
        firehose_event = KinesisFirehoseEvent(event)
        result = KinesisFirehoseDataTransformationResponse()

        for record in firehose_event.records:
            payload = record.data_as_text  # base64 decoded data as str

            ## generate data to return
            transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}
            processed_record = KinesisFirehoseDataTransformationRecord(
                record_id=record.record_id,
                data=base64_from_json(transformed_data),
            )
            result.add_record(processed_record)

        # return transformed records
        return result.asdict()
    ```
    """

    records: list[KinesisFirehoseDataTransformationRecord] = field(default_factory=list)

    def add_record(self, record: KinesisFirehoseDataTransformationRecord):
        self.records.append(record)

    def asdict(self) -> dict:
        if not self.records:
            raise ValueError("Amazon Kinesis Data Firehose doesn't accept empty response")

        return {"records": [record.asdict() for record in self.records]}

class KinesisFirehoseRecordMetadata(DictWrapper):
    @property
    def _metadata(self) -> dict:
        """Optional: metadata associated with this record; present only when Kinesis Stream is source"""
        return self["kinesisRecordMetadata"]  # could raise KeyError

    @property
    def shard_id(self) -> str:
        """Kinesis stream shard ID; present only when Kinesis Stream is source"""
        return self._metadata["shardId"]

    @property
    def partition_key(self) -> str:
        """Kinesis stream partition key; present only when Kinesis Stream is source"""
        return self._metadata["partitionKey"]

    @property
    def approximate_arrival_timestamp(self) -> int:
        """Kinesis stream approximate arrival timestamp; present only when Kinesis Stream is source"""
        return self._metadata["approximateArrivalTimestamp"]

    @property
    def sequence_number(self) -> str:
        """Kinesis stream sequence number; present only when Kinesis Stream is source"""
        return self._metadata["sequenceNumber"]

    @property
    def subsequence_number(self) -> int:
        """Kinesis stream sub-sequence number; present only when Kinesis Stream is source

        Note: this will only be present for Kinesis streams using record aggregation
        """
        return self._metadata["subsequenceNumber"]

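# Usage sketch (illustrative, not part of the original module): reading Kinesis stream
# metadata when the delivery stream uses a Kinesis Data Stream as its source. All field
# values below are placeholders for illustration.
#
#     metadata = KinesisFirehoseRecordMetadata(
#         {
#             "kinesisRecordMetadata": {
#                 "shardId": "shardId-000000000000",
#                 "partitionKey": "partitionKeyExample",
#                 "approximateArrivalTimestamp": 1664029185290,
#                 "sequenceNumber": "49546986683135544286507457936321625675700192471156785154",
#                 "subsequenceNumber": 0,
#             },
#         },
#     )
#     assert metadata.shard_id == "shardId-000000000000"
#     assert metadata.subsequence_number == 0
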
class KinesisFirehoseRecord(DictWrapper):
    @property
    def approximate_arrival_timestamp(self) -> int:
        """The approximate time that the record was inserted into the delivery stream"""
        return self["approximateArrivalTimestamp"]

    @property
    def record_id(self) -> str:
        """Record ID; uniquely identifies this record within the current batch"""
        return self["recordId"]

    @property
    def data(self) -> str:
        """The data blob, base64-encoded"""
        return self["data"]

    @property
    def metadata(self) -> KinesisFirehoseRecordMetadata | None:
        """Optional: metadata associated with this record; present only when Kinesis Stream is source"""
        return KinesisFirehoseRecordMetadata(self._data) if self.get("kinesisRecordMetadata") else None

    @property
    def data_as_bytes(self) -> bytes:
        """Decoded base64-encoded data as bytes"""
        return base64.b64decode(self.data)

    @property
    def data_as_text(self) -> str:
        """Decoded base64-encoded data as text"""
        return self.data_as_bytes.decode("utf-8")

    @cached_property
    def data_as_json(self) -> dict:
        """Decoded base64-encoded data loaded to json"""
        return self._json_deserializer(self.data_as_text)

    def build_data_transformation_response(
        self,
        result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok",
        data: str = "",
        metadata: KinesisFirehoseDataTransformationRecordMetadata | None = None,
    ) -> KinesisFirehoseDataTransformationRecord:
        """Create a KinesisFirehoseDataTransformationRecord directly, reusing this record's record_id

        Parameters
        ----------
        result : Literal["Ok", "Dropped", "ProcessingFailed"]
            Processing result; supported values: Ok, Dropped, ProcessingFailed
        data : str, optional
            Base64-encoded data blob, optional at initialization. Pass base64-encoded data directly,
            or use `data_from_text` or `data_from_json` to populate it.
        metadata: KinesisFirehoseDataTransformationRecordMetadata, optional
            Metadata associated with this record; can contain partition keys.
            - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
        """
        return KinesisFirehoseDataTransformationRecord(
            record_id=self.record_id,
            result=result,
            data=data,
            metadata=metadata,
        )

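# Usage sketch (illustrative, not part of the original module): turning an incoming record
# into a response record that keeps the same record_id. The record_id and payload are
# assumptions for illustration.
#
#     record = KinesisFirehoseRecord({"recordId": "record-1", "data": "SGVsbG8gV29ybGQ="})
#     dropped = record.build_data_transformation_response(result="Dropped")
#     assert dropped.record_id == "record-1"
#     assert dropped.asdict()["result"] == "Dropped"
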
class KinesisFirehoseEvent(DictWrapper):
    """Kinesis Data Firehose event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html
    """

    @property
    def invocation_id(self) -> str:
        """Unique ID for the Lambda invocation"""
        return self["invocationId"]

    @property
    def delivery_stream_arn(self) -> str:
        """ARN of the Kinesis Data Firehose delivery stream"""
        return self["deliveryStreamArn"]

    @property
    def source_kinesis_stream_arn(self) -> str | None:
        """ARN of the Kinesis Stream; present only when Kinesis Stream is source"""
        return self.get("sourceKinesisStreamArn")

    @property
    def region(self) -> str:
        """AWS region where the event originated, e.g. us-east-1"""
        return self["region"]

    @property
    def records(self) -> Iterator[KinesisFirehoseRecord]:
        for record in self["records"]:
            yield KinesisFirehoseRecord(data=record, json_deserializer=self._json_deserializer)
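# Usage sketch (illustrative, not part of the original module): an end-to-end pass-through
# transform. The handler name, ARN, invocation ID, and record data are placeholders; the
# returned shape matches what the asdict() methods above produce.
#
#     def lambda_handler(event: dict, context) -> dict:
#         firehose_event = KinesisFirehoseEvent(event)
#         response = KinesisFirehoseDataTransformationResponse()
#         for record in firehose_event.records:
#             # forward each record unchanged, reusing its record_id and base64 data
#             response.add_record(record.build_data_transformation_response(result="Ok", data=record.data))
#         return response.asdict()
#
#     event = {
#         "invocationId": "invocationIdExample",
#         "deliveryStreamArn": "arn:aws:firehose:us-east-1:123456789012:deliverystream/example",
#         "region": "us-east-1",
#         "records": [
#             {
#                 "recordId": "record-1",
#                 "approximateArrivalTimestamp": 1664029185290,
#                 "data": "SGVsbG8gV29ybGQ=",  # base64 for "Hello World"
#             },
#         ],
#     }
#     assert lambda_handler(event, None)["records"][0]["result"] == "Ok"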