Skip to content

Commit

Permalink
add non-persistent topic metrics (apache#28)
Browse files Browse the repository at this point in the history
Signed-off-by: druidliu <[email protected]>
  • Loading branch information
druidliu authored and qunzhong committed Aug 26, 2022
1 parent 9419c09 commit 1e502e7
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,7 @@ public class AggregatedSubscriptionStats {

long totalMsgExpired;

double msgDropRate;

public Map<Consumer, AggregatedConsumerStats> consumerStat = new HashMap<>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,36 @@ private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService pulsar
return Optional.ofNullable(compactor).map(c -> c.getStats());
}

private static void aggregateTopicStats(TopicStats stats, SubscriptionStats subscriptionStats,
AggregatedSubscriptionStats subsStats) {
stats.subscriptionsCount++;
stats.msgBacklog += subscriptionStats.msgBacklog;
subsStats.msgBacklog = subscriptionStats.msgBacklog;
subsStats.msgDelayed = subscriptionStats.msgDelayed;
subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
subsStats.lastAckedTimestamp = subscriptionStats.lastAckedTimestamp;
subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp;
subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp;
subscriptionStats.consumers.forEach(cStats -> {
stats.consumersCount++;
subsStats.unackedMessages += cStats.unackedMessages;
subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
subsStats.msgRateOut += cStats.msgRateOut;
subsStats.msgThroughputOut += cStats.msgThroughputOut;
subsStats.bytesOutCounter += cStats.bytesOutCounter;
subsStats.msgOutCounter += cStats.msgOutCounter;
if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) {
subsStats.blockedSubscriptionOnUnackedMsgs = true;
}
});
stats.rateOut += subsStats.msgRateOut;
stats.throughputOut += subsStats.msgThroughputOut;
}

private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics,
boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize,
Optional<CompactorMXBean> compactorMXBean) {
Expand Down Expand Up @@ -175,37 +205,21 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
}
});

tStatus.subscriptions.forEach((subName, subscriptionStats) -> {
stats.subscriptionsCount++;
stats.msgBacklog += subscriptionStats.msgBacklog;

AggregatedSubscriptionStats subsStats = stats.subscriptionStats
.computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
subsStats.msgBacklog = subscriptionStats.msgBacklog;
subsStats.msgDelayed = subscriptionStats.msgDelayed;
subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
subsStats.lastAckedTimestamp = subscriptionStats.lastAckedTimestamp;
subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp;
subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp;
subscriptionStats.consumers.forEach(cStats -> {
stats.consumersCount++;
subsStats.unackedMessages += cStats.unackedMessages;
subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
subsStats.msgRateOut += cStats.msgRateOut;
subsStats.msgThroughputOut += cStats.msgThroughputOut;
subsStats.bytesOutCounter += cStats.bytesOutCounter;
subsStats.msgOutCounter += cStats.msgOutCounter;
if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) {
subsStats.blockedSubscriptionOnUnackedMsgs = true;
}
if (topic instanceof PersistentTopic) {
tStatus.subscriptions.forEach((subName, subscriptionStats) -> {
AggregatedSubscriptionStats subsStats = stats.subscriptionStats
.computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
aggregateTopicStats(stats, subscriptionStats, subsStats);
});
stats.rateOut += subsStats.msgRateOut;
stats.throughputOut += subsStats.msgThroughputOut;
});
} else {
((NonPersistentTopicStats) tStatus).getSubscriptions()
.forEach((subName, nonPersistentSubscriptionStats) -> {
AggregatedSubscriptionStats subsStats = stats.subscriptionStats
.computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
aggregateTopicStats(stats, nonPersistentSubscriptionStats, subsStats);
subsStats.msgDropRate += nonPersistentSubscriptionStats.getMsgDropRate();
});
}

// Consumer stats can be a lot if a subscription has many consumers
if (includeConsumerMetrics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
subsStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired",
subsStats.totalMsgExpired, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_drop_rate",
subsStats.msgDropRate, splitTopicAndPartitionIndexLabel);
subsStats.consumerStat.forEach((c, consumerStats) -> {
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
"pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import java.util.Objects;
import lombok.Getter;
import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
import lombok.Getter;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* Statistics for subscription to non-persistent topics.
Expand Down

0 comments on commit 1e502e7

Please sign in to comment.