[Bug] Msgbacklog is not cleared when individual acknowledge chunked messages #320

Closed
2 tasks done
RobertIndie opened this issue Sep 26, 2023 · 0 comments · Fixed by #321

Search before asking

  • I searched in the issues and found nothing similar.

Version

master

Minimal reproduce step

Reproducible code:

TEST_P(MessageChunkingTest, testEndToEnd) {
    const std::string topic =
        "MessageChunkingTest-EndToEnd-" + toString(GetParam()) + std::to_string(time(nullptr));
    Consumer consumer;
    createConsumer(topic, consumer);
    Producer producer;
    createProducer(topic, producer);

    constexpr int numMessages = 10;

    std::vector<MessageId> sendMessageIds;
    for (int i = 0; i < numMessages; i++) {
        MessageId messageId;
        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeMessage).build(), messageId));
        auto chunkMsgId =
            std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
        ASSERT_TRUE(chunkMsgId);
        LOG_INFO("Send " << i << " to " << messageId);
        sendMessageIds.emplace_back(messageId);
    }

    Message msg;
    std::vector<MessageId> receivedMessageIds;
    for (int i = 0; i < numMessages; i++) {
        ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
        LOG_INFO("Receive " << msg.getLength() << " bytes from " << msg.getMessageId());
        ASSERT_EQ(msg.getDataAsString(), largeMessage);
        ASSERT_EQ(msg.getMessageId().batchIndex(), -1);
        ASSERT_EQ(msg.getMessageId().batchSize(), 0);
        auto messageId = msg.getMessageId();
        auto chunkMsgId =
            std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
        ASSERT_TRUE(chunkMsgId);
        receivedMessageIds.emplace_back(messageId);
        // Individually acknowledge the chunked message and check the result
        ASSERT_EQ(ResultOk, consumer.acknowledge(messageId));
    }
    ASSERT_EQ(receivedMessageIds, sendMessageIds);
    // Compare the first and last IDs (the original compared front() with itself)
    ASSERT_EQ(receivedMessageIds.front().ledgerId(), receivedMessageIds.back().ledgerId());
    ASSERT_GT(receivedMessageIds.back().entryId(), numMessages);

    // Verify the cache has been cleared
    auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer);
    ASSERT_EQ(chunkedMessageCache.size(), 0);

    BrokerConsumerStats consumerStats;
    waitUntil(
        std::chrono::seconds(10),
        [&] {
            if (consumer.getBrokerConsumerStats(consumerStats) != ResultOk) return false;
            LOG_DEBUG(consumerStats);
            return consumerStats.getMsgBacklog() == 0;
        },
        1000);
    ASSERT_EQ(consumerStats.getMsgBacklog(), 0);  // Fails: the message backlog is never cleared

    producer.close();
    consumer.close();
}

What did you expect to see?

The message backlog should be cleared, and the above test should pass.

What did you see instead?

The backlog doesn't get cleared.

Anything else?

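The root cause is that the consumer doesn't acknowledge all of the chunk message IDs when a chunked message is individually acknowledged, so the unacknowledged chunks remain in the backlog.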
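
To illustrate the root cause, here is a minimal toy model, not the client's actual implementation. It assumes each chunk of a large message is stored as its own entry and, as a guess at the failing behavior, that an individual acknowledgment only covers the last chunk's position. `EntryPos`, `ToyBacklog`, `publishChunked`, and the two `backlogAfter...` helpers are hypothetical names introduced for this sketch only.

```cpp
#include <cstddef>
#include <cstdint>
#include <set>
#include <utility>
#include <vector>

// Hypothetical stand-in for a (ledgerId, entryId) position; not the client's MessageId.
using EntryPos = std::pair<int64_t, int64_t>;

// Toy model of a subscription backlog: the set of entries not yet acknowledged.
struct ToyBacklog {
    std::set<EntryPos> unacked;

    void publish(const EntryPos& pos) { unacked.insert(pos); }
    void ackIndividual(const EntryPos& pos) { unacked.erase(pos); }
    std::size_t size() const { return unacked.size(); }
};

// Publishes one chunked message as numChunks consecutive entries
// and returns the position of every chunk.
std::vector<EntryPos> publishChunked(ToyBacklog& backlog, int64_t ledgerId,
                                     int64_t firstEntryId, int numChunks) {
    std::vector<EntryPos> chunkPositions;
    for (int i = 0; i < numChunks; i++) {
        EntryPos pos{ledgerId, firstEntryId + i};
        backlog.publish(pos);
        chunkPositions.push_back(pos);
    }
    return chunkPositions;
}

// Assumed failing behavior: only the last chunk's position is acknowledged.
std::size_t backlogAfterAckingLastChunkOnly(int numChunks) {
    ToyBacklog backlog;
    auto chunks = publishChunked(backlog, 1, 0, numChunks);
    backlog.ackIndividual(chunks.back());
    return backlog.size();
}

// Expected behavior: every chunk position of the message is acknowledged.
std::size_t backlogAfterAckingAllChunks(int numChunks) {
    ToyBacklog backlog;
    auto chunks = publishChunked(backlog, 1, 0, numChunks);
    for (const auto& pos : chunks) {
        backlog.ackIndividual(pos);
    }
    return backlog.size();
}
```

In this model, a message split into three chunks leaves two entries in the backlog when only the last chunk is acknowledged, and none when all chunk positions are acknowledged. This matches the symptom in the test above, where `getMsgBacklog()` never reaches 0.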

Are you willing to submit a PR?

  • I'm willing to submit a PR!