Skip to content

Commit

Permalink
Kinesis: fix maxBytesPerSecond throttling (#3035)
Browse files Browse the repository at this point in the history
  • Loading branch information
jtjeferreira authored Nov 22, 2023
1 parent e257852 commit d19f621
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ object KinesisFlow {
)(result: PutRecordsResponse): List[(PutRecordsResultEntry, T)] =
result.records.asScala.toList.zip(entries).map { case (res, (_, t)) => (res, t) }

private def getPayloadByteSize[T](record: (PutRecordsRequestEntry, T)): Int = record match {
case (request, _) => request.partitionKey.length + request.data.asByteBuffer.position()
private[kinesis] def getPayloadByteSize[T](record: (PutRecordsRequestEntry, T)): Int = record match {
case (request, _) => request.partitionKey.length + request.data.asByteArrayUnsafe.length
}

def byPartitionAndData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ class KinesisFlowSpec extends AnyWordSpec with Matchers with KinesisMock with Lo
sinkProbe.expectError(FailurePublishingRecords(requestError))
}
}

"compute payload size" in {
val r = PutRecordsRequestEntry
.builder()
.partitionKey("")
.data(SdkBytes.fromByteBuffer(ByteString("data").asByteBuffer))
.build()
KinesisFlow.getPayloadByteSize((r, "")) shouldBe 4
}
}

"KinesisFlowWithUserContext" must {
Expand Down

0 comments on commit d19f621

Please sign in to comment.