diff --git a/lib/ConsumerImplBase.cc b/lib/ConsumerImplBase.cc index 851d41e8..098f2d5b 100644 --- a/lib/ConsumerImplBase.cc +++ b/lib/ConsumerImplBase.cc @@ -76,10 +76,7 @@ void ConsumerImplBase::doBatchReceiveTimeTask() { long diff = batchReceivePolicy_.getTimeoutMs() - (TimeUtils::currentTimeMillis() - batchReceive.createAt_); if (diff <= 0) { - Lock batchOptionLock(batchReceiveOptionMutex_); - notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_); - batchOptionLock.unlock(); - batchPendingReceives_.pop(); + notifyBatchPendingReceivedCallback(popBatchReceiveCallback()); } else { hasPendingReceives = true; timeToWaitMs = diff; @@ -96,20 +93,17 @@ void ConsumerImplBase::doBatchReceiveTimeTask() { void ConsumerImplBase::failPendingBatchReceiveCallback() { Lock lock(batchPendingReceiveMutex_); while (!batchPendingReceives_.empty()) { - OpBatchReceive opBatchReceive = batchPendingReceives_.front(); - batchPendingReceives_.pop(); - listenerExecutor_->postWork( - [opBatchReceive]() { opBatchReceive.batchReceiveCallback_(ResultAlreadyClosed, {}); }); + auto callback = popBatchReceiveCallback(); + listenerExecutor_->postWork([callback]() { callback(ResultAlreadyClosed, {}); }); } } void ConsumerImplBase::notifyBatchPendingReceivedCallback() { Lock lock(batchPendingReceiveMutex_); if (!batchPendingReceives_.empty()) { - OpBatchReceive& batchReceive = batchPendingReceives_.front(); - batchPendingReceives_.pop(); + auto callback = popBatchReceiveCallback(); lock.unlock(); - notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_); + notifyBatchPendingReceivedCallback(callback); } } diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h index 1b7e86e1..79601e4d 100644 --- a/lib/ConsumerImplBase.h +++ b/lib/ConsumerImplBase.h @@ -112,6 +112,14 @@ class ConsumerImplBase : public HandlerBase { virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled) = 0; + // Note: it should be protected by batchPendingReceiveMutex_ and called when `batchPendingReceives_` is + // not empty + BatchReceiveCallback popBatchReceiveCallback() { + auto callback = std::move(batchPendingReceives_.front().batchReceiveCallback_); + batchPendingReceives_.pop(); + return callback; + } + friend class MultiTopicsConsumerImpl; friend class PulsarFriend; };