fix: use put_records for kinesis due to rate exceeded
RanbirAulakh committed Sep 18, 2024
1 parent bc3cdf7 commit 9385478
Showing 2 changed files with 69 additions and 13 deletions.
72 changes: 64 additions & 8 deletions src/aws/osml/model_runner/sink/kinesis_sink.py
@@ -2,11 +2,12 @@

import logging
import sys
from random import randint
from typing import List, Optional

import boto3
import geojson
from geojson import Feature, FeatureCollection
from geojson import Feature

from aws.osml.model_runner.api import SinkMode, SinkType
from aws.osml.model_runner.app_config import BotoConfig, ServiceConfig
@@ -42,19 +43,74 @@ def __init__(
self.kinesis_client = boto3.client("kinesis", config=BotoConfig.default)

def _flush_stream(self, partition_key: str, features: List[Feature]) -> None:
record = geojson.dumps(FeatureCollection(features))
self.kinesis_client.put_record(
StreamName=self.stream,
PartitionKey=partition_key,
Data=record,
)
"""
Flushes the provided features to the Kinesis stream in batches.
This method constructs records from the provided features and sends them to
the specified Kinesis stream. It batches records to stay within the Kinesis
PutRecords service limits and avoid throttling:
- A maximum of 500 records per request.
- Each record can be up to 1 MB in size.
- The total size of all records in a request must not exceed 5 MB.
:param partition_key: The partition key used to group related records.
:param features: A list of feature objects to be sent to the Kinesis stream.
"""

records = []
max_records_per_batch = 500
current_size = 0

for feature in features:
data = geojson.dumps(feature)
data_size = len(data)

# Use a random ExplicitHashKey to override the partition-key hash when choosing a shard;
# valid values are decimal strings in the 128-bit range [0, 2**128 - 1]
explicit_hash_key = str(randint(1, 2**31 - 1))

# Create the record
record = {"Data": data, "PartitionKey": partition_key, "ExplicitHashKey": explicit_hash_key}

# Check if adding this record would exceed the limits
if (len(records) >= max_records_per_batch) or (
current_size + data_size > 5242880
):  # 5 MB (5,242,880 bytes) per-request limit for the Kinesis PutRecords API
self.kinesis_client.put_records(
StreamName=self.stream,
Records=records,
)

# Clear the records and reset size
records = []
current_size = 0

# Append the record
records.append(record)
current_size += data_size

# Flush any remaining records
if records:
self.kinesis_client.put_records(
StreamName=self.stream,
Records=records,
)
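
One caveat: the batching above enforces the 500-record and 5 MB request limits, but the PutRecords response is discarded. PutRecords is not atomic; when throughput is exceeded, individual records fail and are reported through FailedRecordCount and per-record ErrorCode entries. A minimal retry sketch, not part of this commit (the method name and backoff constants are illustrative):

import time

def _put_records_with_retry(self, records: List[dict], max_attempts: int = 3) -> None:
    # Send one batch, retrying only the records Kinesis reports as failed.
    for attempt in range(max_attempts):
        response = self.kinesis_client.put_records(StreamName=self.stream, Records=records)
        if response.get("FailedRecordCount", 0) == 0:
            return
        # The response Records list is positionally aligned with the request;
        # failed entries carry an ErrorCode instead of a SequenceNumber.
        records = [rec for rec, res in zip(records, response["Records"]) if "ErrorCode" in res]
        time.sleep(0.1 * 2**attempt)  # simple exponential backoff
    logging.getLogger(__name__).warning(
        "%d records still failing after %d attempts", len(records), max_attempts
    )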

@property
def mode(self) -> SinkMode:
# Only aggregate mode is supported at the moment
return SinkMode.AGGREGATE

def write(self, job_id: str, features: List[Feature]) -> bool:
"""
Writes a list of GeoJSON features to the Kinesis stream in batches. The batch size and record size
are checked before sending data. Features are aggregated, and the remaining data is flushed at the end.
This function also validates the Kinesis stream before attempting to send data.
:param job_id: The unique job identifier used as the partition key for Kinesis.
:param features: A list of GeoJSON Feature objects to be written to the Kinesis stream.
:return: True if writing to the stream succeeded, False if it failed.
"""
pending_features: List[Feature] = []
pending_features_size: int = 0

@@ -66,7 +122,7 @@ def write(self, job_id: str, features: List[Feature]) -> bool:
feature_size = sys.getsizeof(geojson.dumps(feature))
if (
self.batch_size and pending_features and len(pending_features) % self.batch_size == 0
) or pending_features_size + feature_size > (int(ServiceConfig.kinesis_max_record_size)):
) or pending_features_size + feature_size > int(ServiceConfig.kinesis_max_record_size):
self._flush_stream(job_id, pending_features)
pending_features = []
pending_features_size = 0
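For context, a typical call into this sink batches a job's features under its job id. A short usage sketch; the class name KinesisSink and its constructor arguments are assumed here, since neither appears in this diff:

from geojson import Feature, Point

# Hypothetical setup: the stream name and batch size are placeholders.
sink = KinesisSink(stream="mr-results-stream", batch_size=100)
features = [Feature(geometry=Point((44.0, 33.0)), properties={"index": i}) for i in range(1000)]
succeeded = sink.write("job-1234", features)  # the job id doubles as the partition key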
10 changes: 5 additions & 5 deletions test/aws/osml/model_runner/sink/test_kinesis_sink.py
@@ -99,7 +99,7 @@ def test_write_features_default_credentials(self):
},
)
kinesis_client_stub.add_response(
"put_record",
"put_records",
MOCK_KINESIS_RESPONSE,
{
"StreamName": TEST_RESULTS_STREAM,
@@ -125,7 +125,7 @@ def test_write_features_batch_size_one(self):
)
for index, feature in enumerate(self.test_feature_list):
kinesis_client_stub.add_response(
"put_record",
"put_records",
{"ShardId": "shardId-000000000000", "SequenceNumber": str(index)},
{
"StreamName": TEST_RESULTS_STREAM,
@@ -155,7 +155,7 @@ def test_write_batch_size_three(self):
)

kinesis_client_stub.add_response(
"put_record",
"put_records",
MOCK_KINESIS_RESPONSE,
{
"StreamName": TEST_RESULTS_STREAM,
@@ -164,7 +164,7 @@
},
)
kinesis_client_stub.add_response(
"put_record",
"put_records",
MOCK_KINESIS_RESPONSE,
{
"StreamName": TEST_RESULTS_STREAM,
@@ -191,7 +191,7 @@ def test_write_oversized_record(self):
)

kinesis_client_stub.add_client_error(
"put_record",
"put_records",
service_error_code="ValidationException",
service_message="""An error occurred (ValidationException) when calling the PutRecord
operation: 1 validation error detected: Value at 'data' failed to satisfy constraint:
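
Each stub above now expects a put_records call whose request carries a Records list rather than a single Data blob. A self-contained illustration of stubbing that shape with botocore's Stubber (stream name, payload, and response values are placeholders):

import boto3
from botocore.stub import ANY, Stubber

client = boto3.client("kinesis", region_name="us-west-2")
stubber = Stubber(client)
stubber.add_response(
    "put_records",
    # Stubbed service response: one successfully sequenced record.
    {"FailedRecordCount": 0, "Records": [{"SequenceNumber": "1", "ShardId": "shardId-000000000000"}]},
    # Expected request parameters; ANY matches fields that vary per run.
    {
        "StreamName": "test-stream",
        "Records": [{"Data": ANY, "PartitionKey": "job-1234", "ExplicitHashKey": ANY}],
    },
)
with stubber:
    client.put_records(
        StreamName="test-stream",
        Records=[{"Data": '{"type": "Feature"}', "PartitionKey": "job-1234", "ExplicitHashKey": "42"}],
    )
stubber.assert_no_pending_responses()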
