Skip to content

Commit

Permalink
[fix] [client] Fix Consumer should return configured batch receive ma…
Browse files Browse the repository at this point in the history
…x messages (apache#22619)

(cherry picked from commit 0219921)
(cherry picked from commit 29ee145)
  • Loading branch information
rdhabalia authored and nikhil-ctds committed May 15, 2024
1 parent f0f5448 commit bcfcb5c
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public Object[][] batchReceivePolicyProvider() {
// Number of message limitation exceed receiverQueue size
{
BatchReceivePolicy.builder()
.maxNumMessages(70)
.maxNumMessages(50)
.build(), true, 50, false
},
// Number of message limitation exceed receiverQueue size and timeout limitation
Expand Down Expand Up @@ -147,7 +147,7 @@ public Object[][] batchReceivePolicyProvider() {
// Number of message limitation exceed receiverQueue size
{
BatchReceivePolicy.builder()
.maxNumMessages(70)
.maxNumMessages(50)
.build(), false, 50, false
},
// Number of message limitation exceed receiverQueue size and timeout limitation
Expand Down Expand Up @@ -248,7 +248,7 @@ public Object[][] batchReceivePolicyProvider() {
// Number of message limitation exceed receiverQueue size
{
BatchReceivePolicy.builder()
.maxNumMessages(70)
.maxNumMessages(50)
.build(), true, 50, true
},
// Number of message limitation exceed receiverQueue size and timeout limitation
Expand Down Expand Up @@ -283,7 +283,7 @@ public Object[][] batchReceivePolicyProvider() {
// Number of message limitation exceed receiverQueue size
{
BatchReceivePolicy.builder()
.maxNumMessages(70)
.maxNumMessages(50)
.build(), false, 50, true
},
// Number of message limitation exceed receiverQueue size and timeout limitation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4825,6 +4825,35 @@ public void onSendAcknowledgement(Producer producer, Message message, MessageId
admin.topics().delete(topic, false);
}

/**
* It verifies that consumer receives configured number of messages into the batch.
* @throws Exception
*/
@Test
public void testBatchReceiveWithMaxBatchSize() throws Exception {
int maxBatchSize = 100;
final int internalQueueSize = 10;
final int maxBytes = 2000000;
final int timeOutInSeconds = 900;
final String topic = "persistent://my-property/my-ns/testBatchReceive";
BatchReceivePolicy batchReceivePolicy = BatchReceivePolicy.builder().maxNumBytes(maxBytes)
.maxNumMessages(maxBatchSize).timeout(timeOutInSeconds, TimeUnit.SECONDS).build();
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
.subscriptionName("my-subscriber-name")
.receiverQueueSize(internalQueueSize)
.batchReceivePolicy(batchReceivePolicy).subscribe();
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();

final int numMessages = 100;
for (int i = 0; i < numMessages; i++) {
producer.newMessage().value(("value-" + i).getBytes(UTF_8)).eventTime((i + 1) * 100L).send();
}

assertEquals(consumer.batchReceive().size(), maxBatchSize);
}

private int compareMessageIds(MessageIdImpl messageId1, MessageIdImpl messageId2) {
if (messageId2.getLedgerId() < messageId1.getLedgerId()) {
return -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ public CompletableFuture<Consumer<T>> subscribeAsync() {
return FutureUtil.failedFuture(
new InvalidConfigurationException("KeySharedPolicy must set with KeyShared subscription"));
}
if (conf.getBatchReceivePolicy() != null) {
conf.setReceiverQueueSize(
Math.max(conf.getBatchReceivePolicy().getMaxNumMessages(), conf.getReceiverQueueSize()));
}
CompletableFuture<Void> applyDLQConfig;
if (conf.isRetryEnable() && conf.getTopicNames().size() > 0) {
TopicName topicFirst = TopicName.get(conf.getTopicNames().iterator().next());
Expand Down

0 comments on commit bcfcb5c

Please sign in to comment.