Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(data_classes): add KinesisFirehoseEvent #1540

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions aws_lambda_powertools/utilities/data_classes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .event_bridge_event import EventBridgeEvent
from .event_source import event_source
from .kafka_event import KafkaEvent
from .kinesis_firehose_event import KinesisFirehoseEvent
from .kinesis_stream_event import KinesisStreamEvent
from .lambda_function_url_event import LambdaFunctionUrlEvent
from .s3_event import S3Event
Expand All @@ -32,6 +33,7 @@
"DynamoDBStreamEvent",
"EventBridgeEvent",
"KafkaEvent",
"KinesisFirehoseEvent",
"KinesisStreamEvent",
"LambdaFunctionUrlEvent",
"S3Event",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import base64
import json
from typing import Iterator, Optional

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper


class KinesisFirehoseRecordMetadata(DictWrapper):
    """Accessors for the ``kinesisRecordMetadata`` section of a Firehose record.

    The section is only present when a Kinesis Stream is the source of the
    delivery stream, so every property here goes through a plain key lookup
    that may raise ``KeyError`` when the section (or the field) is missing.
    """

    @property
    def _metadata(self) -> dict:
        """Raw ``kinesisRecordMetadata`` mapping of the wrapped record."""
        raw_metadata = self["kinesisRecordMetadata"]  # could raise KeyError
        return raw_metadata

    @property
    def shard_id(self) -> str:
        """ID of the Kinesis stream shard (``shardId``)."""
        return self._metadata["shardId"]

    @property
    def partition_key(self) -> str:
        """Partition key of the record in the Kinesis stream (``partitionKey``)."""
        return self._metadata["partitionKey"]

    @property
    def approximate_arrival_timestamp(self) -> int:
        """Approximate arrival timestamp in the Kinesis stream, returned as an integer."""
        return self._metadata["approximateArrivalTimestamp"]

    @property
    def sequence_number(self) -> str:
        """Sequence number of the record in the Kinesis stream (``sequenceNumber``)."""
        return self._metadata["sequenceNumber"]

    @property
    def subsequence_number(self) -> str:
        """Sub-sequence number of the record in the Kinesis stream (``subsequenceNumber``).

        Note: this will only be present for Kinesis streams using record aggregation
        """
        return self._metadata["subsequenceNumber"]


class KinesisFirehoseRecord(DictWrapper):
    """A single record of a Kinesis Data Firehose event, with payload decoding helpers."""

    @property
    def approximate_arrival_timestamp(self) -> int:
        """Approximate time at which the record was inserted into the delivery stream."""
        return self["approximateArrivalTimestamp"]

    @property
    def record_id(self) -> str:
        """ID that uniquely identifies this record within the current batch."""
        return self["recordId"]

    @property
    def data(self) -> str:
        """The payload as received, i.e. still base64-encoded."""
        return self["data"]

    @property
    def metadata(self) -> Optional[KinesisFirehoseRecordMetadata]:
        """Kinesis stream metadata of this record, or ``None``.

        ``None`` is returned whenever ``kinesisRecordMetadata`` is missing or falsy,
        which is the case when a Kinesis Stream is not the source.
        """
        if not self.get("kinesisRecordMetadata"):
            return None
        # The metadata wrapper is handed the whole record dict; it looks up the
        # ``kinesisRecordMetadata`` key itself.
        return KinesisFirehoseRecordMetadata(self._data)

    @property
    def data_as_bytes(self) -> bytes:
        """Payload after base64 decoding, as raw bytes."""
        encoded_payload = self.data
        return base64.b64decode(encoded_payload)

    @property
    def data_as_text(self) -> str:
        """Payload after base64 decoding, interpreted as UTF-8 text."""
        raw_payload = self.data_as_bytes
        return raw_payload.decode("utf-8")

    @property
    def data_as_json(self) -> dict:
        """Payload after base64 decoding, parsed as JSON; the parsed value is cached.

        Payloads that are not valid JSON make ``json.loads`` raise, so only use
        this accessor when the producer is known to send JSON.
        """
        # NOTE(review): relies on ``_json_data`` already existing (as None) on the
        # instance; it is not set anywhere in this class - presumably DictWrapper
        # provides it, confirm.
        parsed = self._json_data
        if parsed is None:
            parsed = json.loads(self.data_as_text)
            self._json_data = parsed
        return parsed
