Skip to content

Commit

Permalink
[improve][client] PIP-393: Support configuring NegativeAckPrecisionBi…
Browse files Browse the repository at this point in the history
…tCnt while building consumer. (apache#23804)
  • Loading branch information
thetumbled authored and poorbarcode committed Jan 23, 2025
1 parent 53fab1c commit ec1e2c7
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -545,4 +545,51 @@ public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Excepti
consumer.close();
admin.topics().deletePartitionedTopic("persistent://public/default/" + topic);
}

@DataProvider(name = "negativeAckPrecisionBitCnt")
public Object[][] negativeAckPrecisionBitCnt() {
return new Object[][]{
{1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}, {12}
};
}

/**
* When negativeAckPrecisionBitCnt is greater than 0, the lower bits of the redelivery time will be truncated
* to reduce the memory occupation. If set to k, the redelivery time will be bucketed by 2^k ms, resulting in
* the redelivery time could be earlier(no later) than the expected time no more than 2^k ms.
* @throws Exception if an error occurs
*/
@Test(dataProvider = "negativeAckPrecisionBitCnt")
public void testConfigureNegativeAckPrecisionBitCnt(int negativeAckPrecisionBitCnt) throws Exception {
String topic = BrokerTestUtil.newUniqueName("testConfigureNegativeAckPrecisionBitCnt");
long timeDeviation = 1L << negativeAckPrecisionBitCnt;
long delayInMs = 2000;

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub1")
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.negativeAckRedeliveryDelay(delayInMs, TimeUnit.MILLISECONDS)
.negativeAckRedeliveryDelayPrecision(negativeAckPrecisionBitCnt)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();
producer.sendAsync("test-0");
producer.flush();

// receive the message and negative ack
consumer.negativeAcknowledge(consumer.receive());
long expectedTime = System.currentTimeMillis() + delayInMs;

// receive the redelivered message and calculate the time deviation
// assert that the redelivery time is no earlier than the `expected time - timeDeviation`
Message<String> msg1 = consumer.receive();
assertTrue(System.currentTimeMillis() >= expectedTime - timeDeviation);
assertNotNull(msg1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,19 @@ public interface ConsumerBuilder<T> extends Cloneable {
*/
ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay, TimeUnit timeUnit);

/**
* Sets the redelivery time precision bit count. The lower bits of the redelivery time will be
* trimmed to reduce the memory occupation. The default value is 8, which means the redelivery time
* will be bucketed by 256ms, the redelivery time could be earlier(no later) than the expected time,
* but no more than 256ms. If set to k, the redelivery time will be bucketed by 2^k ms.
* If the value is 0, the redelivery time will be accurate to ms.
*
* @param negativeAckPrecisionBitCnt
* The redelivery time precision bit count.
* @return the consumer builder instance
*/
ConsumerBuilder<T> negativeAckRedeliveryDelayPrecision(int negativeAckPrecisionBitCount);

/**
* Select the subscription type to be used when subscribing to a topic.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,13 @@ public ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay, TimeU
return this;
}

@Override
public ConsumerBuilder<T> negativeAckRedeliveryDelayPrecision(int negativeAckPrecisionBitCount) {
checkArgument(negativeAckPrecisionBitCount >= 0, "negativeAckPrecisionBitCount needs to be >= 0");
conf.setNegativeAckPrecisionBitCnt(negativeAckPrecisionBitCount);
return this;
}

@Override
public ConsumerBuilder<T> subscriptionType(@NonNull SubscriptionType subscriptionType) {
conf.setSubscriptionType(subscriptionType);
Expand Down

0 comments on commit ec1e2c7

Please sign in to comment.