Skip to content

Commit

Permalink
fixup mypy
Browse files Browse the repository at this point in the history
  • Loading branch information
ryandeivert committed Sep 24, 2022
1 parent ba6aa66 commit e556593
Showing 1 changed file with 18 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,46 @@


class KinesisFirehoseRecordMetadata(DictWrapper):
@property
def _metadata(self) -> dict:
"""Optional: metadata associated with this record; present only when Kinesis Stream is source"""
return self["kinesisRecordMetadata"] # could raise KeyError

@property
def shard_id(self) -> Optional[str]:
"""Kinesis stream shard ID; present only when Kinesis Stream is source"""
return self.get("shardId")
return self._metadata.get("shardId")

@property
def partition_key(self) -> Optional[str]
def partition_key(self) -> Optional[str]:
"""Kinesis stream partition key; present only when Kinesis Stream is source"""
return self.get("partitionKey")
return self._metadata.get("partitionKey")

@property
def approximate_arrival_timestamp(self) -> Optional[str]
def approximate_arrival_timestamp(self) -> Optional[str]:
"""Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source"""
return self.get("approximateArrivalTimestamp")
return self._metadata.get("approximateArrivalTimestamp")

@property
def sequence_number(self) -> Optional[str]
def sequence_number(self) -> Optional[str]:
"""Kinesis stream sequence number; present only when Kinesis Stream is source"""
return self.get("sequenceNumber")
return self._metadata.get("sequenceNumber")

@property
def subsequence_number(self) -> Optional[str]
def subsequence_number(self) -> Optional[str]:
"""Kinesis stream sub-sequence number; present only when Kinesis Stream is source
Note: this will only be present for Kinesis streams using record aggregation
"""
return self.get("subsequenceNumber")
return self._metadata.get("subsequenceNumber")


class KinesisFirehoseRecord(DictWrapper):
@property
def approximate_arrival_timestamp(self) -> float:
"""The approximate time that the record was inserted into the delivery stream"""
return float(self["approximateArrivalTimestamp"])

@property
def record_id(self) -> str:
"""Record ID; uniquely identifies this record within the current batch"""
Expand All @@ -52,9 +57,9 @@ def data(self) -> str:
return self["data"]

@property
def metadata(self) -> Optional[KinesisFirehoseRecordMetadata]:
def metadata(self) -> KinesisFirehoseRecordMetadata:
"""Optional: metadata associated with this record; present only when Kinesis Stream is source"""
return KinesisFirehoseRecordMetadata(self.get('kinesisRecordMetadata', {}))
return KinesisFirehoseRecordMetadata(self._data)

@property
def data_as_bytes(self) -> bytes:
Expand Down

0 comments on commit e556593

Please sign in to comment.