From aefe39e2b3fd7c42030aa91c542bf03e4e018c86 Mon Sep 17 00:00:00 2001 From: fanjianye Date: Mon, 17 Jul 2023 16:41:06 +0800 Subject: [PATCH 1/3] fix flush batch message error --- lib/BatchMessageContainerBase.cc | 2 +- lib/ProducerImpl.cc | 12 +++++- tests/ProducerTest.cc | 63 ++++++++++++++++++++++++++++++++ 3 files changed, 74 insertions(+), 3 deletions(-) diff --git a/lib/BatchMessageContainerBase.cc b/lib/BatchMessageContainerBase.cc index 0cf338fb..807a2615 100644 --- a/lib/BatchMessageContainerBase.cc +++ b/lib/BatchMessageContainerBase.cc @@ -92,7 +92,7 @@ void BatchMessageContainerBase::processAndClear( std::function opSendMsgCallback, FlushCallback flushCallback) { if (isEmpty()) { if (flushCallback) { - flushCallback(ResultOk); + // do nothing, flushCallback complete until the lastOpSend complete } } else { const auto numBatches = getNumBatches(); diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 0b7f482d..2f708b18 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -372,8 +372,16 @@ void ProducerImpl::flushAsync(FlushCallback callback) { if (batchMessageContainer_) { Lock lock(mutex_); auto failures = batchMessageAndSend(callback); - lock.unlock(); - failures.complete(); + if (!pendingMessagesQueue_.empty()) { + auto& opSendMsg = pendingMessagesQueue_.back(); + lock.unlock(); + failures.complete(); + opSendMsg.addTrackerCallback(callback); + } else { + lock.unlock(); + failures.complete(); + callback(ResultOk); + } } else { Lock lock(mutex_); if (!pendingMessagesQueue_.empty()) { diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc index 77cb6190..f02a2c75 100644 --- a/tests/ProducerTest.cc +++ b/tests/ProducerTest.cc @@ -439,6 +439,69 @@ TEST_P(ProducerTest, testFlushNoBatch) { client.close(); } +TEST_P(ProducerTest, testFlushBatch) { + Client client(serviceUrl); + + auto partitioned = GetParam(); + const auto topicName = std::string("testFlushNoBatch") + + (partitioned ? "partitioned-" : "-no-partitioned-") + + std::to_string(time(nullptr)); + + if (partitioned) { + // call admin api to make it partitioned + std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions"; + int res = makePutRequest(url, "5"); + LOG_INFO("res = " << res); + ASSERT_FALSE(res != 204 && res != 409); + } + + ProducerConfiguration producerConfiguration; + producerConfiguration.setBatchingEnabled(true); + producerConfiguration.setBatchingMaxMessages(10); + producerConfiguration.setBatchingMaxPublishDelayMs(1000); + producerConfiguration.setBatchingMaxAllowedSizeInBytes(4*1024*1024); + + // test all messages in batch has been sent + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer)); + + std::atomic_int needCallBack(100); + auto cb = [&needCallBack](Result code, const MessageId& msgId) { + ASSERT_EQ(code, ResultOk); + needCallBack.fetch_sub(1); + }; + + for (int i = 0; i < 100; ++i) { + Message msg = MessageBuilder().setContent("content").build(); + producer.sendAsync(msg, cb); + } + + producer.flush(); + ASSERT_EQ(needCallBack.load(), 0); + producer.close(); + + + // test remain messages in batch not send + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer)); + + std::atomic_int needCallBack2(105); + auto cb2 = [&needCallBack2](Result code, const MessageId& msgId) { + ASSERT_EQ(code, ResultOk); + needCallBack2.fetch_sub(1); + }; + + for (int i = 0; i < 105; ++i) { + Message msg = MessageBuilder().setContent("content").build(); + producer.sendAsync(msg, cb2); + } + + producer.flush(); + ASSERT_EQ(needCallBack2.load(), 0); + producer.close(); + + client.close(); +} + TEST(ProducerTest, testCloseSubProducerWhenFail) { Client client(serviceUrl); From 0e6b284d119e1882d5f8025b23db51bf9a3245d4 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 24 Jul 2023 10:39:33 +0800 Subject: [PATCH 2/3] Update tests/ProducerTest.cc --- tests/ProducerTest.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc index f02a2c75..d856e956 100644 --- a/tests/ProducerTest.cc +++ b/tests/ProducerTest.cc @@ -480,7 +480,6 @@ TEST_P(ProducerTest, testFlushBatch) { ASSERT_EQ(needCallBack.load(), 0); producer.close(); - // test remain messages in batch not send ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer)); From b408ec30cd56589352dd48c0ecbd8b8c24447a1b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 24 Jul 2023 10:39:42 +0800 Subject: [PATCH 3/3] Fix format check --- tests/ProducerTest.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc index d856e956..eeda2b47 100644 --- a/tests/ProducerTest.cc +++ b/tests/ProducerTest.cc @@ -459,7 +459,7 @@ TEST_P(ProducerTest, testFlushBatch) { producerConfiguration.setBatchingEnabled(true); producerConfiguration.setBatchingMaxMessages(10); producerConfiguration.setBatchingMaxPublishDelayMs(1000); - producerConfiguration.setBatchingMaxAllowedSizeInBytes(4*1024*1024); + producerConfiguration.setBatchingMaxAllowedSizeInBytes(4 * 1024 * 1024); // test all messages in batch has been sent Producer producer;