From 1e502e7cd93c776ae9356adaa6f8117955145d10 Mon Sep 17 00:00:00 2001 From: druidliu Date: Tue, 22 Feb 2022 09:56:30 +0800 Subject: [PATCH] add non-persistent topic metrics (#28) Signed-off-by: druidliu --- .../AggregatedSubscriptionStats.java | 2 + .../prometheus/NamespaceStatsAggregator.java | 74 +++++++++++-------- .../broker/stats/prometheus/TopicStats.java | 2 + .../NonPersistentSubscriptionStatsImpl.java | 3 + 4 files changed, 51 insertions(+), 30 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java index fb74daf419f59..c829be28e596a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java @@ -58,5 +58,7 @@ public class AggregatedSubscriptionStats { long totalMsgExpired; + double msgDropRate; + public Map consumerStat = new HashMap<>(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 2025c59c1a1f7..b1766d580bd90 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -109,6 +109,36 @@ private static Optional getCompactorMXBean(PulsarService pulsar return Optional.ofNullable(compactor).map(c -> c.getStats()); } + private static void aggregateTopicStats(TopicStats stats, SubscriptionStats subscriptionStats, + AggregatedSubscriptionStats subsStats) { + stats.subscriptionsCount++; + stats.msgBacklog += subscriptionStats.msgBacklog; + subsStats.msgBacklog = subscriptionStats.msgBacklog; + subsStats.msgDelayed = subscriptionStats.msgDelayed; + subsStats.msgRateExpired = subscriptionStats.msgRateExpired; + subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired; + subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed; + subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp; + subsStats.lastAckedTimestamp = subscriptionStats.lastAckedTimestamp; + subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp; + subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp; + subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp; + subscriptionStats.consumers.forEach(cStats -> { + stats.consumersCount++; + subsStats.unackedMessages += cStats.unackedMessages; + subsStats.msgRateRedeliver += cStats.msgRateRedeliver; + subsStats.msgRateOut += cStats.msgRateOut; + subsStats.msgThroughputOut += cStats.msgThroughputOut; + subsStats.bytesOutCounter += cStats.bytesOutCounter; + subsStats.msgOutCounter += cStats.msgOutCounter; + if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) { + subsStats.blockedSubscriptionOnUnackedMsgs = true; + } + }); + stats.rateOut += subsStats.msgRateOut; + stats.throughputOut += subsStats.msgThroughputOut; + } + private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics, boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize, Optional compactorMXBean) { @@ -175,37 +205,21 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include } }); - tStatus.subscriptions.forEach((subName, subscriptionStats) -> { - stats.subscriptionsCount++; - stats.msgBacklog += subscriptionStats.msgBacklog; - - AggregatedSubscriptionStats subsStats = stats.subscriptionStats - .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats()); - subsStats.msgBacklog = subscriptionStats.msgBacklog; - subsStats.msgDelayed = subscriptionStats.msgDelayed; - subsStats.msgRateExpired = subscriptionStats.msgRateExpired; - subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired; - subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed; - subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp; - subsStats.lastAckedTimestamp = subscriptionStats.lastAckedTimestamp; - subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp; - subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp; - subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp; - subscriptionStats.consumers.forEach(cStats -> { - stats.consumersCount++; - subsStats.unackedMessages += cStats.unackedMessages; - subsStats.msgRateRedeliver += cStats.msgRateRedeliver; - subsStats.msgRateOut += cStats.msgRateOut; - subsStats.msgThroughputOut += cStats.msgThroughputOut; - subsStats.bytesOutCounter += cStats.bytesOutCounter; - subsStats.msgOutCounter += cStats.msgOutCounter; - if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) { - subsStats.blockedSubscriptionOnUnackedMsgs = true; - } + if (topic instanceof PersistentTopic) { + tStatus.subscriptions.forEach((subName, subscriptionStats) -> { + AggregatedSubscriptionStats subsStats = stats.subscriptionStats + .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats()); + aggregateTopicStats(stats, subscriptionStats, subsStats); }); - stats.rateOut += subsStats.msgRateOut; - stats.throughputOut += subsStats.msgThroughputOut; - }); + } else { + ((NonPersistentTopicStats) tStatus).getSubscriptions() + .forEach((subName, nonPersistentSubscriptionStats) -> { + AggregatedSubscriptionStats subsStats = stats.subscriptionStats + .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats()); + aggregateTopicStats(stats, nonPersistentSubscriptionStats, subsStats); + subsStats.msgDropRate += nonPersistentSubscriptionStats.getMsgDropRate(); + }); + } // Consumer stats can be a lot if a subscription has many consumers if (includeConsumerMetrics) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index 599c3d82f6fc5..39717217c9c75 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -267,6 +267,8 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin subsStats.msgRateExpired, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired", subsStats.totalMsgExpired, splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_drop_rate", + subsStats.msgDropRate, splitTopicAndPartitionIndexLabel); subsStats.consumerStat.forEach((c, consumerStats) -> { metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver, diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentSubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentSubscriptionStatsImpl.java index 7e5fb12b06449..3c1083fb7a481 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentSubscriptionStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentSubscriptionStatsImpl.java @@ -22,6 +22,9 @@ import java.util.Objects; import lombok.Getter; import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats; +import lombok.Getter; + +import static com.google.common.base.Preconditions.checkNotNull; /** * Statistics for subscription to non-persistent topics.