diff --git a/lib/BatchMessageContainerBase.cc b/lib/BatchMessageContainerBase.cc index 0cf338fb..807a2615 100644 --- a/lib/BatchMessageContainerBase.cc +++ b/lib/BatchMessageContainerBase.cc @@ -92,7 +92,7 @@ void BatchMessageContainerBase::processAndClear( std::function opSendMsgCallback, FlushCallback flushCallback) { if (isEmpty()) { if (flushCallback) { - flushCallback(ResultOk); + // do nothing, flushCallback complete until the lastOpSend complete } } else { const auto numBatches = getNumBatches(); diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 0b7f482d..2f708b18 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -372,8 +372,16 @@ void ProducerImpl::flushAsync(FlushCallback callback) { if (batchMessageContainer_) { Lock lock(mutex_); auto failures = batchMessageAndSend(callback); - lock.unlock(); - failures.complete(); + if (!pendingMessagesQueue_.empty()) { + auto& opSendMsg = pendingMessagesQueue_.back(); + lock.unlock(); + failures.complete(); + opSendMsg.addTrackerCallback(callback); + } else { + lock.unlock(); + failures.complete(); + callback(ResultOk); + } } else { Lock lock(mutex_); if (!pendingMessagesQueue_.empty()) { diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc index 77cb6190..eeda2b47 100644 --- a/tests/ProducerTest.cc +++ b/tests/ProducerTest.cc @@ -439,6 +439,68 @@ TEST_P(ProducerTest, testFlushNoBatch) { client.close(); } +TEST_P(ProducerTest, testFlushBatch) { + Client client(serviceUrl); + + auto partitioned = GetParam(); + const auto topicName = std::string("testFlushNoBatch") + + (partitioned ? "partitioned-" : "-no-partitioned-") + + std::to_string(time(nullptr)); + + if (partitioned) { + // call admin api to make it partitioned + std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions"; + int res = makePutRequest(url, "5"); + LOG_INFO("res = " << res); + ASSERT_FALSE(res != 204 && res != 409); + } + + ProducerConfiguration producerConfiguration; + producerConfiguration.setBatchingEnabled(true); + producerConfiguration.setBatchingMaxMessages(10); + producerConfiguration.setBatchingMaxPublishDelayMs(1000); + producerConfiguration.setBatchingMaxAllowedSizeInBytes(4 * 1024 * 1024); + + // test all messages in batch has been sent + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer)); + + std::atomic_int needCallBack(100); + auto cb = [&needCallBack](Result code, const MessageId& msgId) { + ASSERT_EQ(code, ResultOk); + needCallBack.fetch_sub(1); + }; + + for (int i = 0; i < 100; ++i) { + Message msg = MessageBuilder().setContent("content").build(); + producer.sendAsync(msg, cb); + } + + producer.flush(); + ASSERT_EQ(needCallBack.load(), 0); + producer.close(); + + // test remain messages in batch not send + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer)); + + std::atomic_int needCallBack2(105); + auto cb2 = [&needCallBack2](Result code, const MessageId& msgId) { + ASSERT_EQ(code, ResultOk); + needCallBack2.fetch_sub(1); + }; + + for (int i = 0; i < 105; ++i) { + Message msg = MessageBuilder().setContent("content").build(); + producer.sendAsync(msg, cb2); + } + + producer.flush(); + ASSERT_EQ(needCallBack2.load(), 0); + producer.close(); + + client.close(); +} + TEST(ProducerTest, testCloseSubProducerWhenFail) { Client client(serviceUrl);