Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics for writing or reading size of cursor #11500

Merged
merged 2 commits into from
Aug 10, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,33 @@ public interface ManagedCursorMXBean {
*/
long getPersistZookeeperErrors();

/**
* Add write data to a ledger of a cursor (in bytes).
*
* @param size Size of data written to cursor (in bytes)
*/
void addWriteCursorLedgerSize(long size);
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved

/**
* Add read data from a ledger of a cursor (in bytes).
*
* @param size Size of data read from cursor (in bytes)
*/
void addReadCursorLedgerSize(long size);

/**
* @return the size of data written to cursor (in bytes)
*/
long getWriteCursorLedgerSize();

/**
* @return the size of data written to cursor without replicas (in bytes)
*/
long getWriteCursorLedgerLogicalSize();

/**
* @return the size of data read from cursor (in bytes)
*/
long getReadCursorLedgerSize();

}
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
}

LedgerEntry entry = seq.nextElement();
mbean.addReadCursorLedgerSize(entry.getLength());
PositionInfo positionInfo;
try {
positionInfo = PositionInfo.parseFrom(entry.getEntry());
Expand Down Expand Up @@ -2599,7 +2600,8 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
}

checkNotNull(lh);
lh.asyncAddEntry(pi.toByteArray(), (rc, lh1, entryId, ctx) -> {
byte[] data = pi.toByteArray();
lh.asyncAddEntry(data, (rc, lh1, entryId, ctx) -> {
if (rc == BKException.Code.OK) {
if (log.isDebugEnabled()) {
log.debug("[{}] Updated cursor {} position {} in meta-ledger {}", ledger.getName(), name, position,
Expand All @@ -2614,6 +2616,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
}

mbean.persistToLedger(true);
mbean.addWriteCursorLedgerSize(data.length);
callback.operationComplete();
} else {
log.warn("[{}] Error updating cursor {} position {} in meta-ledger {}: {}", ledger.getName(), name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public class ManagedCursorMXBeanImpl implements ManagedCursorMXBean {
private final LongAdder persistZookeeperSucceed = new LongAdder();
private final LongAdder persistZookeeperFailed = new LongAdder();

private final LongAdder writeCursorLedgerSize = new LongAdder();
private final LongAdder writeCursorLedgerLogicalSize = new LongAdder();
private final LongAdder readCursorLedgerSize = new LongAdder();

private final ManagedCursor managedCursor;

public ManagedCursorMXBeanImpl(ManagedCursor managedCursor) {
Expand Down Expand Up @@ -83,4 +87,30 @@ public long getPersistZookeeperSucceed() {
public long getPersistZookeeperErrors() {
return persistZookeeperFailed.longValue();
}

@Override
public void addWriteCursorLedgerSize(final long size) {
writeCursorLedgerSize.add(size * ((ManagedCursorImpl) managedCursor).config.getWriteQuorumSize());
writeCursorLedgerLogicalSize.add(size);
}

@Override
public void addReadCursorLedgerSize(final long size) {
readCursorLedgerSize.add(size);
}

@Override
public long getWriteCursorLedgerSize() {
return writeCursorLedgerSize.longValue();
}

@Override
public long getWriteCursorLedgerLogicalSize() {
return writeCursorLedgerLogicalSize.longValue();
}

@Override
public long getReadCursorLedgerSize() {
return readCursorLedgerSize.longValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ private List<Metrics> aggregate() {
metrics.put("brk_ml_cursor_persistLedgerErrors", cStats.getPersistLedgerErrors());
metrics.put("brk_ml_cursor_persistZookeeperSucceed", cStats.getPersistZookeeperSucceed());
metrics.put("brk_ml_cursor_persistZookeeperErrors", cStats.getPersistZookeeperErrors());
metrics.put("brk_ml_cursor_writeLedgerSize", cStats.getWriteCursorLedgerSize());
metrics.put("brk_ml_cursor_writeLedgerLogicalSize", cStats.getWriteCursorLedgerLogicalSize());
metrics.put("brk_ml_cursor_readLedgerSize", cStats.getReadCursorLedgerSize());
metricsCollection.add(metrics);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats;

import lombok.Cleanup;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
Expand Down Expand Up @@ -90,4 +91,66 @@ public void testManagedCursorMetrics() throws Exception {
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L);
}

@Test
public void testCursorReadWriteMetrics() throws Exception {
final String subName = "read-write";
final String topicName = "persistent://my-namespace/use/my-ns/read-write";
final int messageSize = 10;

ManagedCursorMetrics metrics = new ManagedCursorMetrics(pulsar);

List<Metrics> metricsList = metrics.generate();
Assert.assertTrue(metricsList.isEmpty());

metricsList = metrics.generate();
Assert.assertTrue(metricsList.isEmpty());

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName)
.subscribe();

@Cleanup
Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName + "-2")
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.create();

for (PulsarMockLedgerHandle ledgerHandle : mockBookKeeper.getLedgerMap().values()) {
ledgerHandle.close();
}

for (int i = 0; i < messageSize; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
if (i % 2 == 0) {
consumer.acknowledge(consumer.receive().getMessageId());
} else {
consumer2.acknowledge(consumer.receive().getMessageId());
}
}
metricsList = metrics.generate();
Assert.assertEquals(metricsList.size(), 3);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L);
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 13L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);

Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L);
Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 13L);
Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);

Assert.assertEquals(metricsList.get(2).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 52L);
Assert.assertEquals(metricsList.get(2).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 26L);
Assert.assertEquals(metricsList.get(2).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);
}
}
3 changes: 3 additions & 0 deletions site2/docs/reference-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ brk_ml_cursor_persistLedgerErrors(namespace="", ledger_name="", cursor_name:"")|
brk_ml_cursor_persistZookeeperSucceed(namespace="", ledger_name="", cursor_name:"")|Gauge|The number of acknowledgment states that is persistent to ZooKeeper.
brk_ml_cursor_persistZookeeperErrors(namespace="", ledger_name="", cursor_name:"")|Gauge|The number of ledger errors occurred when acknowledgment states fail to be persistent to ZooKeeper.
brk_ml_cursor_nonContiguousDeletedMessagesRange(namespace="", ledger_name="", cursor_name:"")|Gauge|The number of non-contiguous deleted messages ranges.
brk_ml_cursor_writeLedgerSize(namespace="", ledger_name="", cursor_name:"")|Gauge|The size of write to ledger.
brk_ml_cursor_writeLedgerLogicalSize(namespace="", ledger_name="", cursor_name:"")|Gauge|The size of write to ledger (accounting for without replicas).
brk_ml_cursor_readLedgerSize(namespace="", ledger_name="", cursor_name:"")|Gauge|The size of read from ledger.

### LoadBalancing metrics
All the loadbalancing metrics are labelled with the following labels:
Expand Down