Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

[QoS] Fix/qos system table rate limiting #2739

Merged
merged 73 commits into from
Nov 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
2e8c960
Fix SweepBatchConfig values to properly decrease to 1 with each failu…
tboam Nov 8, 2017
f116645
SweeperService logging improvements (#2618)
tboam Nov 8, 2017
c1e21ee
Refactor TracingKVS (#2643)
fsamuel-bs Nov 8, 2017
53afd91
Delete docs (#2657)
hsaraogi Nov 9, 2017
e25d806
[20 minute tasks] Add test for when a batch is full (#2655)
tboam Nov 9, 2017
88e3ffe
MetricRegistry log level downgrade + multiple timestamp tracker tests…
jeremyk-91 Nov 9, 2017
8b67855
Extract interface for Cassandra client (#2660)
fsamuel-bs Nov 10, 2017
c0c05f6
client -> namespace [no release notes] (#2654)
hsaraogi Nov 10, 2017
fede1c3
0.65.2 and 0.66.0 release notes (#2663)
fsamuel-bs Nov 10, 2017
a2be749
[QoS] Add getNamespace to AtlasDBConfig (#2661)
hsaraogi Nov 10, 2017
18d35b6
Live Reloading the TimeLock Block, Part 1: Pull to Push (#2621)
jeremyk-91 Nov 10, 2017
a953328
Live Reloading the TimeLock Block, Part 2: TransactionManagers Plumbi…
jeremyk-91 Nov 10, 2017
8fdd50b
[TTT] [no release notes] Document behaviour regarding index rows (#2658)
jeremyk-91 Nov 13, 2017
6fed36f
Refactor and Instrument CassandraClient api (#2665)
fsamuel-bs Nov 13, 2017
e8e85f9
Live Reloading the TimeLock Block, Part 3: Working with 0 Nodes (#2647)
jeremyk-91 Nov 13, 2017
74180cf
check immutable ts (#2406)
nziebart Nov 13, 2017
d4bf805
Propagate top-level KVS method names to CassandraClient (#2669)
fsamuel-bs Nov 13, 2017
247f60c
Extract cql executor interface (#2670)
fsamuel-bs Nov 13, 2017
913e5e7
bump awaitility (#2668)
hsaraogi Nov 13, 2017
cafc479
Bump Atlas on Tritium 0.8.4 to fix dependency conflicts (#2662)
Nov 14, 2017
7fb4d17
Correctly log Paxos events (#2674)
fsamuel-bs Nov 14, 2017
14216ac
Slow log and tracing (#2673)
fsamuel-bs Nov 14, 2017
78791cf
Refactor cassandra client (#2676)
fsamuel-bs Nov 14, 2017
22e129a
use supplier for object size [no release notes]
Nov 14, 2017
757a282
fix merge
Nov 14, 2017
a45809d
fix merge in AtlasDbConfig
Nov 14, 2017
e3bd685
rate limiting
Nov 15, 2017
dd55403
total-time
Nov 15, 2017
2318f69
qos config
Nov 15, 2017
c7dff29
respect max backoff itme
Nov 15, 2017
76fd664
query weights
Nov 16, 2017
710d216
extra tests
Nov 16, 2017
575b583
num rows
Nov 16, 2017
c9b4839
checkstyle
Nov 16, 2017
29ca959
fix tests
Nov 17, 2017
b35bb7f
no int casting
Nov 17, 2017
f014f56
Qos ete tests
hsaraogi Nov 17, 2017
46e1308
shouldFailIfWritingTooManyBytes
hsaraogi Nov 17, 2017
e6035d0
fix test
hsaraogi Nov 17, 2017
887aa5b
rm file
hsaraogi Nov 17, 2017
3a8f467
Remove metrics
hsaraogi Nov 17, 2017
08a07b2
Test shouldFailIfReadingTooManyBytes
hsaraogi Nov 17, 2017
da40d23
canBeWritingLargeNumberOfBytesConcurrently
hsaraogi Nov 17, 2017
27073f2
checkstyle
hsaraogi Nov 17, 2017
05531a1
cannotWriteLargeNumberOfBytesConcurrently
hsaraogi Nov 17, 2017
e370121
fix tests
hsaraogi Nov 20, 2017
8eef5d5
create tm in test
hsaraogi Nov 21, 2017
53f732e
More read tests (after writing a lot of data at once)
hsaraogi Nov 21, 2017
e2642cf
WIP
hsaraogi Nov 21, 2017
f1887ee
Merge branch 'feature/qos-service-api' into qos-ete-test
hsaraogi Nov 21, 2017
d18c007
Tests that should pas
hsaraogi Nov 21, 2017
fa0b731
Actually update the rate
hsaraogi Nov 21, 2017
6e4745e
Add another test
hsaraogi Nov 21, 2017
a01eef7
More tests and address comments
hsaraogi Nov 22, 2017
2aa69da
Dont extend etesetup
hsaraogi Nov 22, 2017
55a82f2
Make dumping data faster
hsaraogi Nov 22, 2017
2f49e0e
cleanup
hsaraogi Nov 22, 2017
be06b19
wip
hsaraogi Nov 22, 2017
d5b5a8a
Add back lost file
hsaraogi Nov 22, 2017
3f080db
Cleanup
hsaraogi Nov 22, 2017
504f345
Write tests
hsaraogi Nov 22, 2017
59e2b6b
numReadsPerThread -> numThreads
hsaraogi Nov 22, 2017
35327a9
More write tests, cleanup, check style fixes
hsaraogi Nov 22, 2017
b04b4fd
Refactor to avoid code duplication
hsaraogi Nov 22, 2017
c9ebe6e
Cleanup
hsaraogi Nov 22, 2017
af9d67c
cr comments
hsaraogi Nov 23, 2017
6f483d9
Small read/write after a rate-limited read/write
hsaraogi Nov 23, 2017
dd18730
annoying no new linw at eof
hsaraogi Nov 23, 2017
be0759c
Uniform parameters for hard limiting
hsaraogi Nov 23, 2017
0bd0177
Don't consume any estimated bytes for a _transaction or metadata tabl…
hsaraogi Nov 24, 2017
4b2cf92
Add tests
hsaraogi Nov 24, 2017
7ffd6d2
Merge branch 'feature/qos-service-api' into fix/qos-system-table-rate…
hsaraogi Nov 24, 2017
04b6419
cr comments
hsaraogi Nov 24, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import org.apache.cassandra.thrift.CASResult;
import org.apache.cassandra.thrift.Cassandra;
Expand All @@ -43,12 +44,16 @@
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.keyvalue.cassandra.CassandraClient;
import com.palantir.atlasdb.keyvalue.cassandra.CqlQuery;
import com.palantir.atlasdb.keyvalue.cassandra.HiddenTables;
import com.palantir.atlasdb.qos.QosClient;
import com.palantir.atlasdb.transaction.impl.TransactionConstants;

@SuppressWarnings({"all"}) // thrift variable names.
public class QosCassandraClient implements CassandraClient {

private static final Logger log = LoggerFactory.getLogger(CassandraClient.class);
private static final Function<TableReference, Boolean> ZERO_ESTIMATE_DETERMINING_FUNCTION = tRef ->
tRef.equals(TransactionConstants.TRANSACTION_TABLE) || new HiddenTables().isHidden(tRef);

private final CassandraClient client;
private final QosClient qosClient;
Expand All @@ -69,7 +74,7 @@ public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(String kvsMetho
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
return qosClient.executeRead(
() -> client.multiget_slice(kvsMethodName, tableRef, keys, predicate, consistency_level),
ThriftQueryWeighers.multigetSlice(keys));
ThriftQueryWeighers.multigetSlice(keys, ZERO_ESTIMATE_DETERMINING_FUNCTION.apply(tableRef)));
}

@Override
Expand All @@ -78,7 +83,7 @@ public List<KeySlice> get_range_slices(String kvsMethodName, TableReference tabl
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
return qosClient.executeRead(
() -> client.get_range_slices(kvsMethodName, tableRef, predicate, range, consistency_level),
ThriftQueryWeighers.getRangeSlices(range));
ThriftQueryWeighers.getRangeSlices(range, ZERO_ESTIMATE_DETERMINING_FUNCTION.apply(tableRef)));
}

@Override
Expand All @@ -99,7 +104,7 @@ public ColumnOrSuperColumn get(TableReference tableReference, ByteBuffer key, by
throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException {
return qosClient.executeRead(
() -> client.get(tableReference, key, column, consistency_level),
ThriftQueryWeighers.GET);
ThriftQueryWeighers.get(ZERO_ESTIMATE_DETERMINING_FUNCTION.apply(tableReference)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,31 +50,79 @@ public final class ThriftQueryWeighers {
.timeTakenNanos(TimeUnit.MILLISECONDS.toNanos(2))
.build();

static final QueryWeight ZERO_ESTIMATED_WEIGHT = ImmutableQueryWeight.builder()
.numBytes(0)
.numDistinctRows(1)
.timeTakenNanos(TimeUnit.MILLISECONDS.toNanos(1))
.build();

private ThriftQueryWeighers() { }

static QosClient.QueryWeigher<Map<ByteBuffer, List<ColumnOrSuperColumn>>> multigetSlice(List<ByteBuffer> keys) {
return readWeigher(ThriftObjectSizeUtils::getApproximateSizeOfColsByKey, Map::size, keys.size());
static QosClient.QueryWeigher<Map<ByteBuffer, List<ColumnOrSuperColumn>>> multigetSlice(List<ByteBuffer> keys,
boolean zeroEstimate) {
return zeroEstimate
? readWeigherWithZeroEstimate(ThriftObjectSizeUtils::getApproximateSizeOfColsByKey, Map::size,
keys.size())
: readWeigher(ThriftObjectSizeUtils::getApproximateSizeOfColsByKey, Map::size, keys.size());
}

static QosClient.QueryWeigher<List<KeySlice>> getRangeSlices(KeyRange keyRange) {
return readWeigher(ThriftObjectSizeUtils::getApproximateSizeOfKeySlices, List::size, keyRange.count);
static QosClient.QueryWeigher<List<KeySlice>> getRangeSlices(KeyRange keyRange, boolean zeroEstimate) {
return zeroEstimate
? readWeigherWithZeroEstimate(ThriftObjectSizeUtils::getApproximateSizeOfKeySlices, List::size,
keyRange.count)
: readWeigher(ThriftObjectSizeUtils::getApproximateSizeOfKeySlices, List::size, keyRange.count);
}

static final QosClient.QueryWeigher<ColumnOrSuperColumn> GET =
readWeigher(ThriftObjectSizeUtils::getColumnOrSuperColumnSize, ignored -> 1, 1);
static QosClient.QueryWeigher<ColumnOrSuperColumn> get(boolean zeroEstimate) {
return zeroEstimate
? readWeigherWithZeroEstimate(ThriftObjectSizeUtils::getColumnOrSuperColumnSize, ignored -> 1, 1)
: readWeigher(ThriftObjectSizeUtils::getColumnOrSuperColumnSize, ignored -> 1, 1);
}

static final QosClient.QueryWeigher<CqlResult> EXECUTE_CQL3_QUERY =
// TODO(nziebart): we need to inspect the schema to see how many rows there are - a CQL row is NOT a
// partition. rows here will depend on the type of query executed in CqlExecutor: either (column, ts) pairs,
// or (key, column, ts) triplets
// Currently, transaction or metadata table queries dont use the CQL executor,
// but we should provide a way to estimate zero based on the tableRef if they do start using it.
readWeigher(ThriftObjectSizeUtils::getCqlResultSize, ignored -> 1, 1);

static QosClient.QueryWeigher<Void> batchMutate(Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap) {
long numRows = mutationMap.size();
return writeWeigher(numRows, () -> ThriftObjectSizeUtils.getApproximateSizeOfMutationMap(mutationMap));
}

private static <T> QosClient.QueryWeigher<T> readWeigher(Function<T, Long> bytesRead, Function<T, Integer> numRows,
private static <T> QosClient.QueryWeigher<T> readWeigherWithZeroEstimate(Function<T, Long> bytesRead,
Function<T, Integer> numRows,
int numberOfQueriedRows) {
return new QosClient.QueryWeigher<T>() {
@Override
public QueryWeight estimate() {
return ZERO_ESTIMATED_WEIGHT;
}

@Override
public QueryWeight weighSuccess(T result, long timeTakenNanos) {
return ImmutableQueryWeight.builder()
.numBytes(safeGetNumBytesOrDefault(() -> bytesRead.apply(result)))
.timeTakenNanos(timeTakenNanos)
.numDistinctRows(numRows.apply(result))
.build();
}

@Override
public QueryWeight weighFailure(Exception error, long timeTakenNanos) {
return ImmutableQueryWeight.builder()
.from(DEFAULT_ESTIMATED_WEIGHT)
.numBytes(ESTIMATED_NUM_BYTES_PER_ROW * numberOfQueriedRows)
.timeTakenNanos(timeTakenNanos)
.build();
}
};
}

private static <T> QosClient.QueryWeigher<T> readWeigher(Function<T, Long> bytesRead,
Function<T, Integer> numRows,
int numberOfQueriedRows) {
return new QosClient.QueryWeigher<T>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.List;
import java.util.Map;

import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.KeyRange;
Expand All @@ -41,7 +40,6 @@ public class ThriftQueryWeighersTest {
private static final ByteBuffer BYTES1 = ByteBuffer.allocate(3);
private static final ByteBuffer BYTES2 = ByteBuffer.allocate(7);
private static final ColumnOrSuperColumn COLUMN_OR_SUPER = new ColumnOrSuperColumn();
private static final Column COLUMN = new Column();
private static final KeySlice KEY_SLICE = new KeySlice();
private static final Mutation MUTATION = new Mutation();

Expand All @@ -54,21 +52,33 @@ public class ThriftQueryWeighersTest {

@Test
public void multigetSliceWeigherEstimatesNumberOfBytesBasedOnNumberOfRows() {
long numBytesWithOneRow = ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1)).estimate().numBytes();
long numBytesWithTwoRows = ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1, BYTES2)).estimate()
.numBytes();
assertThatEstimatesAreCorrect(false,
ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1), false),
ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1, BYTES2), false));
}

assertThat(numBytesWithOneRow).isGreaterThan(0L);
assertThat(numBytesWithTwoRows).isEqualTo(numBytesWithOneRow * 2);
@Test
public void multigetSliceWithZeroEstimateWeigherEstimatesZeroNumberOfBytes() {
assertThatEstimatesAreCorrect(true,
ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1), true),
ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1, BYTES2), true));
}

@Test
public void multigetSliceWeigherReturnsCorrectNumRows() {
assertThatWeightSuccessReturnsCorrectNumberOfRows(
ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1), false));
assertThatWeightSuccessReturnsCorrectNumberOfRows(
ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1), true));
}

private void assertThatWeightSuccessReturnsCorrectNumberOfRows(
QosClient.QueryWeigher<Map<ByteBuffer, List<ColumnOrSuperColumn>>> mapQueryWeigher) {
Map<ByteBuffer, List<ColumnOrSuperColumn>> result = ImmutableMap.of(
BYTES1, ImmutableList.of(COLUMN_OR_SUPER, COLUMN_OR_SUPER),
BYTES2, ImmutableList.of(COLUMN_OR_SUPER));

long actualNumRows = ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1))
long actualNumRows = mapQueryWeigher
.weighSuccess(result, TIME_TAKEN)
.numDistinctRows();

Expand All @@ -77,26 +87,58 @@ public void multigetSliceWeigherReturnsCorrectNumRows() {

@Test
public void rangeSlicesWeigherEstimatesNumberOfBytesBasedOnNumberOfRows() {
long numBytesWithOneRow = ThriftQueryWeighers.getRangeSlices(new KeyRange(1)).estimate().numBytes();
long numBytesWithTwoRows = ThriftQueryWeighers.getRangeSlices(new KeyRange(2)).estimate().numBytes();
assertThatEstimatesAreCorrect(false,
ThriftQueryWeighers.getRangeSlices(new KeyRange(1), false),
ThriftQueryWeighers.getRangeSlices(new KeyRange(2), false));
}

@Test
public void rangeSlicesWeigherWithZeroEstimateEstimatesZeroNumberOfBytes() {
assertThatEstimatesAreCorrect(true,
ThriftQueryWeighers.getRangeSlices(new KeyRange(1), true),
ThriftQueryWeighers.getRangeSlices(new KeyRange(2), true));
}

assertThat(numBytesWithOneRow).isGreaterThan(0L);
assertThat(numBytesWithTwoRows).isEqualTo(numBytesWithOneRow * 2);
private void assertThatEstimatesAreCorrect(boolean zeroEstimate,
QosClient.QueryWeigher queryWeigherOneRow,
QosClient.QueryWeigher queryWeigherTwoRows) {
long numBytesWithOneRow = queryWeigherOneRow.estimate().numBytes();
long numBytesWithTwoRows = queryWeigherTwoRows.estimate().numBytes();

if (zeroEstimate) {
assertThat(numBytesWithOneRow).isEqualTo(0L);
} else {
assertThat(numBytesWithOneRow).isGreaterThan(0L);
}
assertThat(numBytesWithTwoRows).isEqualTo(zeroEstimate ? 0L : numBytesWithOneRow * 2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this block looks similar to the one at the end of assertThatEstimatesAreCorrect - we might want to extract a method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

@Test
public void rangeSlicesWeigherReturnsCorrectNumRows() {
assertThatRangeSlicesWeigherReturnsCorrectNumRows(ThriftQueryWeighers.getRangeSlices(new KeyRange(1), false));
assertThatRangeSlicesWeigherReturnsCorrectNumRows(ThriftQueryWeighers.getRangeSlices(new KeyRange(1), true));
}

private void assertThatRangeSlicesWeigherReturnsCorrectNumRows(
QosClient.QueryWeigher<List<KeySlice>> weigher) {
List<KeySlice> result = ImmutableList.of(KEY_SLICE, KEY_SLICE, KEY_SLICE);

long actualNumRows = ThriftQueryWeighers.getRangeSlices(new KeyRange(1)).weighSuccess(result, TIME_TAKEN)
long actualNumRows = weigher
.weighSuccess(result, TIME_TAKEN)
.numDistinctRows();

assertThat(actualNumRows).isEqualTo(3);
}

@Test
public void getWeigherReturnsCorrectNumRows() {
long actualNumRows = ThriftQueryWeighers.GET.weighSuccess(COLUMN_OR_SUPER, TIME_TAKEN).numDistinctRows();
assertThatGetWeighSuccessReturnsOneRow(ThriftQueryWeighers.get(false));
assertThatGetWeighSuccessReturnsOneRow(ThriftQueryWeighers.get(true));
}

private void assertThatGetWeighSuccessReturnsOneRow(
QosClient.QueryWeigher<ColumnOrSuperColumn> weighQuery) {
long actualNumRows = weighQuery.weighSuccess(COLUMN_OR_SUPER, TIME_TAKEN).numDistinctRows();

assertThat(actualNumRows).isEqualTo(1);
}
Expand Down Expand Up @@ -126,25 +168,20 @@ public void batchMutateWeigherReturnsCorrectNumRows() {

@Test
public void multigetSliceWeigherReturnsDefaultEstimateForFailure() {
QueryWeight weight = ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1))
.weighFailure(new RuntimeException(), TIME_TAKEN);

assertThat(weight).isEqualTo(DEFAULT_WEIGHT);
assertThatWeighFailureReturnsDefaultWeight(ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1), false));
assertThatWeighFailureReturnsDefaultWeight(ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1), true));
}

@Test
public void getWeigherReturnsDefaultEstimateForFailure() {
QueryWeight weight = ThriftQueryWeighers.GET.weighFailure(new RuntimeException(), TIME_TAKEN);

assertThat(weight).isEqualTo(DEFAULT_WEIGHT);
assertThatWeighFailureReturnsDefaultWeight(ThriftQueryWeighers.get(false));
assertThatWeighFailureReturnsDefaultWeight(ThriftQueryWeighers.get(true));
}

@Test
public void getRangeSlicesWeigherReturnsDefaultEstimateForFailure() {
QueryWeight weight = ThriftQueryWeighers.getRangeSlices(new KeyRange(1))
.weighFailure(new RuntimeException(), TIME_TAKEN);

assertThat(weight).isEqualTo(DEFAULT_WEIGHT);
assertThatWeighFailureReturnsDefaultWeight(ThriftQueryWeighers.getRangeSlices(new KeyRange(1), false));
assertThatWeighFailureReturnsDefaultWeight(ThriftQueryWeighers.getRangeSlices(new KeyRange(1), true));
}

@Test
Expand All @@ -158,17 +195,21 @@ public void batchMutateWeigherReturnsEstimateForFailure() {
.from(weigher.estimate())
.timeTakenNanos(TIME_TAKEN)
.build();

QueryWeight actual = weigher.weighFailure(new RuntimeException(), TIME_TAKEN);

assertThat(actual).isEqualTo(expected);
}

@Test
public void cql3QueryWeigherReturnsDefaultEstimateForFailure() {
QueryWeight weight = ThriftQueryWeighers.EXECUTE_CQL3_QUERY.weighFailure(new RuntimeException(),
TIME_TAKEN);
assertThatWeighFailureReturnsDefaultWeight(ThriftQueryWeighers.EXECUTE_CQL3_QUERY);
}

private void assertThatWeighFailureReturnsDefaultWeight(
QosClient.QueryWeigher queryWeigher) {
QueryWeight weight = queryWeigher.weighFailure(new RuntimeException(), TIME_TAKEN);

assertThat(weight).isEqualTo(DEFAULT_WEIGHT);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,11 @@ public void shouldBeAbleToReadLargeAmountsExceedingTheLimitSecondTimeWithSoftLim

@Test
public void shouldNotBeAbleToReadLargeAmountsIfSoftLimitSleepWillBeMoreThanConfiguredBackoffTime() {
assertThatThrownBy(() -> readOneBatchOfSize(200))
.isInstanceOf(RateLimitExceededException.class)
.hasMessage("Rate limited. Available capacity has been exhausted.");
// Have one quick limit-exceeding write, as the rate-limiter
// will let anything pass through until the limit is exceeded.
assertThat(readOneBatchOfSize(20)).hasSize(20);

// TODO(hsaraogi): This should not happen.
assertThatThrownBy(() -> readOneBatchOfSize(1))
assertThatThrownBy(() -> readOneBatchOfSize(100))
.isInstanceOf(RateLimitExceededException.class)
.hasMessage("Rate limited. Available capacity has been exhausted.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@ private <T, E extends Exception> T execute(
estimatedWeightMetric.ifPresent(metric -> metric.accept(estimatedWeight));

try {
Duration waitTime = rateLimiter.consumeWithBackoff(estimatedWeight.numBytes());
metrics.recordBackoffMicros(TimeUnit.NANOSECONDS.toMicros(waitTime.toNanos()));
if (estimatedWeight.numBytes() > 0) {
Duration waitTime = rateLimiter.consumeWithBackoff(estimatedWeight.numBytes());
metrics.recordBackoffMicros(TimeUnit.NANOSECONDS.toMicros(waitTime.toNanos()));
}
} catch (RateLimitExceededException ex) {
metrics.recordRateLimitedException();
throw ex;
Expand Down