Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Weight estimate metrics #2725

Merged
merged 1 commit into from
Nov 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.palantir.atlasdb.qos.client;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

Expand Down Expand Up @@ -53,22 +54,26 @@ public static AtlasDbQosClient create(QosRateLimiters rateLimiters) {

@Override
public <T, E extends Exception> T executeRead(Query<T, E> query, QueryWeigher<T> weigher) throws E {
return execute(query, weigher, rateLimiters.read(), metrics::recordRead);
return execute(query, weigher, rateLimiters.read(), Optional.of(metrics::recordReadEstimate),
metrics::recordRead);
}

@Override
public <T, E extends Exception> T executeWrite(Query<T, E> query, QueryWeigher<T> weigher) throws E {
return execute(query, weigher, rateLimiters.write(), metrics::recordWrite);
return execute(query, weigher, rateLimiters.write(), Optional.empty(), metrics::recordWrite);
}

private <T, E extends Exception> T execute(
Query<T, E> query,
QueryWeigher<T> weigher,
QosRateLimiter rateLimiter,
Optional<Consumer<QueryWeight>> estimatedWeightMetric,
Consumer<QueryWeight> weightMetric) throws E {
long estimatedNumBytes = weigher.estimate().numBytes();
QueryWeight estimatedWeight = weigher.estimate();
estimatedWeightMetric.ifPresent(metric -> metric.accept(estimatedWeight));

try {
Duration waitTime = rateLimiter.consumeWithBackoff(estimatedNumBytes);
Duration waitTime = rateLimiter.consumeWithBackoff(estimatedWeight.numBytes());
metrics.recordBackoffMicros(TimeUnit.NANOSECONDS.toMicros(waitTime.toNanos()));
} catch (RateLimitExceededException ex) {
metrics.recordRateLimitedException();
Expand All @@ -87,7 +92,7 @@ private <T, E extends Exception> T execute(
throw ex;
} finally {
weightMetric.accept(actualWeight);
rateLimiter.recordAdjustment(actualWeight.numBytes() - estimatedNumBytes);
rateLimiter.recordAdjustment(actualWeight.numBytes() - estimatedWeight.numBytes());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public class QosMetrics {
private final Meter readTime;
private final Meter rowsRead;

private final Meter estimatedBytesRead;
private final Meter estimatedRowsRead;

private final Meter writeRequestCount;
private final Meter bytesWritten;
private final Meter writeTime;
Expand All @@ -50,6 +53,9 @@ public QosMetrics() {
readTime = metricsManager.registerMeter(QosMetrics.class, "readTime");
rowsRead = metricsManager.registerMeter(QosMetrics.class, "rowsRead");

estimatedBytesRead = metricsManager.registerMeter(QosMetrics.class, "estimated.bytesRead");
estimatedRowsRead = metricsManager.registerMeter(QosMetrics.class, "estimated.rowsRead");

writeRequestCount = metricsManager.registerMeter(QosMetrics.class, "numWriteRequests");
bytesWritten = metricsManager.registerMeter(QosMetrics.class, "bytesWritten");
writeTime = metricsManager.registerMeter(QosMetrics.class, "writeTime");
Expand All @@ -59,6 +65,11 @@ public QosMetrics() {
rateLimitedExceptions = metricsManager.registerMeter(QosMetrics.class, "rateLimitedExceptions");
}

public void recordReadEstimate(QueryWeight weight) {
estimatedBytesRead.mark(weight.numBytes());
estimatedRowsRead.mark(weight.numDistinctRows());
}

public void recordRead(QueryWeight weight) {
readRequestCount.mark();
bytesRead.mark(weight.numBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public void consumesSpecifiedNumUnitsForReads() {
public void recordsReadMetrics() throws TestCheckedException {
qosClient.executeRead(() -> "foo", weigher);

verify(metrics).recordReadEstimate(ESTIMATED_WEIGHT);
verify(metrics).recordRead(ACTUAL_WEIGHT);
}

Expand All @@ -120,6 +121,7 @@ public void recordsWriteMetrics() throws TestCheckedException {
qosClient.executeWrite(() -> null, weigher);

verify(metrics).recordWrite(ACTUAL_WEIGHT);
verify(metrics, never()).recordReadEstimate(any());
}

@Test
Expand Down