Comment on lines +75 to +79
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to review this piece of code.
This is not your fault: we have this method/property by default in all classes, and it can be a problem for records that allow non-JSON data, as Firehose PUT does, and maybe others. Check this out:

image
image
image

We'll discuss this in our daily sync on Monday and I'll be back here with updates.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EAFP



class KinesisFirehoseEvent(DictWrapper):
    """Kinesis Data Firehose event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html
    """

    @property
    def invocation_id(self) -> str:
        """Unique ID of the Lambda invocation (``invocationId``)."""
        return self["invocationId"]

    @property
    def delivery_stream_arn(self) -> str:
        """ARN of the Kinesis Data Firehose delivery stream (``deliveryStreamArn``)."""
        return self["deliveryStreamArn"]

    @property
    def source_kinesis_stream_arn(self) -> Optional[str]:
        """ARN of the source Kinesis Stream; ``None`` when a Kinesis Stream is not the source."""
        return self.get("sourceKinesisStreamArn")

    @property
    def region(self) -> str:
        """AWS region where the event originated eg: us-east-1"""
        return self["region"]

    @property
    def records(self) -> Iterator[KinesisFirehoseRecord]:
        """Lazily yield every entry of ``records`` wrapped as a ``KinesisFirehoseRecord``."""
        # Generator: ``self["records"]`` is only looked up once iteration starts.
        yield from map(KinesisFirehoseRecord, self["records"])
