Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[QoS] Fix/qos system table rate limiting (#2739)
Browse files Browse the repository at this point in the history
* Fix SweepBatchConfig values to properly decrease to 1 with each failure and increase with each success (#2630)

* Fix SweepBatchConfig values to properly decrease to 1 with each failure and increase with each success

* add logging when we stop reducing the batch size multiplier

* further improve the tests

* Allow sweep to recover faster after backing off.  Before we would increase by 1% for each successive success, if we had reduced a value to 1 it would be 70 iterations before we got 2 and 700 iterations before we got back to 1000.  Now we always 25 iterations with the lower batch size and then try increasing the rate by doubling each time.  This means that when sweep has to back off it should speed up again quickly.

* Use an AtomicInteger to handle concurrent updates

* SweeperService logging improvements (#2618)

* SweeperServiceImpl now logs when it starts sweeping make it clear if it is running full sweep or not

* Added sweep parameters to the log lines

* no longer default the service parameter in the interface, this way we can see when the parameter isn't provided and we are defaulting to true.  Behaviour is unchanged but we can log a message when defaulting.

* Refactor TracingKVS (#2643)

* Wrap next() and hasNext() in traces

* Use span names as safe

* Remove iterator wrappings

* checkstyle

* refactor methods and remove misleading traces

* Fix unit tests

* release notes

* Final nits

* fix java arrays usage

* Delete docs (#2657)

* [20 minute tasks] Add test for when a batch is full (#2655)

* [no release notes] Drive-by add test for when a batch is full

* MetricRegistry log level downgrade + multiple timestamp tracker tests (#2636)

* change metrics manager to warn plus log the metric name

* more timestamp tracker tests

* release notes

* Extract interface for Cassandra client (#2660)

* Create a CassandraClient

* Propagate CassandraClient to all classes but CKVS

* Use CassandraClient on CKVS

* Propagate CassandraClient to remaining Impl classes

* Use CassandraClient in tests

* [no release notes]

* client -> namespace [no release notes] (#2654)

* 0.65.2 and 0.66.0 release notes (#2663)

* Release notes banners

* fix pr numbers

* [QoS] Add getNamespace to AtlasDBConfig (#2661)

* Add getNamespace [no release notes]

* Timelock client config cannot be empty

* Make it explicit that unspecified namespace is only possible for InMemoryKVS

* CR comments

* Live Reloading the TimeLock Block, Part 1: Pull to Push (#2621)

* thoughts

* More tests for RIH

* Paranoid logging

* statics

* javadoc part 1

* polling refreshable

* Unit tests

* Remove the old RIH

* lock lock

* Tests that test how we deal with exceptions

* logging

* [no release notes]

* CR comments part 1

* Make interval configurable

* Standard nasty time edge cases

* lastSeenValue does not need to be volatile

* Live Reloading the TimeLock Block, Part 2: TransactionManagers Plumbing (#2622)

* ServiceCreator.applyDynamic()

* Propagate config through TMs

* Json Serialization fixes

* Some refactoring

* lock/lock

* Fixed checkstyle

* CR comments part 1

* Switch to RPIH

* add test

* [no release notes] forthcoming in part 4

* checkstyle

* [TTT] [no release notes] Document behaviour regarding index rows (#2658)

* [no release notes] Document behaviour regarding index rows

* fix compile bug

* ``List``

* Refactor and Instrument CassandraClient api (#2665)

* Sanitize Client API

* Instrument CassandraClient

* checkstyle

* Address comment

* [no release notes]

* checkstyle

* Fix cas

* Live Reloading the TimeLock Block, Part 3: Working with 0 Nodes (#2647)

* 0 nodes part 1

* add support for 0 servers in a ServerListConfig

* extend deserialization tests

* More tests

* code defensively

* [no release notes] defer to 2648

* Fixed CR nits

* singleton server list

* check immutable ts (#2406)

* check immutable ts

* checkstyle

* release notes

* Fix TM creation

* checkstyle

* Propagate top-level KVS method names to CassandraClient (#2669)

* Propagate method names down to multiget_slice

* Add the corresponding KVS method to remaining methods

* Add TODO

* [no release notes]

* nit

* Extract cql executor interface (#2670)

* Instrument CqlExecutor

* [no release notes]

* bump awaitility (#2668)

* Upgrade to newer Awaitility.

* locks [no release notes]

* unused import

* Bump Atlas on Tritium 0.8.4 to fix dependency conflicts (#2662)

* Bump Atlas on Tritium 0.8.4 to fix dependency conflicts

* Add changes into missing file

* Doc changes

* Exclude Tracing and HdrHistogram from Tritium dependencies

* update locks

* Add excluded dependencies explicitly

* Fix merge conflict in relase notes

* Uncomment dependencies

* Regenerate locks

* Correctly log Paxos events (#2674)

* Log out Paxos values when recording Paxos events

* Updated release notes

* Checkstyle

* Pull request number

* Address comments

* fix docs

* Slow log and tracing (#2673)

* Trace and instrument the thrift client

* Instrument CqlExecutor

* Fix metric names of IntrumentedCassandraClient

* Fix nit

* Also log internal table references

* Checkstyle

* simplify metric names

* Address comments

* add slow logging to the cassandra thrift client

* add slow logging to cqlExecutor

* fix typos

* Add tracing to the CassandraClient

* trace cqlExecutor queries

* Add slow-logging in the CassandraClient

* Delete InstrumentedCC and InstrumentedCqlExec

* Fix small nits

* Checkstyle

* Add kvs method names to slow logs

* Fix wrapping of exception

* Extract CqlQuery

* Move kvs-slow-log and tracing of CqlExecutor to CCI

* Propagate execute_cql3_query api breaks

* checkstyle

* delete unused string

* checkstyle

* fix number of mutations on batch_mutate

* some refactors

* fix compile

* Refactor cassandra client (#2676)

* Extract TracingCassandraClient

Extract ProfilingCassandraClient

Move todos and some cleanup

Cherry-pick QoS metrics to develop (#2679)

* [QoS] Feature/qos meters (#2640)

* Metrics for bytes and counts in each read/write

* Refactors, dont throw if recordMetrics throws

* Use meters instead of histograms

* Multiget bytes

* Batch mutate exact size

* Cqlresult size

* Calculate exact byte sizes for all thrift objects

* tests and bugfixes - partial

* More tests and bugs fixed

* More tests and cr comments

* byte buffer size

* Remove register histogram

* checkstyle

* checkstyle

* locks and license

* Qos metrics CassandraClient

* Exclude unused classes

* fix cherry pick

* use supplier for object size [no release notes]

* fix merge in AtlasDbConfig

* rate limiting

* total-time

* qos config

* respect max backoff itme

* query weights

* extra tests

* num rows

* checkstyle

* fix tests

* no int casting

* Qos ete tests

* shouldFailIfWritingTooManyBytes

* fix test

* rm file

* Remove metrics

* Test shouldFailIfReadingTooManyBytes

* canBeWritingLargeNumberOfBytesConcurrently

* checkstyle

* cannotWriteLargeNumberOfBytesConcurrently

* fix tests

* create tm in test

* More read tests (after writing a lot of data at once)

* WIP

* Tests that should pas

* Actually update the rate

* Add another test

* More tests and address comments

* Dont extend etesetup

* Make dumping data faster

* cleanup

* wip

* Add back lost file

* Cleanup

* Write tests

* numReadsPerThread -> numThreads

* More write tests, cleanup, check style fixes

* Refactor to avoid code duplication

* Cleanup

* cr comments

* Small read/write after a rate-limited read/write

* annoying no new linw at eof

* Uniform parameters for hard limiting

* Don't consume any estimated bytes for a _transaction or metadata table query

* Add tests

* cr comments
  • Loading branch information
hsaraogi authored Nov 24, 2017
1 parent d2d7b18 commit be45487
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import org.apache.cassandra.thrift.CASResult;
import org.apache.cassandra.thrift.Cassandra;
Expand All @@ -43,12 +44,16 @@
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.keyvalue.cassandra.CassandraClient;
import com.palantir.atlasdb.keyvalue.cassandra.CqlQuery;
import com.palantir.atlasdb.keyvalue.cassandra.HiddenTables;
import com.palantir.atlasdb.qos.QosClient;
import com.palantir.atlasdb.transaction.impl.TransactionConstants;

@SuppressWarnings({"all"}) // thrift variable names.
public class QosCassandraClient implements CassandraClient {

private static final Logger log = LoggerFactory.getLogger(CassandraClient.class);
private static final Function<TableReference, Boolean> ZERO_ESTIMATE_DETERMINING_FUNCTION = tRef ->
tRef.equals(TransactionConstants.TRANSACTION_TABLE) || new HiddenTables().isHidden(tRef);

private final CassandraClient client;
private final QosClient qosClient;
Expand All @@ -69,7 +74,7 @@ public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(String kvsMetho
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
return qosClient.executeRead(
() -> client.multiget_slice(kvsMethodName, tableRef, keys, predicate, consistency_level),
ThriftQueryWeighers.multigetSlice(keys));
ThriftQueryWeighers.multigetSlice(keys, ZERO_ESTIMATE_DETERMINING_FUNCTION.apply(tableRef)));
}

@Override
Expand All @@ -78,7 +83,7 @@ public List<KeySlice> get_range_slices(String kvsMethodName, TableReference tabl
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
return qosClient.executeRead(
() -> client.get_range_slices(kvsMethodName, tableRef, predicate, range, consistency_level),
ThriftQueryWeighers.getRangeSlices(range));
ThriftQueryWeighers.getRangeSlices(range, ZERO_ESTIMATE_DETERMINING_FUNCTION.apply(tableRef)));
}

@Override
Expand All @@ -99,7 +104,7 @@ public ColumnOrSuperColumn get(TableReference tableReference, ByteBuffer key, by
throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException {
return qosClient.executeRead(
() -> client.get(tableReference, key, column, consistency_level),
ThriftQueryWeighers.GET);
ThriftQueryWeighers.get(ZERO_ESTIMATE_DETERMINING_FUNCTION.apply(tableReference)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,31 +50,79 @@ public final class ThriftQueryWeighers {
.timeTakenNanos(TimeUnit.MILLISECONDS.toNanos(2))
.build();

static final QueryWeight ZERO_ESTIMATED_WEIGHT = ImmutableQueryWeight.builder()
.numBytes(0)
.numDistinctRows(1)
.timeTakenNanos(TimeUnit.MILLISECONDS.toNanos(1))
.build();

private ThriftQueryWeighers() { }

static QosClient.QueryWeigher<Map<ByteBuffer, List<ColumnOrSuperColumn>>> multigetSlice(List<ByteBuffer> keys) {
return readWeigher(ThriftObjectSizeUtils::getApproximateSizeOfColsByKey, Map::size, keys.size());
static QosClient.QueryWeigher<Map<ByteBuffer, List<ColumnOrSuperColumn>>> multigetSlice(List<ByteBuffer> keys,
boolean zeroEstimate) {
return zeroEstimate
? readWeigherWithZeroEstimate(ThriftObjectSizeUtils::getApproximateSizeOfColsByKey, Map::size,
keys.size())
: readWeigher(ThriftObjectSizeUtils::getApproximateSizeOfColsByKey, Map::size, keys.size());
}

static QosClient.QueryWeigher<List<KeySlice>> getRangeSlices(KeyRange keyRange) {
return readWeigher(ThriftObjectSizeUtils::getApproximateSizeOfKeySlices, List::size, keyRange.count);
static QosClient.QueryWeigher<List<KeySlice>> getRangeSlices(KeyRange keyRange, boolean zeroEstimate) {
return zeroEstimate
? readWeigherWithZeroEstimate(ThriftObjectSizeUtils::getApproximateSizeOfKeySlices, List::size,
keyRange.count)
: readWeigher(ThriftObjectSizeUtils::getApproximateSizeOfKeySlices, List::size, keyRange.count);
}

static final QosClient.QueryWeigher<ColumnOrSuperColumn> GET =
readWeigher(ThriftObjectSizeUtils::getColumnOrSuperColumnSize, ignored -> 1, 1);
static QosClient.QueryWeigher<ColumnOrSuperColumn> get(boolean zeroEstimate) {
return zeroEstimate
? readWeigherWithZeroEstimate(ThriftObjectSizeUtils::getColumnOrSuperColumnSize, ignored -> 1, 1)
: readWeigher(ThriftObjectSizeUtils::getColumnOrSuperColumnSize, ignored -> 1, 1);
}

static final QosClient.QueryWeigher<CqlResult> EXECUTE_CQL3_QUERY =
// TODO(nziebart): we need to inspect the schema to see how many rows there are - a CQL row is NOT a
// partition. rows here will depend on the type of query executed in CqlExecutor: either (column, ts) pairs,
// or (key, column, ts) triplets
// Currently, transaction or metadata table queries dont use the CQL executor,
// but we should provide a way to estimate zero based on the tableRef if they do start using it.
readWeigher(ThriftObjectSizeUtils::getCqlResultSize, ignored -> 1, 1);

static QosClient.QueryWeigher<Void> batchMutate(Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap) {
long numRows = mutationMap.size();
return writeWeigher(numRows, () -> ThriftObjectSizeUtils.getApproximateSizeOfMutationMap(mutationMap));
}

private static <T> QosClient.QueryWeigher<T> readWeigher(Function<T, Long> bytesRead, Function<T, Integer> numRows,
private static <T> QosClient.QueryWeigher<T> readWeigherWithZeroEstimate(Function<T, Long> bytesRead,
Function<T, Integer> numRows,
int numberOfQueriedRows) {
return new QosClient.QueryWeigher<T>() {
@Override
public QueryWeight estimate() {
return ZERO_ESTIMATED_WEIGHT;
}

@Override
public QueryWeight weighSuccess(T result, long timeTakenNanos) {
return ImmutableQueryWeight.builder()
.numBytes(safeGetNumBytesOrDefault(() -> bytesRead.apply(result)))
.timeTakenNanos(timeTakenNanos)
.numDistinctRows(numRows.apply(result))
.build();
}

@Override
public QueryWeight weighFailure(Exception error, long timeTakenNanos) {
return ImmutableQueryWeight.builder()
.from(DEFAULT_ESTIMATED_WEIGHT)
.numBytes(ESTIMATED_NUM_BYTES_PER_ROW * numberOfQueriedRows)
.timeTakenNanos(timeTakenNanos)
.build();
}
};
}

private static <T> QosClient.QueryWeigher<T> readWeigher(Function<T, Long> bytesRead,
Function<T, Integer> numRows,
int numberOfQueriedRows) {
return new QosClient.QueryWeigher<T>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.List;
import java.util.Map;

import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.KeyRange;
Expand All @@ -41,7 +40,6 @@ public class ThriftQueryWeighersTest {
private static final ByteBuffer BYTES1 = ByteBuffer.allocate(3);
private static final ByteBuffer BYTES2 = ByteBuffer.allocate(7);
private static final ColumnOrSuperColumn COLUMN_OR_SUPER = new ColumnOrSuperColumn();
private static final Column COLUMN = new Column();
private static final KeySlice KEY_SLICE = new KeySlice();
private static final Mutation MUTATION = new Mutation();

Expand All @@ -54,21 +52,33 @@ public class ThriftQueryWeighersTest {

@Test
public void multigetSliceWeigherEstimatesNumberOfBytesBasedOnNumberOfRows() {
long numBytesWithOneRow = ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1)).estimate().numBytes();
long numBytesWithTwoRows = ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1, BYTES2)).estimate()
.numBytes();
assertThatEstimatesAreCorrect(false,
ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1), false),
ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1, BYTES2), false));
}

assertThat(numBytesWithOneRow).isGreaterThan(0L);
assertThat(numBytesWithTwoRows).isEqualTo(numBytesWithOneRow * 2);
@Test
public void multigetSliceWithZeroEstimateWeigherEstimatesZeroNumberOfBytes() {
assertThatEstimatesAreCorrect(true,
ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1), true),
ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1, BYTES2), true));
}

@Test
public void multigetSliceWeigherReturnsCorrectNumRows() {
assertThatWeightSuccessReturnsCorrectNumberOfRows(
ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1), false));
assertThatWeightSuccessReturnsCorrectNumberOfRows(
ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1), true));
}

private void assertThatWeightSuccessReturnsCorrectNumberOfRows(
QosClient.QueryWeigher<Map<ByteBuffer, List<ColumnOrSuperColumn>>> mapQueryWeigher) {
Map<ByteBuffer, List<ColumnOrSuperColumn>> result = ImmutableMap.of(
BYTES1, ImmutableList.of(COLUMN_OR_SUPER, COLUMN_OR_SUPER),
BYTES2, ImmutableList.of(COLUMN_OR_SUPER));

long actualNumRows = ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1))
long actualNumRows = mapQueryWeigher
.weighSuccess(result, TIME_TAKEN)
.numDistinctRows();

Expand All @@ -77,26 +87,58 @@ public void multigetSliceWeigherReturnsCorrectNumRows() {

@Test
public void rangeSlicesWeigherEstimatesNumberOfBytesBasedOnNumberOfRows() {
long numBytesWithOneRow = ThriftQueryWeighers.getRangeSlices(new KeyRange(1)).estimate().numBytes();
long numBytesWithTwoRows = ThriftQueryWeighers.getRangeSlices(new KeyRange(2)).estimate().numBytes();
assertThatEstimatesAreCorrect(false,
ThriftQueryWeighers.getRangeSlices(new KeyRange(1), false),
ThriftQueryWeighers.getRangeSlices(new KeyRange(2), false));
}

@Test
public void rangeSlicesWeigherWithZeroEstimateEstimatesZeroNumberOfBytes() {
assertThatEstimatesAreCorrect(true,
ThriftQueryWeighers.getRangeSlices(new KeyRange(1), true),
ThriftQueryWeighers.getRangeSlices(new KeyRange(2), true));
}

assertThat(numBytesWithOneRow).isGreaterThan(0L);
assertThat(numBytesWithTwoRows).isEqualTo(numBytesWithOneRow * 2);
private void assertThatEstimatesAreCorrect(boolean zeroEstimate,
QosClient.QueryWeigher queryWeigherOneRow,
QosClient.QueryWeigher queryWeigherTwoRows) {
long numBytesWithOneRow = queryWeigherOneRow.estimate().numBytes();
long numBytesWithTwoRows = queryWeigherTwoRows.estimate().numBytes();

if (zeroEstimate) {
assertThat(numBytesWithOneRow).isEqualTo(0L);
} else {
assertThat(numBytesWithOneRow).isGreaterThan(0L);
}
assertThat(numBytesWithTwoRows).isEqualTo(zeroEstimate ? 0L : numBytesWithOneRow * 2);
}

@Test
public void rangeSlicesWeigherReturnsCorrectNumRows() {
assertThatRangeSlicesWeigherReturnsCorrectNumRows(ThriftQueryWeighers.getRangeSlices(new KeyRange(1), false));
assertThatRangeSlicesWeigherReturnsCorrectNumRows(ThriftQueryWeighers.getRangeSlices(new KeyRange(1), true));
}

private void assertThatRangeSlicesWeigherReturnsCorrectNumRows(
QosClient.QueryWeigher<List<KeySlice>> weigher) {
List<KeySlice> result = ImmutableList.of(KEY_SLICE, KEY_SLICE, KEY_SLICE);

long actualNumRows = ThriftQueryWeighers.getRangeSlices(new KeyRange(1)).weighSuccess(result, TIME_TAKEN)
long actualNumRows = weigher
.weighSuccess(result, TIME_TAKEN)
.numDistinctRows();

assertThat(actualNumRows).isEqualTo(3);
}

@Test
public void getWeigherReturnsCorrectNumRows() {
long actualNumRows = ThriftQueryWeighers.GET.weighSuccess(COLUMN_OR_SUPER, TIME_TAKEN).numDistinctRows();
assertThatGetWeighSuccessReturnsOneRow(ThriftQueryWeighers.get(false));
assertThatGetWeighSuccessReturnsOneRow(ThriftQueryWeighers.get(true));
}

private void assertThatGetWeighSuccessReturnsOneRow(
QosClient.QueryWeigher<ColumnOrSuperColumn> weighQuery) {
long actualNumRows = weighQuery.weighSuccess(COLUMN_OR_SUPER, TIME_TAKEN).numDistinctRows();

assertThat(actualNumRows).isEqualTo(1);
}
Expand Down Expand Up @@ -126,25 +168,20 @@ public void batchMutateWeigherReturnsCorrectNumRows() {

@Test
public void multigetSliceWeigherReturnsDefaultEstimateForFailure() {
QueryWeight weight = ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1))
.weighFailure(new RuntimeException(), TIME_TAKEN);

assertThat(weight).isEqualTo(DEFAULT_WEIGHT);
assertThatWeighFailureReturnsDefaultWeight(ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1), false));
assertThatWeighFailureReturnsDefaultWeight(ThriftQueryWeighers.multigetSlice(ImmutableList.of(BYTES1), true));
}

@Test
public void getWeigherReturnsDefaultEstimateForFailure() {
QueryWeight weight = ThriftQueryWeighers.GET.weighFailure(new RuntimeException(), TIME_TAKEN);

assertThat(weight).isEqualTo(DEFAULT_WEIGHT);
assertThatWeighFailureReturnsDefaultWeight(ThriftQueryWeighers.get(false));
assertThatWeighFailureReturnsDefaultWeight(ThriftQueryWeighers.get(true));
}

@Test
public void getRangeSlicesWeigherReturnsDefaultEstimateForFailure() {
QueryWeight weight = ThriftQueryWeighers.getRangeSlices(new KeyRange(1))
.weighFailure(new RuntimeException(), TIME_TAKEN);

assertThat(weight).isEqualTo(DEFAULT_WEIGHT);
assertThatWeighFailureReturnsDefaultWeight(ThriftQueryWeighers.getRangeSlices(new KeyRange(1), false));
assertThatWeighFailureReturnsDefaultWeight(ThriftQueryWeighers.getRangeSlices(new KeyRange(1), true));
}

@Test
Expand All @@ -158,17 +195,21 @@ public void batchMutateWeigherReturnsEstimateForFailure() {
.from(weigher.estimate())
.timeTakenNanos(TIME_TAKEN)
.build();

QueryWeight actual = weigher.weighFailure(new RuntimeException(), TIME_TAKEN);

assertThat(actual).isEqualTo(expected);
}

@Test
public void cql3QueryWeigherReturnsDefaultEstimateForFailure() {
QueryWeight weight = ThriftQueryWeighers.EXECUTE_CQL3_QUERY.weighFailure(new RuntimeException(),
TIME_TAKEN);
assertThatWeighFailureReturnsDefaultWeight(ThriftQueryWeighers.EXECUTE_CQL3_QUERY);
}

private void assertThatWeighFailureReturnsDefaultWeight(
QosClient.QueryWeigher queryWeigher) {
QueryWeight weight = queryWeigher.weighFailure(new RuntimeException(), TIME_TAKEN);

assertThat(weight).isEqualTo(DEFAULT_WEIGHT);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,11 @@ public void shouldBeAbleToReadLargeAmountsExceedingTheLimitSecondTimeWithSoftLim

@Test
public void shouldNotBeAbleToReadLargeAmountsIfSoftLimitSleepWillBeMoreThanConfiguredBackoffTime() {
assertThatThrownBy(() -> readOneBatchOfSize(200))
.isInstanceOf(RateLimitExceededException.class)
.hasMessage("Rate limited. Available capacity has been exhausted.");
// Have one quick limit-exceeding write, as the rate-limiter
// will let anything pass through until the limit is exceeded.
assertThat(readOneBatchOfSize(20)).hasSize(20);

// TODO(hsaraogi): This should not happen.
assertThatThrownBy(() -> readOneBatchOfSize(1))
assertThatThrownBy(() -> readOneBatchOfSize(100))
.isInstanceOf(RateLimitExceededException.class)
.hasMessage("Rate limited. Available capacity has been exhausted.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@ private <T, E extends Exception> T execute(
estimatedWeightMetric.ifPresent(metric -> metric.accept(estimatedWeight));

try {
Duration waitTime = rateLimiter.consumeWithBackoff(estimatedWeight.numBytes());
metrics.recordBackoffMicros(TimeUnit.NANOSECONDS.toMicros(waitTime.toNanos()));
if (estimatedWeight.numBytes() > 0) {
Duration waitTime = rateLimiter.consumeWithBackoff(estimatedWeight.numBytes());
metrics.recordBackoffMicros(TimeUnit.NANOSECONDS.toMicros(waitTime.toNanos()));
}
} catch (RateLimitExceededException ex) {
metrics.recordRateLimitedException();
throw ex;
Expand Down

0 comments on commit be45487

Please sign in to comment.