Skip to content

Commit

Permalink
fix: store topicNameSize as initialBatchedBytes in MessagesBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
sjvanrossum committed Oct 11, 2024
1 parent e5a644b commit fad52c0
Showing 1 changed file with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
}
MessagesBatch messagesBatch = messagesBatches.get(orderingKey);
if (messagesBatch == null) {
messagesBatch = new MessagesBatch(batchingSettings, orderingKey);
messagesBatch = new MessagesBatch(batchingSettings, topicNameSize, orderingKey);
messagesBatches.put(orderingKey, messagesBatch);
}

Expand Down Expand Up @@ -642,8 +642,7 @@ private static final class OutstandingPublish {
this.messageWrapper = messageWrapper;
this.messageSize =
CodedOutputStream.computeMessageSize(
PublishRequest.MESSAGES_FIELD_NUMBER,
messageWrapper.getPubsubMessage());
PublishRequest.MESSAGES_FIELD_NUMBER, messageWrapper.getPubsubMessage());
}
}

Expand Down Expand Up @@ -1100,12 +1099,15 @@ void release(long messageSize) {

private class MessagesBatch {
private List<OutstandingPublish> messages;
private int initialBatchedBytes;
private int batchedBytes;
private String orderingKey;
private final BatchingSettings batchingSettings;

private MessagesBatch(BatchingSettings batchingSettings, String orderingKey) {
private MessagesBatch(
BatchingSettings batchingSettings, int initialBatchedBytes, String orderingKey) {
this.batchingSettings = batchingSettings;
this.initialBatchedBytes = initialBatchedBytes;
this.orderingKey = orderingKey;
reset();
}
Expand All @@ -1118,7 +1120,7 @@ private OutstandingBatch popOutstandingBatch() {

private void reset() {
messages = new LinkedList<>();
batchedBytes = topicNameSize;
batchedBytes = initialBatchedBytes;
}

private boolean isEmpty() {
Expand Down

0 comments on commit fad52c0

Please sign in to comment.