diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 3e296935bfd..4d5454cfc20 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -6,33 +6,38 @@ class KinesisFirehoseRecordMetadata(DictWrapper): + @property + def _metadata(self) -> dict: + """Optional: metadata associated with this record; present only when Kinesis Stream is source""" + return self["kinesisRecordMetadata"] # could raise KeyError + @property def shard_id(self) -> Optional[str]: """Kinesis stream shard ID; present only when Kinesis Stream is source""" - return self.get("shardId") + return self._metadata.get("shardId") @property - def partition_key(self) -> Optional[str] + def partition_key(self) -> Optional[str]: """Kinesis stream partition key; present only when Kinesis Stream is source""" - return self.get("partitionKey") + return self._metadata.get("partitionKey") @property - def approximate_arrival_timestamp(self) -> Optional[str] + def approximate_arrival_timestamp(self) -> Optional[str]: """Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source""" - return self.get("approximateArrivalTimestamp") + return self._metadata.get("approximateArrivalTimestamp") @property - def sequence_number(self) -> Optional[str] + def sequence_number(self) -> Optional[str]: """Kinesis stream sequence number; present only when Kinesis Stream is source""" - return self.get("sequenceNumber") + return self._metadata.get("sequenceNumber") @property - def subsequence_number(self) -> Optional[str] + def subsequence_number(self) -> Optional[str]: """Kinesis stream sub-sequence number; present only when Kinesis Stream is source - + Note: this will only be present for Kinesis streams using record aggregation """ - return self.get("subsequenceNumber") + return self._metadata.get("subsequenceNumber") class KinesisFirehoseRecord(DictWrapper): @@ -40,7 +45,7 @@ class KinesisFirehoseRecord(DictWrapper): def approximate_arrival_timestamp(self) -> float: """The approximate time that the record was inserted into the delivery stream""" return float(self["approximateArrivalTimestamp"]) - + @property def record_id(self) -> str: """Record ID; uniquely identifies this record within the current batch""" @@ -52,9 +57,9 @@ def data(self) -> str: return self["data"] @property - def metadata(self) -> Optional[KinesisFirehoseRecordMetadata]: + def metadata(self) -> KinesisFirehoseRecordMetadata: """Optional: metadata associated with this record; present only when Kinesis Stream is source""" - return KinesisFirehoseRecordMetadata(self.get('kinesisRecordMetadata', {})) + return KinesisFirehoseRecordMetadata(self._data) @property def data_as_bytes(self) -> bytes: