From 93276ebd181fcf10a3d862e92ba615b544774387 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Wed, 11 Dec 2024 16:38:18 +0800 Subject: [PATCH 1/2] fix. --- .../org/apache/pulsar/broker/stats/prometheus/TopicStats.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index e54a3710e1294..524d47e7c1b92 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -360,7 +360,7 @@ public static void printTopicStats(PrometheusMetricStreams stream, TopicStats st writeSubscriptionMetric(stream, "pulsar_subscription_filter_rescheduled_msg_count", subsStats.filterRescheduledMsgCount, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); - writeSubscriptionMetric(stream, "pulsar_delayed_message_index_size_bytes", + writeSubscriptionMetric(stream, "pulsar_subscription_delayed_message_index_size_bytes", subsStats.delayedMessageIndexSizeInBytes, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); From 068e08835def882e441395e630fb49538d4d67a4 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Mon, 13 Jan 2025 16:53:56 +0800 Subject: [PATCH 2/2] fix test code. --- .../persistent/PersistentTopicTest.java | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 903443d37bb07..1196161711224 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -424,10 +424,11 @@ public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean ex @Cleanup Producer producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); + String subName = "test_sub"; @Cleanup Consumer consumer = client.newConsumer(Schema.STRING) .topic(topic) - .subscriptionName("test_sub") + .subscriptionName(subName) .subscriptionType(SubscriptionType.Shared) .messageListener((MessageListener) (consumer1, msg) -> { try { @@ -453,7 +454,13 @@ public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean ex Multimap metricsMap = parseMetrics(metricsStr); Collection metrics = metricsMap.get("pulsar_delayed_message_index_size_bytes"); - Assert.assertTrue(metrics.size() > 0); + Collection subMetrics = metricsMap.get("pulsar_subscription_delayed_message_index_size_bytes"); + assertFalse(metrics.isEmpty()); + if (exposeTopicLevelMetrics) { + assertFalse(subMetrics.isEmpty()); + } else { + assertTrue(subMetrics.isEmpty()); + } int topicLevelNum = 0; int namespaceLevelNum = 0; @@ -462,14 +469,20 @@ public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean ex if (exposeTopicLevelMetrics && metric.tags.get("topic").equals(topic)) { Assert.assertTrue(metric.value > 0); topicLevelNum++; - if ("test_sub".equals(metric.tags.get("subscription"))) { - subscriptionLevelNum++; - } } else if (!exposeTopicLevelMetrics && metric.tags.get("namespace").equals(namespace)) { Assert.assertTrue(metric.value > 0); namespaceLevelNum++; } } + if (exposeTopicLevelMetrics) { + for (Metric metric : subMetrics) { + if (metric.tags.get("topic").equals(topic) && + subName.equals(metric.tags.get("subscription"))) { + Assert.assertTrue(metric.value > 0); + subscriptionLevelNum++; + } + } + } if (exposeTopicLevelMetrics) { Assert.assertTrue(topicLevelNum > 0);