15 changes: 15 additions & 0 deletions docs/utilities/data_classes.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Event Source | Data_class
[EventBridge](#eventbridge) | `EventBridgeEvent`
[Kafka](#kafka) | `KafkaEvent`
[Kinesis Data Stream](#kinesis-streams) | `KinesisStreamEvent`
[Kinesis Firehose Delivery Stream](#kinesis-firehose-delivery-stream) | `KinesisFirehoseEvent`
[Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent`
[Rabbit MQ](#rabbit-mq) | `RabbitMQEvent`
[S3](#s3) | `S3Event`
Expand Down Expand Up @@ -892,6 +893,20 @@ or plain text, depending on the original payload.
do_something_with(data)
```

### Kinesis Firehose delivery stream

Kinesis Firehose Data Transformation can use a Lambda Function to modify the records
inline, and re-emit them back to the Delivery Stream.

Similar to Kinesis Data Streams, the events contain base64 encoded data. You can use the helper
function to access the data either as json or plain text, depending on the original payload.

=== "app.py"

```python
--8<-- "examples/event_sources/src/kinesis_firehose_delivery_stream.py"
```

### Lambda Function URL

=== "app.py"
Expand Down
25 changes: 25 additions & 0 deletions examples/event_sources/src/kinesis_firehose_delivery_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import base64
import json

from aws_lambda_powertools.utilities.data_classes import KinesisFirehoseEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=KinesisFirehoseEvent)
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
    """Example Firehose data-transformation handler.

    Reads every record's payload as JSON and re-emits it, marked as ``Ok``,
    in the ``{"records": [...]}`` structure returned to the delivery stream.

    Each returned record carries ``recordId``, ``data`` (base64 text) and ``result``.
    """
    result = []

    for record in event.records:
        # if data was delivered as json; caches loaded value
        data = record.data_as_json

        # b64encode() returns bytes, which cannot be JSON-serialized in the
        # handler's response; decode it so "data" is a plain base64 string.
        encoded_data = base64.b64encode(json.dumps(data).encode("utf-8")).decode("utf-8")

        processed_record = {
            "recordId": record.record_id,
            "data": encoded_data,
            "result": "Ok",
        }

        result.append(processed_record)

    # return transformed records
    return {"records": result}
14 changes: 7 additions & 7 deletions tests/events/kinesisFirehosePutEvent.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
"invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a",
"deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name",
"region": "us-east-2",
"records":[
"records": [
{
"recordId":"record1",
"approximateArrivalTimestamp":1664029185290,
"data":"SGVsbG8gV29ybGQ="
"recordId": "record1",
"approximateArrivalTimestamp": 1664029185290,
"data": "SGVsbG8gV29ybGQ="
},
{
"recordId":"record2",
"approximateArrivalTimestamp":1664029186945,
"data":"eyJIZWxsbyI6ICJXb3JsZCJ9"
"recordId": "record2",
"approximateArrivalTimestamp": 1664029186945,
"data": "eyJIZWxsbyI6ICJXb3JsZCJ9"
}
]
}
67 changes: 67 additions & 0 deletions tests/functional/test_data_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
CodePipelineJobEvent,
EventBridgeEvent,
KafkaEvent,
KinesisFirehoseEvent,
KinesisStreamEvent,
S3Event,
SESEvent,
Expand Down Expand Up @@ -1239,6 +1240,72 @@ def test_kafka_self_managed_event():
assert record.get_header_value("HeaderKey", case_sensitive=False) == b"headerValue"


def test_kinesis_firehose_kinesis_event():
    """Firehose event with a Kinesis Stream source: event, record and metadata accessors."""
    parsed_event = KinesisFirehoseEvent(load_event("kinesisFirehoseKinesisEvent.json"))

    # Event-level attributes
    assert parsed_event.region == "us-east-2"
    assert parsed_event.invocation_id == "2b4d1ad9-2f48-94bd-a088-767c317e994a"
    assert (
        parsed_event.delivery_stream_arn
        == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name"
    )
    assert parsed_event.source_kinesis_stream_arn == "arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source"

    all_records = list(parsed_event.records)
    assert len(all_records) == 2
    first, second = all_records

    # First record: plain-text payload
    assert first.approximate_arrival_timestamp == 1664028820148
    assert first.record_id == "record1"
    assert first.data == "SGVsbG8gV29ybGQ="
    assert first.data_as_bytes == b"Hello World"
    assert first.data_as_text == "Hello World"

    assert first.metadata.shard_id == "shardId-000000000000"
    assert first.metadata.partition_key == "4d1ad2b9-24f8-4b9d-a088-76e9947c317a"
    assert first.metadata.approximate_arrival_timestamp == 1664028820148
    assert first.metadata.sequence_number == "49546986683135544286507457936321625675700192471156785154"
    assert first.metadata.subsequence_number == ""

    # Second record: JSON payload
    assert second.approximate_arrival_timestamp == 1664028793294
    assert second.record_id == "record2"
    assert second.data == "eyJIZWxsbyI6ICJXb3JsZCJ9"
    assert second.data_as_bytes == b'{"Hello": "World"}'
    assert second.data_as_text == '{"Hello": "World"}'
    assert second.data_as_json == {"Hello": "World"}

    assert second.metadata.shard_id == "shardId-000000000001"
    assert second.metadata.partition_key == "4d1ad2b9-24f8-4b9d-a088-76e9947c318a"
    assert second.metadata.approximate_arrival_timestamp == 1664028793294
    assert second.metadata.sequence_number == "49546986683135544286507457936321625675700192471156785155"
    assert second.metadata.subsequence_number == ""


def test_kinesis_firehose_put_event():
    """Firehose event from a direct PUT: no source stream ARN and no record metadata."""
    parsed_event = KinesisFirehoseEvent(load_event("kinesisFirehosePutEvent.json"))

    # Event-level attributes
    assert parsed_event.region == "us-east-2"
    assert parsed_event.invocation_id == "2b4d1ad9-2f48-94bd-a088-767c317e994a"
    assert (
        parsed_event.delivery_stream_arn
        == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name"
    )
    assert parsed_event.source_kinesis_stream_arn is None

    all_records = list(parsed_event.records)
    assert len(all_records) == 2
    first, second = all_records

    # First record: plain-text payload
    assert first.approximate_arrival_timestamp == 1664029185290
    assert first.record_id == "record1"
    assert first.data == "SGVsbG8gV29ybGQ="
    assert first.data_as_bytes == b"Hello World"
    assert first.data_as_text == "Hello World"
    assert first.metadata is None

    # Second record: JSON payload
    assert second.approximate_arrival_timestamp == 1664029186945
    assert second.record_id == "record2"
    assert second.data == "eyJIZWxsbyI6ICJXb3JsZCJ9"
    assert second.data_as_bytes == b'{"Hello": "World"}'
    assert second.data_as_text == '{"Hello": "World"}'
    assert second.data_as_json == {"Hello": "World"}
    assert second.metadata is None


def test_kinesis_stream_event():
event = KinesisStreamEvent(load_event("kinesisStreamEvent.json"))

Expand Down