You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
Versions
Pulsar version 2.10.4
OS: Ubuntu(version=22.04)
Client(Python): pulsar-client(Version=3.3.0)
Minimal reproduce step
Create a partitioned topic with one partition
bin/pulsar-admin topics create-partitioned-topic persistent://public/default/test-consumer -p 1
Publish some messages
Create one consumer as follows:
consumer = client.subscribe( topic='persistent://public/default/test-consumer', subscription_name='test', consumer_type=ConsumerType.Shared, initial_position=InitialPosition.Earliest, receiver_queue_size=1, max_total_receiver_queue_size_across_partitions=1, consumer_name=None, negative_ack_redelivery_delay_ms=60000, unacked_messages_timeout_ms=3600000)
Receive and ack one message
consumer.receive(timeout_millis=30000)
consumer.acknowledge(message=message_id)
Check the stats (they show 2 unacked messages even though those messages have not been received yet; also, if we create another consumer and try to consume messages, nothing is delivered to it)
What did you expect to see?
It should not show 2 unacked messages; instead, it should show 1, since the receiver queue size is 1.
What did you see instead?
I can see 2 unacked messages in the stats, as follows:
Command: bin/pulsar-admin topics partitioned-stats persistent://public/default/test-consumer
Versions
Pulsar version 2.10.4
OS: Ubuntu(version=22.04)
Client(Python): pulsar-client(Version=3.3.0)
Minimal reproduce step
Create a partitioned topic with one partition
bin/pulsar-admin topics create-partitioned-topic persistent://public/default/test-consumer -p 1
Publish some messages
Create one consumer as follows:
consumer = client.subscribe( topic='persistent://public/default/test-consumer', subscription_name='test', consumer_type=ConsumerType.Shared, initial_position=InitialPosition.Earliest, receiver_queue_size=1, max_total_receiver_queue_size_across_partitions=1, consumer_name=None, negative_ack_redelivery_delay_ms=60000, unacked_messages_timeout_ms=3600000)
Receive and ack one message
consumer.receive(timeout_millis=30000)
consumer.acknowledge(message=message_id)
Check the stats (they show 2 unacked messages even though those messages have not been received yet; also, if we create another consumer and try to consume messages, nothing is delivered to it)
What did you expect to see?
It should not show 2 unacked messages; instead, it should show 1, since the receiver queue size is 1.
What did you see instead?
I can see 2 unacked messages in the stats, as follows:
Command: bin/pulsar-admin topics partitioned-stats persistent://public/default/test-consumer
{
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"msgRateOut" : 0.07158759338136238,
"msgThroughputOut" : 4.271393071754622,
"bytesInCounter" : 179,
"msgInCounter" : 3,
"bytesOutCounter" : 179,
"msgOutCounter" : 3,
"averageMsgSize" : 0.0,
"msgChunkPublished" : false,
"storageSize" : 179,
"backlogSize" : 120,
"publishRateLimitedTimes" : 0,
"earliestMsgPublishTimeInBacklogs" : 0,
"offloadedStorageSize" : 0,
"lastOffloadLedgerId" : 0,
"lastOffloadSuccessTimeStamp" : 0,
"lastOffloadFailureTimeStamp" : 0,
"publishers" : [ ],
"waitingPublishers" : 0,
"subscriptions" : {
"test" : {
"msgRateOut" : 0.07158759338136238,
"msgThroughputOut" : 4.271393071754622,
"bytesOutCounter" : 179,
"msgOutCounter" : 3,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0,
"msgBacklog" : 2,
"backlogSize" : 0,
"earliestMsgPublishTimeInBacklog" : 0,
"msgBacklogNoDelayed" : 2,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 2,
"msgRateExpired" : 0.0,
"totalMsgExpired" : 0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 0,
"lastConsumedTimestamp" : 0,
"lastAckedTimestamp" : 0,
"lastMarkDeleteAdvancedTimestamp" : 0,
"consumers" : [ {
"msgRateOut" : 0.07158759338136238,
"msgThroughputOut" : 4.271393071754622,
"bytesOutCounter" : 179,
"msgOutCounter" : 3,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.023862530971099603,
"chunkedMessageRate" : 0.0,
"availablePermits" : 0,
"unackedMessages" : 2,
"avgMessagesPerEntry" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 0,
"lastConsumedTimestamp" : 0
}, {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0.0,
"availablePermits" : 1,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 0,
"lastConsumedTimestamp" : 0
} ],
"isDurable" : true,
"isReplicated" : false,
"allowOutOfOrderDelivery" : false,
"consumersAfterMarkDeletePosition" : { },
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0,
"subscriptionProperties" : { },
"durable" : true,
"replicated" : false
}
},
"replication" : { },
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0,
"compaction" : {
"lastCompactionRemovedEventCount" : 0,
"lastCompactionSucceedTimestamp" : 0,
"lastCompactionFailedTimestamp" : 0,
"lastCompactionDurationTimeInMills" : 0
},
"metadata" : {
"partitions" : 1
},
"partitions" : { }
}
The text was updated successfully, but these errors were encountered: