diff --git a/aws_lambda_powertools/utilities/data_classes/__init__.py b/aws_lambda_powertools/utilities/data_classes/__init__.py index 8ed77f9f3a3..2aa2021ed1e 100644 --- a/aws_lambda_powertools/utilities/data_classes/__init__.py +++ b/aws_lambda_powertools/utilities/data_classes/__init__.py @@ -13,6 +13,7 @@ from .event_bridge_event import EventBridgeEvent from .event_source import event_source from .kafka_event import KafkaEvent +from .kinesis_firehose_event import KinesisFirehoseEvent from .kinesis_stream_event import KinesisStreamEvent from .lambda_function_url_event import LambdaFunctionUrlEvent from .s3_event import S3Event @@ -32,6 +33,7 @@ "DynamoDBStreamEvent", "EventBridgeEvent", "KafkaEvent", + "KinesisFirehoseEvent", "KinesisStreamEvent", "LambdaFunctionUrlEvent", "S3Event", diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py new file mode 100644 index 00000000000..5683902f9d0 --- /dev/null +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -0,0 +1,113 @@ +import base64 +import json +from typing import Iterator, Optional + +from aws_lambda_powertools.utilities.data_classes.common import DictWrapper + + +class KinesisFirehoseRecordMetadata(DictWrapper): + @property + def _metadata(self) -> dict: + """Optional: metadata associated with this record; present only when Kinesis Stream is source""" + return self["kinesisRecordMetadata"] # could raise KeyError + + @property + def shard_id(self) -> str: + """Kinesis stream shard ID; present only when Kinesis Stream is source""" + return self._metadata["shardId"] + + @property + def partition_key(self) -> str: + """Kinesis stream partition key; present only when Kinesis Stream is source""" + return self._metadata["partitionKey"] + + @property + def approximate_arrival_timestamp(self) -> int: + """Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is 
source""" + return self._metadata["approximateArrivalTimestamp"] + + @property + def sequence_number(self) -> str: + """Kinesis stream sequence number; present only when Kinesis Stream is source""" + return self._metadata["sequenceNumber"] + + @property + def subsequence_number(self) -> str: + """Kinesis stream sub-sequence number; present only when Kinesis Stream is source + + Note: this will only be present for Kinesis streams using record aggregation + """ + return self._metadata["subsequenceNumber"] + + +class KinesisFirehoseRecord(DictWrapper): + @property + def approximate_arrival_timestamp(self) -> int: + """The approximate time that the record was inserted into the delivery stream""" + return self["approximateArrivalTimestamp"] + + @property + def record_id(self) -> str: + """Record ID; uniquely identifies this record within the current batch""" + return self["recordId"] + + @property + def data(self) -> str: + """The data blob, base64-encoded""" + return self["data"] + + @property + def metadata(self) -> Optional[KinesisFirehoseRecordMetadata]: + """Optional: metadata associated with this record; present only when Kinesis Stream is source""" + return KinesisFirehoseRecordMetadata(self._data) if self.get("kinesisRecordMetadata") else None + + @property + def data_as_bytes(self) -> bytes: + """Decoded base64-encoded data as bytes""" + return base64.b64decode(self.data) + + @property + def data_as_text(self) -> str: + """Decoded base64-encoded data as text""" + return self.data_as_bytes.decode("utf-8") + + @property + def data_as_json(self) -> dict: + """Decoded base64-encoded data loaded to json""" + if self._json_data is None: + self._json_data = json.loads(self.data_as_text) + return self._json_data + + +class KinesisFirehoseEvent(DictWrapper): + """Kinesis Data Firehose event + + Documentation: + -------------- + - https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html + """ + + @property + def invocation_id(self) -> str: + """Unique 
ID for Lambda invocation""" +        return self["invocationId"] + +    @property +    def delivery_stream_arn(self) -> str: +        """ARN of the Kinesis Data Firehose Delivery Stream""" +        return self["deliveryStreamArn"] + +    @property +    def source_kinesis_stream_arn(self) -> Optional[str]: +        """ARN of the Kinesis Stream; present only when Kinesis Stream is source""" +        return self.get("sourceKinesisStreamArn") + +    @property +    def region(self) -> str: +        """AWS region where the event originated eg: us-east-1""" +        return self["region"] + +    @property +    def records(self) -> Iterator[KinesisFirehoseRecord]: +        for record in self["records"]: +            yield KinesisFirehoseRecord(record) diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md index 67d821fe04f..509110e0480 100644 --- a/docs/utilities/data_classes.md +++ b/docs/utilities/data_classes.md @@ -77,6 +77,7 @@ Event Source | Data_class [EventBridge](#eventbridge) | `EventBridgeEvent` [Kafka](#kafka) | `KafkaEvent` [Kinesis Data Stream](#kinesis-streams) | `KinesisStreamEvent` +[Kinesis Firehose Delivery Stream](#kinesis-firehose-delivery-stream) | `KinesisFirehoseEvent` [Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent` [Rabbit MQ](#rabbit-mq) | `RabbitMQEvent` [S3](#s3) | `S3Event` @@ -892,6 +893,20 @@ or plain text, depending on the original payload. do_something_with(data) ``` +### Kinesis Firehose delivery stream + +Kinesis Firehose Data Transformation can use a Lambda Function to modify the records +inline, and re-emit them back to the Delivery Stream. + +Similar to Kinesis Data Streams, the events contain base64 encoded data. You can use the helper +function to access the data either as json or plain text, depending on the original payload. 
+ +=== "app.py" + + ```python + --8<-- "examples/event_sources/src/kinesis_firehose_delivery_stream.py" + ``` + ### Lambda Function URL === "app.py" diff --git a/examples/event_sources/src/kinesis_firehose_delivery_stream.py b/examples/event_sources/src/kinesis_firehose_delivery_stream.py new file mode 100644 index 00000000000..67bf53dfe06 --- /dev/null +++ b/examples/event_sources/src/kinesis_firehose_delivery_stream.py @@ -0,0 +1,25 @@ +import base64 +import json + +from aws_lambda_powertools.utilities.data_classes import KinesisFirehoseEvent, event_source +from aws_lambda_powertools.utilities.typing import LambdaContext + + +@event_source(data_class=KinesisFirehoseEvent) +def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext): + result = [] + + for record in event.records: + # if data was delivered as json; caches loaded value + data = record.data_as_json + + processed_record = { + "recordId": record.record_id, + "data": base64.b64encode(json.dumps(data).encode("utf-8")), + "result": "Ok", + } + + result.append(processed_record) + + # return transformed records + return {"records": result} diff --git a/tests/events/kinesisFirehosePutEvent.json b/tests/events/kinesisFirehosePutEvent.json index 27aeddd80eb..f3e07190710 100644 --- a/tests/events/kinesisFirehosePutEvent.json +++ b/tests/events/kinesisFirehosePutEvent.json @@ -2,16 +2,16 @@ "invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a", "deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name", "region": "us-east-2", - "records":[ + "records": [ { - "recordId":"record1", - "approximateArrivalTimestamp":1664029185290, - "data":"SGVsbG8gV29ybGQ=" + "recordId": "record1", + "approximateArrivalTimestamp": 1664029185290, + "data": "SGVsbG8gV29ybGQ=" }, { - "recordId":"record2", - "approximateArrivalTimestamp":1664029186945, - "data":"eyJIZWxsbyI6ICJXb3JsZCJ9" + "recordId": "record2", + "approximateArrivalTimestamp": 1664029186945, + "data": 
"eyJIZWxsbyI6ICJXb3JsZCJ9" } ] } diff --git a/tests/functional/test_data_classes.py b/tests/functional/test_data_classes.py index 1f8c0cef955..235a3f8f8da 100644 --- a/tests/functional/test_data_classes.py +++ b/tests/functional/test_data_classes.py @@ -18,6 +18,7 @@ CodePipelineJobEvent, EventBridgeEvent, KafkaEvent, + KinesisFirehoseEvent, KinesisStreamEvent, S3Event, SESEvent, @@ -1239,6 +1240,72 @@ def test_kafka_self_managed_event(): assert record.get_header_value("HeaderKey", case_sensitive=False) == b"headerValue" +def test_kinesis_firehose_kinesis_event(): + event = KinesisFirehoseEvent(load_event("kinesisFirehoseKinesisEvent.json")) + + assert event.region == "us-east-2" + assert event.invocation_id == "2b4d1ad9-2f48-94bd-a088-767c317e994a" + assert event.delivery_stream_arn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name" + assert event.source_kinesis_stream_arn == "arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source" + + records = list(event.records) + assert len(records) == 2 + record_01, record_02 = records[:] + + assert record_01.approximate_arrival_timestamp == 1664028820148 + assert record_01.record_id == "record1" + assert record_01.data == "SGVsbG8gV29ybGQ=" + assert record_01.data_as_bytes == b"Hello World" + assert record_01.data_as_text == "Hello World" + + assert record_01.metadata.shard_id == "shardId-000000000000" + assert record_01.metadata.partition_key == "4d1ad2b9-24f8-4b9d-a088-76e9947c317a" + assert record_01.metadata.approximate_arrival_timestamp == 1664028820148 + assert record_01.metadata.sequence_number == "49546986683135544286507457936321625675700192471156785154" + assert record_01.metadata.subsequence_number == "" + + assert record_02.approximate_arrival_timestamp == 1664028793294 + assert record_02.record_id == "record2" + assert record_02.data == "eyJIZWxsbyI6ICJXb3JsZCJ9" + assert record_02.data_as_bytes == b'{"Hello": "World"}' + assert record_02.data_as_text == '{"Hello": "World"}' + 
assert record_02.data_as_json == {"Hello": "World"} + + assert record_02.metadata.shard_id == "shardId-000000000001" + assert record_02.metadata.partition_key == "4d1ad2b9-24f8-4b9d-a088-76e9947c318a" + assert record_02.metadata.approximate_arrival_timestamp == 1664028793294 + assert record_02.metadata.sequence_number == "49546986683135544286507457936321625675700192471156785155" + assert record_02.metadata.subsequence_number == "" + + +def test_kinesis_firehose_put_event(): + event = KinesisFirehoseEvent(load_event("kinesisFirehosePutEvent.json")) + + assert event.region == "us-east-2" + assert event.invocation_id == "2b4d1ad9-2f48-94bd-a088-767c317e994a" + assert event.delivery_stream_arn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name" + assert event.source_kinesis_stream_arn is None + + records = list(event.records) + assert len(records) == 2 + record_01, record_02 = records[:] + + assert record_01.approximate_arrival_timestamp == 1664029185290 + assert record_01.record_id == "record1" + assert record_01.data == "SGVsbG8gV29ybGQ=" + assert record_01.data_as_bytes == b"Hello World" + assert record_01.data_as_text == "Hello World" + assert record_01.metadata is None + + assert record_02.approximate_arrival_timestamp == 1664029186945 + assert record_02.record_id == "record2" + assert record_02.data == "eyJIZWxsbyI6ICJXb3JsZCJ9" + assert record_02.data_as_bytes == b'{"Hello": "World"}' + assert record_02.data_as_text == '{"Hello": "World"}' + assert record_02.data_as_json == {"Hello": "World"} + assert record_02.metadata is None + + def test_kinesis_stream_event(): event = KinesisStreamEvent(load_event("kinesisStreamEvent.json"))