Skip to content

Commit

Permalink
Add metrics storageLogicalSize for the TopicStats and NamespaceStat…
Browse files Browse the repository at this point in the history
…s and `pulsar_storage_logical_size` to the prometheus endpoint.
  • Loading branch information
Technoboy- committed Jul 22, 2021
1 parent 8841c81 commit 5472bea
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public interface ManagedLedgerMXBean {
*/
long getStoredMessagesSize();

/**
* @return the total size of the messages in active ledgers (without accounting for replicas)
*/
long getStoredMessagesLogicalSize();

/**
* @return the number of backlog messages for all the consumers
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,11 @@ public long getStoredMessagesSize() {
return managedLedger.getTotalSize() * managedLedger.getConfig().getWriteQuorumSize();
}

@Override
public long getStoredMessagesLogicalSize() {
return managedLedger.getTotalSize();
}

@Override
public long getNumberOfMessagesInBacklog() {
long count = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ void updateStats(TopicStats stats) {
msgOutCounter += stats.msgOutCounter;

managedLedgerStats.storageSize += stats.managedLedgerStats.storageSize;
managedLedgerStats.storageLogicalSize += stats.managedLedgerStats.storageLogicalSize;
managedLedgerStats.backlogSize += stats.managedLedgerStats.backlogSize;
managedLedgerStats.offloadedStorageUsed += stats.managedLedgerStats.offloadedStorageUsed;
backlogQuotaLimit = Math.max(backlogQuotaLimit, stats.backlogQuotaLimit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class ManagedLedgerStats {
long storageSize;
long backlogSize;
long offloadedStorageUsed;
long storageLogicalSize;

StatsBuckets storageWriteLatencyBuckets = new StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC);
StatsBuckets storageLedgerWriteLatencyBuckets = new StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC);
Expand All @@ -40,6 +41,7 @@ public void reset() {
storageReadRate = 0;
backlogSize = 0;
offloadedStorageUsed = 0;
storageLogicalSize = 0;

storageWriteLatencyBuckets.reset();
storageLedgerWriteLatencyBuckets.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) ml.getStats();

stats.managedLedgerStats.storageSize = mlStats.getStoredMessagesSize();
stats.managedLedgerStats.storageLogicalSize = mlStats.getStoredMessagesLogicalSize();
stats.managedLedgerStats.backlogSize = ml.getEstimatedBacklogSize();
stats.managedLedgerStats.offloadedStorageUsed = ml.getOffloadedSize();
stats.backlogQuotaLimit = topic.getBacklogQuota().getLimitSize();
Expand Down Expand Up @@ -258,6 +259,7 @@ private static void printNamespaceStats(SimpleTextOutputStream stream, String cl
metric(stream, cluster, namespace, "pulsar_out_messages_total", stats.msgOutCounter);

metric(stream, cluster, namespace, "pulsar_storage_size", stats.managedLedgerStats.storageSize);
metric(stream, cluster, namespace, "pulsar_storage_logical_size", stats.managedLedgerStats.storageLogicalSize);
metric(stream, cluster, namespace, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize);
metric(stream, cluster, namespace, "pulsar_storage_offloaded_size",
stats.managedLedgerStats.offloadedStorageUsed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
metric(stream, cluster, namespace, topic, "pulsar_average_msg_size", stats.averageMsgSize);

metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.managedLedgerStats.storageSize);
metric(stream, cluster, namespace, topic, "pulsar_storage_logical_size", stats.managedLedgerStats.storageLogicalSize);
metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", stats.msgBacklog);
metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_size",
stats.managedLedgerStats.backlogSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ private static void generateManageLedgerStats(ManagedLedger managedLedger, Simpl
ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) managedLedger.getStats();

managedLedgerStats.storageSize = mlStats.getStoredMessagesSize();
managedLedgerStats.storageLogicalSize = mlStats.getStoredMessagesLogicalSize();
managedLedgerStats.backlogSize = managedLedger.getEstimatedBacklogSize();
managedLedgerStats.offloadedStorageUsed = managedLedger.getOffloadedSize();

Expand Down Expand Up @@ -167,6 +168,8 @@ private static void printManageLedgerStats(SimpleTextOutputStream stream, String

metrics(stream, cluster, namespace, topic, subscription,
"pulsar_storage_size", stats.storageSize);
metrics(stream, cluster, namespace, topic, subscription,
"pulsar_storage_logical_size", stats.storageLogicalSize);
metrics(stream, cluster, namespace, topic, subscription,
"pulsar_storage_backlog_size", stats.backlogSize);
metrics(stream, cluster, namespace, topic, subscription,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@ public void testManagedLedgerMetrics() throws Exception{
checkManagedLedgerMetrics(subName, 32, metric);
checkManagedLedgerMetrics(MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME, 252, metric);

metric = metrics.get("pulsar_storage_logical_size");
checkManagedLedgerMetrics(subName, 16, metric);
checkManagedLedgerMetrics(MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME, 126, metric);

metric = metrics.get("pulsar_storage_backlog_size");
checkManagedLedgerMetrics(subName, 16, metric);
checkManagedLedgerMetrics(MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME, 126, metric);
Expand All @@ -245,6 +249,8 @@ public void testManagedLedgerMetrics() throws Exception{
metrics = parseMetrics(metricsStr);
metric = metrics.get("pulsar_storage_size");
assertEquals(metric.size(), 3);
metric = metrics.get("pulsar_storage_logical_size");
assertEquals(metric.size(), 2);
metric = metrics.get("pulsar_storage_backlog_size");
assertEquals(metric.size(), 2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public void testSimpleAggregation() {
topicStats1.throughputIn = 10240.0;
topicStats1.throughputOut = 20480.0;
topicStats1.managedLedgerStats.storageSize = 5120;
topicStats1.managedLedgerStats.storageLogicalSize = 2048;
topicStats1.msgBacklog = 30;
topicStats1.managedLedgerStats.storageWriteRate = 12.0;
topicStats1.managedLedgerStats.storageReadRate = 6.0;
Expand Down Expand Up @@ -70,6 +71,7 @@ public void testSimpleAggregation() {
topicStats2.throughputIn = 512.0;
topicStats2.throughputOut = 1024.5;
topicStats2.managedLedgerStats.storageSize = 1024;
topicStats2.managedLedgerStats.storageLogicalSize = 512;
topicStats2.msgBacklog = 7;
topicStats2.managedLedgerStats.storageWriteRate = 5.0;
topicStats2.managedLedgerStats.storageReadRate = 2.5;
Expand Down Expand Up @@ -108,6 +110,8 @@ public void testSimpleAggregation() {
assertEquals(nsStats.msgBacklog, 37);
assertEquals(nsStats.managedLedgerStats.storageWriteRate, 17.0);
assertEquals(nsStats.managedLedgerStats.storageReadRate, 8.5);
assertEquals(nsStats.managedLedgerStats.storageSize, 6144);
assertEquals(nsStats.managedLedgerStats.storageLogicalSize, 2560);

AggregatedReplicationStats nsReplStats = nsStats.replicationStats.get(namespace);
assertNotNull(nsReplStats);
Expand Down
1 change: 1 addition & 0 deletions site2/docs/reference-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ All the namespace metrics are labelled with the following labels:
| pulsar_throughput_in | Gauge | The total throughput of the namespace coming into this broker (bytes/second). |
| pulsar_throughput_out | Gauge | The total throughput of the namespace going out from this broker (bytes/second). |
| pulsar_storage_size | Gauge | The total storage size of the topics in this namespace owned by this broker (bytes). |
| pulsar_storage_logical_size | Gauge | The storage size (without accounting for replicas) of the topics in this namespace owned by this broker (bytes). |
| pulsar_storage_backlog_size | Gauge | The total backlog size of the topics of this namespace owned by this broker (messages). |
| pulsar_storage_offloaded_size | Gauge | The total amount of the data in this namespace offloaded to the tiered storage (bytes). |
| pulsar_storage_write_rate | Gauge | The total message batches (entries) written to the storage for this namespace (message batches / second). |
Expand Down

0 comments on commit 5472bea

Please sign in to comment.