diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientFactory.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientFactory.java index 3d8a3a5e7d8..a29facdd13a 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientFactory.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientFactory.java @@ -46,6 +46,7 @@ import com.google.common.collect.Maps; import com.palantir.atlasdb.cassandra.CassandraCredentialsConfig; import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; +import com.palantir.atlasdb.keyvalue.cassandra.qos.QosCassandraClient; import com.palantir.atlasdb.qos.QosClient; import com.palantir.atlasdb.util.AtlasDbMetrics; import com.palantir.common.exception.AtlasDbDependencyException; diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/QosCassandraClient.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/QosCassandraClient.java similarity index 70% rename from atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/QosCassandraClient.java rename to atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/QosCassandraClient.java index 18d60ca8f13..be0a912be7f 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/QosCassandraClient.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/QosCassandraClient.java @@ -14,13 +14,11 @@ * limitations under the License. */ -package com.palantir.atlasdb.keyvalue.cassandra; +package com.palantir.atlasdb.keyvalue.cassandra.qos; import java.nio.ByteBuffer; -import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.function.Function; import org.apache.cassandra.thrift.CASResult; import org.apache.cassandra.thrift.Cassandra; @@ -43,13 +41,13 @@ import org.slf4j.LoggerFactory; import com.palantir.atlasdb.keyvalue.api.TableReference; +import com.palantir.atlasdb.keyvalue.cassandra.CassandraClient; +import com.palantir.atlasdb.keyvalue.cassandra.CqlQuery; import com.palantir.atlasdb.qos.QosClient; @SuppressWarnings({"all"}) // thrift variable names. public class QosCassandraClient implements CassandraClient { - private static final int DEFAULT_ESTIMATED_READ_BYTES = 100; - private static final Logger log = LoggerFactory.getLogger(CassandraClient.class); private final CassandraClient client; @@ -70,17 +68,8 @@ public Map> multiget_slice(String kvsMetho List keys, SlicePredicate predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException { return qosClient.executeRead( - () -> DEFAULT_ESTIMATED_READ_BYTES, - () -> client.multiget_slice(kvsMethodName, tableRef, keys, - predicate, consistency_level), - this::getApproximateReadByteCount); - } - - private int getApproximateReadByteCount(Map> result) { - return getCollectionSize(result.entrySet(), - rowResult -> ThriftObjectSizeUtils.getByteBufferSize(rowResult.getKey()) - + getCollectionSize(rowResult.getValue(), - ThriftObjectSizeUtils::getColumnOrSuperColumnSize)); + () -> client.multiget_slice(kvsMethodName, tableRef, keys, predicate, consistency_level), + ThriftQueryWeighers.MULTIGET_SLICE); } @Override @@ -88,9 +77,8 @@ public List get_range_slices(String kvsMethodName, TableReference tabl KeyRange range, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException { return qosClient.executeRead( - () -> DEFAULT_ESTIMATED_READ_BYTES, () -> client.get_range_slices(kvsMethodName, tableRef, predicate, range, consistency_level), - result -> getCollectionSize(result, ThriftObjectSizeUtils::getKeySliceSize)); + ThriftQueryWeighers.GET_RANGE_SLICES); } @Override @@ -98,17 +86,8 @@ public void batch_mutate(String kvsMethodName, Map getApproximateWriteByteCount(mutation_map), - () -> client.batch_mutate(kvsMethodName, mutation_map, consistency_level)); - } - - private int getApproximateWriteByteCount(Map>> batchMutateMap) { - int approxBytesForKeys = getCollectionSize(batchMutateMap.keySet(), ThriftObjectSizeUtils::getByteBufferSize); - int approxBytesForValues = getCollectionSize(batchMutateMap.values(), currentMap -> - getCollectionSize(currentMap.keySet(), ThriftObjectSizeUtils::getStringSize) - + getCollectionSize(currentMap.values(), - mutations -> getCollectionSize(mutations, ThriftObjectSizeUtils::getMutationSize))); - return approxBytesForKeys + approxBytesForValues; + () -> client.batch_mutate(kvsMethodName, mutation_map, consistency_level), + ThriftQueryWeighers.batchMutate(mutation_map)); } @Override @@ -116,9 +95,8 @@ public ColumnOrSuperColumn get(TableReference tableReference, ByteBuffer key, by ConsistencyLevel consistency_level) throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException { return qosClient.executeRead( - () -> DEFAULT_ESTIMATED_READ_BYTES, () -> client.get(tableReference, key, column, consistency_level), - ThriftObjectSizeUtils::getColumnOrSuperColumnSize); + ThriftQueryWeighers.GET); } @Override @@ -134,14 +112,10 @@ public CASResult cas(TableReference tableReference, ByteBuffer key, List public CqlResult execute_cql3_query(CqlQuery cqlQuery, Compression compression, ConsistencyLevel consistency) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException { - return qosClient.executeRead( - () -> DEFAULT_ESTIMATED_READ_BYTES, () -> client.execute_cql3_query(cqlQuery, compression, consistency), - ThriftObjectSizeUtils::getCqlResultSize); + ThriftQueryWeighers.EXECUTE_CQL3_QUERY); } - private int getCollectionSize(Collection collection, Function singleObjectSizeFunction) { - return ThriftObjectSizeUtils.getCollectionSize(collection, singleObjectSizeFunction); - } + } diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/ThriftObjectSizeUtils.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftObjectSizeUtils.java similarity index 66% rename from atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/ThriftObjectSizeUtils.java rename to atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftObjectSizeUtils.java index f5f95e0a2a0..42afbd151a6 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/ThriftObjectSizeUtils.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftObjectSizeUtils.java @@ -14,10 +14,11 @@ * limitations under the License. */ -package com.palantir.atlasdb.keyvalue.cassandra; +package com.palantir.atlasdb.keyvalue.cassandra.qos; import java.nio.ByteBuffer; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.function.Function; @@ -37,13 +38,33 @@ public final class ThriftObjectSizeUtils { - private static final int ONE_BYTE = 1; + private static final long ONE_BYTE = 1; private ThriftObjectSizeUtils() { // utility class } - public static int getColumnOrSuperColumnSize(ColumnOrSuperColumn columnOrSuperColumn) { + public static long getApproximateWriteByteCount(Map>> batchMutateMap) { + long approxBytesForKeys = getCollectionSize(batchMutateMap.keySet(), ThriftObjectSizeUtils::getByteBufferSize); + long approxBytesForValues = getCollectionSize(batchMutateMap.values(), + currentMap -> getCollectionSize(currentMap.keySet(), ThriftObjectSizeUtils::getStringSize) + + getCollectionSize(currentMap.values(), + mutations -> getCollectionSize(mutations, ThriftObjectSizeUtils::getMutationSize))); + return approxBytesForKeys + approxBytesForValues; + } + + public static long getApproximateReadByteCount(Map> result) { + return getCollectionSize(result.entrySet(), + rowResult -> ThriftObjectSizeUtils.getByteBufferSize(rowResult.getKey()) + + getCollectionSize(rowResult.getValue(), + ThriftObjectSizeUtils::getColumnOrSuperColumnSize)); + } + + public static long getApproximateReadByteCount(List slices) { + return getCollectionSize(slices, ThriftObjectSizeUtils::getKeySliceSize); + } + + public static long getColumnOrSuperColumnSize(ColumnOrSuperColumn columnOrSuperColumn) { if (columnOrSuperColumn == null) { return getNullSize(); } @@ -53,14 +74,14 @@ public static int getColumnOrSuperColumnSize(ColumnOrSuperColumn columnOrSuperCo + getCounterSuperColumnSize(columnOrSuperColumn.getCounter_super_column()); } - public static int getByteBufferSize(ByteBuffer byteBuffer) { + public static long getByteBufferSize(ByteBuffer byteBuffer) { if (byteBuffer == null) { return getNullSize(); } return byteBuffer.remaining(); } - public static int getMutationSize(Mutation mutation) { + public static long getMutationSize(Mutation mutation) { if (mutation == null) { return getNullSize(); } @@ -69,7 +90,7 @@ public static int getMutationSize(Mutation mutation) { mutation.getDeletion()); } - public static int getCqlResultSize(CqlResult cqlResult) { + public static long getCqlResultSize(CqlResult cqlResult) { if (cqlResult == null) { return getNullSize(); } @@ -79,7 +100,7 @@ public static int getCqlResultSize(CqlResult cqlResult) { + getCqlMetadataSize(cqlResult.getSchema()); } - public static int getKeySliceSize(KeySlice keySlice) { + public static long getKeySliceSize(KeySlice keySlice) { if (keySlice == null) { return getNullSize(); } @@ -88,15 +109,15 @@ public static int getKeySliceSize(KeySlice keySlice) { + getCollectionSize(keySlice.getColumns(), ThriftObjectSizeUtils::getColumnOrSuperColumnSize); } - public static int getStringSize(String string) { + public static long getStringSize(String string) { if (string == null) { return getNullSize(); } - return string.length() * Character.SIZE; + return string.length(); } - public static int getColumnSize(Column column) { + public static long getColumnSize(Column column) { if (column == null) { return getNullSize(); } @@ -107,7 +128,7 @@ public static int getColumnSize(Column column) { + getTimestampSize(); } - private static int getCounterSuperColumnSize(CounterSuperColumn counterSuperColumn) { + private static long getCounterSuperColumnSize(CounterSuperColumn counterSuperColumn) { if (counterSuperColumn == null) { return getNullSize(); } @@ -116,7 +137,7 @@ private static int getCounterSuperColumnSize(CounterSuperColumn counterSuperColu + getCollectionSize(counterSuperColumn.getColumns(), ThriftObjectSizeUtils::getCounterColumnSize); } - private static int getCounterColumnSize(CounterColumn counterColumn) { + private static long getCounterColumnSize(CounterColumn counterColumn) { if (counterColumn == null) { return getNullSize(); } @@ -124,7 +145,7 @@ private static int getCounterColumnSize(CounterColumn counterColumn) { return getByteArraySize(counterColumn.getName()) + getCounterValueSize(); } - private static int getSuperColumnSize(SuperColumn superColumn) { + private static long getSuperColumnSize(SuperColumn superColumn) { if (superColumn == null) { return getNullSize(); } @@ -133,7 +154,7 @@ private static int getSuperColumnSize(SuperColumn superColumn) { + getCollectionSize(superColumn.getColumns(), ThriftObjectSizeUtils::getColumnSize); } - private static int getDeletionSize(Deletion deletion) { + private static long getDeletionSize(Deletion deletion) { if (deletion == null) { return getNullSize(); } @@ -143,7 +164,7 @@ private static int getDeletionSize(Deletion deletion) { + getSlicePredicateSize(deletion.getPredicate()); } - private static int getSlicePredicateSize(SlicePredicate predicate) { + private static long getSlicePredicateSize(SlicePredicate predicate) { if (predicate == null) { return getNullSize(); } @@ -152,7 +173,7 @@ private static int getSlicePredicateSize(SlicePredicate predicate) { + getSliceRangeSize(predicate.getSlice_range()); } - private static int getSliceRangeSize(SliceRange sliceRange) { + private static long getSliceRangeSize(SliceRange sliceRange) { if (sliceRange == null) { return getNullSize(); } @@ -163,7 +184,7 @@ private static int getSliceRangeSize(SliceRange sliceRange) { + getSliceRangeCountSize(); } - private static int getCqlMetadataSize(CqlMetadata schema) { + private static long getCqlMetadataSize(CqlMetadata schema) { if (schema == null) { return getNullSize(); } @@ -174,13 +195,13 @@ private static int getCqlMetadataSize(CqlMetadata schema) { + getStringSize(schema.getDefault_value_type()); } - private static int getByteBufferStringMapSize(Map nameTypes) { + private static long getByteBufferStringMapSize(Map nameTypes) { return getCollectionSize(nameTypes.entrySet(), entry -> ThriftObjectSizeUtils.getByteBufferSize(entry.getKey()) + ThriftObjectSizeUtils.getStringSize(entry.getValue())); } - private static int getCqlRowSize(CqlRow cqlRow) { + private static long getCqlRowSize(CqlRow cqlRow) { if (cqlRow == null) { return getNullSize(); } @@ -188,47 +209,47 @@ private static int getCqlRowSize(CqlRow cqlRow) { + getCollectionSize(cqlRow.getColumns(), ThriftObjectSizeUtils::getColumnSize); } - private static int getThriftEnumSize() { + private static long getThriftEnumSize() { return Integer.BYTES; } - private static int getByteArraySize(byte[] byteArray) { + private static long getByteArraySize(byte[] byteArray) { if (byteArray == null) { return getNullSize(); } return byteArray.length; } - private static int getTimestampSize() { + private static long getTimestampSize() { return Long.BYTES; } - private static int getTtlSize() { + private static long getTtlSize() { return Integer.BYTES; } - private static int getCounterValueSize() { + private static long getCounterValueSize() { return Long.BYTES; } - private static int getReversedBooleanSize() { + private static long getReversedBooleanSize() { return ONE_BYTE; } - private static int getSliceRangeCountSize() { + private static long getSliceRangeCountSize() { return Integer.BYTES; } - private static int getNullSize() { + private static long getNullSize() { return Integer.BYTES; } - public static int getCollectionSize(Collection collection, Function sizeFunction) { + public static long getCollectionSize(Collection collection, Function sizeFunction) { if (collection == null) { return getNullSize(); } - int sum = 0; + long sum = 0; for (T item : collection) { sum += sizeFunction.apply(item); } diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighers.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighers.java new file mode 100644 index 00000000000..c9a018d9e69 --- /dev/null +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighers.java @@ -0,0 +1,123 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.keyvalue.cassandra.qos; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.CqlResult; +import org.apache.cassandra.thrift.KeySlice; +import org.apache.cassandra.thrift.Mutation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Suppliers; +import com.palantir.atlasdb.keyvalue.cassandra.CassandraClient; +import com.palantir.atlasdb.qos.ImmutableQueryWeight; +import com.palantir.atlasdb.qos.QosClient; +import com.palantir.atlasdb.qos.QueryWeight; + +public final class ThriftQueryWeighers { + + private static final Logger log = LoggerFactory.getLogger(CassandraClient.class); + + public static final QueryWeight DEFAULT_ESTIMATED_WEIGHT = ImmutableQueryWeight.builder() + .numBytes(100) + .numDistinctRows(1) + .timeTakenNanos(TimeUnit.MILLISECONDS.toNanos(2)) + .build(); + + private ThriftQueryWeighers() { } + + public static final QosClient.QueryWeigher>> MULTIGET_SLICE = + readWeigher(ThriftObjectSizeUtils::getApproximateReadByteCount, Map::size); + + public static final QosClient.QueryWeigher> GET_RANGE_SLICES = + readWeigher(ThriftObjectSizeUtils::getApproximateReadByteCount, List::size); + + public static final QosClient.QueryWeigher GET = + readWeigher(ThriftObjectSizeUtils::getColumnOrSuperColumnSize, ignored -> 1); + + public static final QosClient.QueryWeigher EXECUTE_CQL3_QUERY = + // TODO(nziebart): we need to inspect the schema to see how many rows there are - a CQL row is NOT a + // partition. rows here will depend on the type of query executed in CqlExecutor: either (column, ts) pairs, + // or (key, column, ts) triplets + readWeigher(ThriftObjectSizeUtils::getCqlResultSize, ignored -> 1); + + public static QosClient.QueryWeigher batchMutate( + Map>> mutationMap) { + long numRows = mutationMap.size(); + return writeWeigher(numRows, () -> ThriftObjectSizeUtils.getApproximateWriteByteCount(mutationMap)); + } + + public static QosClient.QueryWeigher readWeigher(Function bytesRead, Function numRows) { + return new QosClient.QueryWeigher() { + @Override + public QueryWeight estimate() { + return DEFAULT_ESTIMATED_WEIGHT; + } + + @Override + public QueryWeight weigh(T result, long timeTakenNanos) { + return ImmutableQueryWeight.builder() + .numBytes(safeGetNumBytesOrDefault(() -> bytesRead.apply(result))) + .timeTakenNanos(timeTakenNanos) + .numDistinctRows(numRows.apply(result)) + .build(); + } + }; + } + + public static QosClient.QueryWeigher writeWeigher(long numRows, Supplier bytesWritten) { + Supplier weight = Suppliers.memoize(() -> safeGetNumBytesOrDefault(bytesWritten))::get; + + return new QosClient.QueryWeigher() { + @Override + public QueryWeight estimate() { + return ImmutableQueryWeight.builder() + .from(DEFAULT_ESTIMATED_WEIGHT) + .numBytes(weight.get()) + .numDistinctRows(numRows) + .build(); + } + + @Override + public QueryWeight weigh(T result, long timeTakenNanos) { + return ImmutableQueryWeight.builder() + .from(estimate()) + .timeTakenNanos(timeTakenNanos) + .build(); + } + }; + } + + // TODO(nziebart): we really shouldn't be needing to catch exceptions here + private static long safeGetNumBytesOrDefault(Supplier numBytes) { + try { + return numBytes.get(); + } catch (Exception e) { + log.warn("Error calculating number of bytes", e); + return DEFAULT_ESTIMATED_WEIGHT.numBytes(); + } + } + +} diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/ThriftObjectSizeUtilsTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/ThriftObjectSizeUtilsTest.java index a9f623361e0..a55b655be25 100644 --- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/ThriftObjectSizeUtilsTest.java +++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/ThriftObjectSizeUtilsTest.java @@ -19,6 +19,8 @@ import static org.assertj.core.api.Assertions.assertThat; import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; import org.apache.cassandra.thrift.Column; import org.apache.cassandra.thrift.ColumnOrSuperColumn; @@ -33,15 +35,18 @@ import org.junit.Test; import com.google.common.collect.ImmutableList; -import com.palantir.atlasdb.keyvalue.cassandra.ThriftObjectSizeUtils; +import com.google.common.collect.ImmutableMap; +import com.palantir.atlasdb.keyvalue.cassandra.qos.ThriftObjectSizeUtils; public class ThriftObjectSizeUtilsTest { - private static final String TEST_MAME = "test"; - private static final Column TEST_COLUMN = new Column(ByteBuffer.wrap(TEST_MAME.getBytes())); + private static final String TEST_MAME = "foo"; + private static final ByteBuffer TEST_NAME_BYTES = ByteBuffer.wrap(TEST_MAME.getBytes()); + private static final Column TEST_COLUMN = new Column(TEST_NAME_BYTES); - - private static final long TEST_COLUMN_SIZE = 4L + TEST_MAME.getBytes().length + 4L + 8L; + private static final long TEST_NAME_SIZE = 3L; + private static final long TEST_NAME_BYTES_SIZE = TEST_NAME_BYTES.remaining(); + private static final long TEST_COLUMN_SIZE = TEST_NAME_BYTES_SIZE + 4L + 4L + 8L; private static final ColumnOrSuperColumn EMPTY_COLUMN_OR_SUPERCOLUMN = new ColumnOrSuperColumn(); private static final long EMPTY_COLUMN_OR_SUPERCOLUMN_SIZE = Integer.BYTES * 4; @@ -74,7 +79,7 @@ public void getSizeForColumnOrSuperColumnWithANonEmptyColumnAndSuperColumn() { .setColumn(TEST_COLUMN) .setSuper_column(new SuperColumn(ByteBuffer.wrap(TEST_MAME.getBytes()), ImmutableList.of(TEST_COLUMN))))) - .isEqualTo(Integer.BYTES * 2 + TEST_COLUMN_SIZE + TEST_MAME.getBytes().length + TEST_COLUMN_SIZE); + .isEqualTo(Integer.BYTES * 2 + TEST_COLUMN_SIZE + TEST_NAME_BYTES_SIZE + TEST_COLUMN_SIZE); } @Test @@ -198,4 +203,51 @@ public void getSizeForKeySliceWithKeyAndColumns() { .setColumns(ImmutableList.of(EMPTY_COLUMN_OR_SUPERCOLUMN)))) .isEqualTo(TEST_MAME.getBytes().length + EMPTY_COLUMN_OR_SUPERCOLUMN_SIZE); } + + @Test + public void getSizeForBatchMutate() { + Map>> batchMutateMap = ImmutableMap.of( + TEST_NAME_BYTES, + ImmutableMap.of( + TEST_MAME, + ImmutableList.of(new Mutation().setColumn_or_supercolumn(EMPTY_COLUMN_OR_SUPERCOLUMN)))); + + long expectedSize = TEST_NAME_BYTES_SIZE + + TEST_NAME_SIZE + + Integer.BYTES + + EMPTY_COLUMN_OR_SUPERCOLUMN_SIZE; + + assertThat(ThriftObjectSizeUtils.getApproximateWriteByteCount(batchMutateMap)).isEqualTo(expectedSize); + } + + @Test + public void getStringSize() { + assertThat(ThriftObjectSizeUtils.getStringSize(TEST_MAME)).isEqualTo(TEST_NAME_SIZE); + } + + @Test + public void getMultigetResultSize() { + Map> result = ImmutableMap.of( + TEST_NAME_BYTES, ImmutableList.of(EMPTY_COLUMN_OR_SUPERCOLUMN)); + + long expectedSize = TEST_NAME_BYTES_SIZE + + EMPTY_COLUMN_OR_SUPERCOLUMN_SIZE; + + assertThat(ThriftObjectSizeUtils.getApproximateReadByteCount(result)).isEqualTo(expectedSize); + } + + @Test + public void getKeySlicesSize() { + List slices = ImmutableList.of( + new KeySlice() + .setKey(TEST_NAME_BYTES) + .setColumns(ImmutableList.of(EMPTY_COLUMN_OR_SUPERCOLUMN))); + + long expectedSize = TEST_NAME_BYTES_SIZE + + EMPTY_COLUMN_OR_SUPERCOLUMN_SIZE; + + assertThat(ThriftObjectSizeUtils.getApproximateReadByteCount(slices)).isEqualTo(expectedSize); + + } + } diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/QosCassandraClientTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/QosCassandraClientTest.java index 38d762b3563..3283aab1bcc 100644 --- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/QosCassandraClientTest.java +++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/QosCassandraClientTest.java @@ -38,6 +38,7 @@ import com.google.common.collect.ImmutableMap; import com.palantir.atlasdb.encoding.PtBytes; import com.palantir.atlasdb.keyvalue.api.TableReference; +import com.palantir.atlasdb.keyvalue.cassandra.qos.QosCassandraClient; import com.palantir.atlasdb.keyvalue.cassandra.thrift.SlicePredicates; import com.palantir.atlasdb.qos.QosClient; @@ -62,7 +63,7 @@ public void setUp() { public void multigetSliceChecksLimit() throws TException, LimitExceededException { client.multiget_slice("get", TEST_TABLE, ImmutableList.of(ROW_KEY), SLICE_PREDICATE, ConsistencyLevel.ANY); - verify(qosClient, times(1)).executeRead(any(), any(), any()); + verify(qosClient, times(1)).executeRead(any(), any()); verifyNoMoreInteractions(qosClient); } @@ -79,7 +80,7 @@ public void executeCqlQueryChecksLimit() throws TException, LimitExceededExcepti CqlQuery query = new CqlQuery("SELECT * FROM test_table LIMIT 1"); client.execute_cql3_query(query, Compression.NONE, ConsistencyLevel.ANY); - verify(qosClient, times(1)).executeRead(any(), any(), any()); + verify(qosClient, times(1)).executeRead(any(), any()); verifyNoMoreInteractions(qosClient); } @@ -87,7 +88,7 @@ public void executeCqlQueryChecksLimit() throws TException, LimitExceededExcepti public void getRangeSlicesChecksLimit() throws TException, LimitExceededException { client.get_range_slices("get", TEST_TABLE, SLICE_PREDICATE, new KeyRange(), ConsistencyLevel.ANY); - verify(qosClient, times(1)).executeRead(any(), any(), any()); + verify(qosClient, times(1)).executeRead(any(), any()); verifyNoMoreInteractions(qosClient); } } diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighersTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighersTest.java new file mode 100644 index 00000000000..c19d01e508d --- /dev/null +++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighersTest.java @@ -0,0 +1,94 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.keyvalue.cassandra.qos; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.CqlResult; +import org.apache.cassandra.thrift.KeySlice; +import org.apache.cassandra.thrift.Mutation; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +public class ThriftQueryWeighersTest { + + private static final ByteBuffer BYTES1 = ByteBuffer.allocate(3); + private static final ByteBuffer BYTES2 = ByteBuffer.allocate(7); + private static final ColumnOrSuperColumn COLUMN = new ColumnOrSuperColumn(); + private static final KeySlice KEY_SLICE = new KeySlice(); + private static final Mutation MUTATION = new Mutation(); + + private static final long UNIMPORTANT_ARG = 123L; + + @Test + public void multigetSliceWeigherReturnsCorrectNumRows() { + Map> result = ImmutableMap.of( + BYTES1, ImmutableList.of(COLUMN, COLUMN), + BYTES2, ImmutableList.of(COLUMN)); + + long actualNumRows = ThriftQueryWeighers.MULTIGET_SLICE.weigh(result, UNIMPORTANT_ARG).numDistinctRows(); + + assertThat(actualNumRows).isEqualTo(2); + } + + @Test + public void rangeSlicesWeigherReturnsCorrectNumRows() { + List result = ImmutableList.of(KEY_SLICE, KEY_SLICE, KEY_SLICE); + + long actualNumRows = ThriftQueryWeighers.GET_RANGE_SLICES.weigh(result, UNIMPORTANT_ARG).numDistinctRows(); + + assertThat(actualNumRows).isEqualTo(3); + } + + @Test + public void getWeigherReturnsCorrectNumRows() { + long actualNumRows = ThriftQueryWeighers.GET.weigh(COLUMN, UNIMPORTANT_ARG).numDistinctRows(); + + assertThat(actualNumRows).isEqualTo(1); + } + + @Test + public void executeCql3QueryWeigherReturnsOneRowAlways() { + long actualNumRows = ThriftQueryWeighers.EXECUTE_CQL3_QUERY.weigh(new CqlResult(), + UNIMPORTANT_ARG).numDistinctRows(); + + assertThat(actualNumRows).isEqualTo(1); + } + + @Test + public void batchMutateWeigherReturnsCorrectNumRows() { + Map>> mutations = ImmutableMap.of( + BYTES1, ImmutableMap.of( + "table1", ImmutableList.of(MUTATION, MUTATION), + "table2", ImmutableList.of(MUTATION)), + BYTES2, ImmutableMap.of( + "table1", ImmutableList.of(MUTATION))); + + long actualNumRows = ThriftQueryWeighers.batchMutate(mutations).weigh(null, UNIMPORTANT_ARG) + .numDistinctRows(); + + assertThat(actualNumRows).isEqualTo(2); + } + +} diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/config/AtlasDbRuntimeConfig.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/config/AtlasDbRuntimeConfig.java index 30ca96ce1cc..df5fbf01319 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/config/AtlasDbRuntimeConfig.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/config/AtlasDbRuntimeConfig.java @@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.palantir.atlasdb.AtlasDbConstants; -import com.palantir.remoting.api.config.service.ServiceConfiguration; +import com.palantir.atlasdb.qos.config.QosClientConfig; @JsonDeserialize(as = ImmutableAtlasDbRuntimeConfig.class) @JsonSerialize(as = ImmutableAtlasDbRuntimeConfig.class) @@ -61,7 +61,10 @@ public long getTimestampCacheSize() { return AtlasDbConstants.DEFAULT_TIMESTAMP_CACHE_SIZE; } - public abstract Optional getQosServiceConfiguration(); + @Value.Default + public QosClientConfig qos() { + return QosClientConfig.DEFAULT; + } /** * Runtime live-reloadable parameters for communicating with TimeLock. diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java index e57ee08c3d9..cf1450fb8eb 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java @@ -18,8 +18,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; @@ -30,7 +28,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.InstrumentedScheduledExecutorService; import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.annotations.VisibleForTesting; @@ -72,11 +69,10 @@ import com.palantir.atlasdb.persistentlock.KvsBackedPersistentLockService; import com.palantir.atlasdb.persistentlock.NoOpPersistentLockService; import com.palantir.atlasdb.persistentlock.PersistentLockService; -import com.palantir.atlasdb.qos.FakeQosClient; import com.palantir.atlasdb.qos.QosClient; -import com.palantir.atlasdb.qos.QosService; import com.palantir.atlasdb.qos.client.AtlasDbQosClient; -import com.palantir.atlasdb.qos.ratelimit.QosRateLimiter; +import com.palantir.atlasdb.qos.config.QosClientConfig; +import com.palantir.atlasdb.qos.ratelimit.QosRateLimiters; import com.palantir.atlasdb.schema.generated.SweepTableFactory; import com.palantir.atlasdb.spi.AtlasDbFactory; import com.palantir.atlasdb.spi.KeyValueServiceConfig; @@ -117,9 +113,6 @@ import com.palantir.lock.impl.LockServiceImpl; import com.palantir.lock.v2.TimelockService; import com.palantir.logsafe.UnsafeArg; -import com.palantir.remoting.api.config.service.ServiceConfiguration; -import com.palantir.remoting3.clients.ClientConfigurations; -import com.palantir.remoting3.jaxrs.JaxRsClient; import com.palantir.timestamp.TimestampService; import com.palantir.timestamp.TimestampStoreInvalidator; import com.palantir.util.OptionalResolver; @@ -316,8 +309,7 @@ SerializableTransactionManager serializable() { java.util.function.Supplier runtimeConfigSupplier = () -> runtimeConfigSupplier().get().orElse(defaultRuntime); - - QosClient qosClient = getQosClient(runtimeConfigSupplier.get().getQosServiceConfiguration()); + QosClient qosClient = getQosClient(runtimeConfigSupplier.get().qos()); ServiceDiscoveringAtlasSupplier atlasFactory = new ServiceDiscoveringAtlasSupplier( @@ -411,20 +403,12 @@ SerializableTransactionManager serializable() { return transactionManager; } - private QosClient getQosClient(Optional serviceConfiguration) { - return serviceConfiguration.map(this::createAtlasDbQosClient).orElse(FakeQosClient.INSTANCE); - } - - private QosClient createAtlasDbQosClient(ServiceConfiguration serviceConfiguration) { - QosService qosService = JaxRsClient.create(QosService.class, - userAgent(), - ClientConfigurations.of(serviceConfiguration)); + private QosClient getQosClient(QosClientConfig config) { // TODO(nziebart): create a RefreshingRateLimiter - ScheduledExecutorService scheduler = new InstrumentedScheduledExecutorService( - Executors.newSingleThreadScheduledExecutor(), - AtlasDbMetrics.getMetricRegistry(), - "qos-client-executor"); - return AtlasDbQosClient.create(QosRateLimiter.create()); + QosRateLimiters rateLimiters = QosRateLimiters.create( + config.limits(), + config.maxBackoffSleepTime().toMilliseconds()); + return AtlasDbQosClient.create(rateLimiters); } private static boolean areTransactionManagerInitializationPrerequisitesSatisfied( diff --git a/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/TransactionManagersTest.java b/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/TransactionManagersTest.java index 729b4f0e69f..5a6258b43bd 100644 --- a/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/TransactionManagersTest.java +++ b/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/TransactionManagersTest.java @@ -73,6 +73,7 @@ import com.palantir.atlasdb.config.TimeLockClientConfig; import com.palantir.atlasdb.factory.startup.TimeLockMigrator; import com.palantir.atlasdb.memory.InMemoryAtlasDbConfig; +import com.palantir.atlasdb.qos.config.QosClientConfig; import com.palantir.atlasdb.table.description.GenericTestSchema; import com.palantir.atlasdb.transaction.impl.SerializableTransactionManager; import com.palantir.atlasdb.util.MetricsRule; @@ -193,7 +194,7 @@ public void setup() throws JsonProcessingException { runtimeConfig = mock(AtlasDbRuntimeConfig.class); when(runtimeConfig.timestampClient()).thenReturn(ImmutableTimestampClientConfig.of(false)); - when(runtimeConfig.getQosServiceConfiguration()).thenReturn(Optional.empty()); + when(runtimeConfig.qos()).thenReturn(QosClientConfig.DEFAULT); when(runtimeConfig.timelockRuntime()).thenReturn(Optional.empty()); environment = mock(Consumer.class); diff --git a/qos-service-api/build.gradle b/qos-service-api/build.gradle index 60b3ef8dbf4..421a6862cd3 100644 --- a/qos-service-api/build.gradle +++ b/qos-service-api/build.gradle @@ -17,5 +17,8 @@ dependencies { exclude (module:'okhttp') exclude (module:'jsr305') } + + processor group: 'org.immutables', name: 'value' + } diff --git a/qos-service-api/src/main/java/com/palantir/atlasdb/qos/FakeQosClient.java b/qos-service-api/src/main/java/com/palantir/atlasdb/qos/FakeQosClient.java index 4b24852a266..7aaa05293e0 100644 --- a/qos-service-api/src/main/java/com/palantir/atlasdb/qos/FakeQosClient.java +++ b/qos-service-api/src/main/java/com/palantir/atlasdb/qos/FakeQosClient.java @@ -16,22 +16,19 @@ package com.palantir.atlasdb.qos; -import java.util.function.Function; -import java.util.function.Supplier; - public class FakeQosClient implements QosClient { public static final FakeQosClient INSTANCE = new FakeQosClient(); @Override - public T executeRead(Supplier estimatedWeight, ReadQuery query, - Function weigher) throws E { + public T executeRead(ReadQuery query, QueryWeigher weigher) + throws E { return query.execute(); } @Override - public void executeWrite(Supplier weight, WriteQuery query) - throws E { + public void executeWrite(WriteQuery query, + QueryWeigher weigher) throws E { query.execute(); } } diff --git a/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QosClient.java b/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QosClient.java index 26d85e9d4c0..656be55b66b 100644 --- a/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QosClient.java +++ b/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QosClient.java @@ -16,9 +16,6 @@ package com.palantir.atlasdb.qos; -import java.util.function.Function; -import java.util.function.Supplier; - public interface QosClient { interface ReadQuery { @@ -29,12 +26,16 @@ interface WriteQuery { void execute() throws E; } + interface QueryWeigher { + QueryWeight estimate(); + QueryWeight weigh(T result, long timeTakenNanos); + } + T executeRead( - Supplier estimatedWeight, ReadQuery query, - Function weigher) throws E; + QueryWeigher weigher) throws E; void executeWrite( - Supplier weight, - WriteQuery query) throws E; + WriteQuery query, + QueryWeigher weigher) throws E; } diff --git a/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QosService.java b/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QosService.java index 469caed1427..1dced841626 100644 --- a/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QosService.java +++ b/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QosService.java @@ -30,5 +30,5 @@ public interface QosService { @POST @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) - int getLimit(@Safe @PathParam("client") String client); + long getLimit(@Safe @PathParam("client") String client); } diff --git a/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QueryWeight.java b/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QueryWeight.java new file mode 100644 index 00000000000..6e36a73c4dc --- /dev/null +++ b/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QueryWeight.java @@ -0,0 +1,37 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.qos; + +import java.util.concurrent.TimeUnit; + +import org.immutables.value.Value; + +@Value.Immutable +public interface QueryWeight { + + long numBytes(); + + long numDistinctRows(); + + // TODO(nziebart): need to standardize everyhting to longs, and handle casting to int in QosRateLimiter + long timeTakenNanos(); + + default long timeTakenMicros() { + return TimeUnit.NANOSECONDS.toMicros(timeTakenNanos()); + } + +} diff --git a/qos-service-impl/src/integTest/java/com/palantir/atlasdb/qos/QosServiceIntegrationTest.java b/qos-service-impl/src/integTest/java/com/palantir/atlasdb/qos/QosServiceIntegrationTest.java index 51642b0c450..4d355fa2978 100644 --- a/qos-service-impl/src/integTest/java/com/palantir/atlasdb/qos/QosServiceIntegrationTest.java +++ b/qos-service-impl/src/integTest/java/com/palantir/atlasdb/qos/QosServiceIntegrationTest.java @@ -50,8 +50,8 @@ public class QosServiceIntegrationTest { @Test public void returnsConfiguredLimits() { - assertThat(service.getLimit("test")).isEqualTo(10L); - assertThat(service.getLimit("test2")).isEqualTo(20L); + assertThat(service.getLimit("test")).isEqualTo(10); + assertThat(service.getLimit("test2")).isEqualTo(20); } } diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/QosResource.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/QosResource.java index b7443421cfb..ed199b69e4b 100644 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/QosResource.java +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/QosResource.java @@ -29,7 +29,7 @@ public QosResource(Supplier config) { } @Override - public int getLimit(String client) { - return config.get().clientLimits().getOrDefault(client, Integer.MAX_VALUE); + public long getLimit(String client) { + return config.get().clientLimits().getOrDefault(client, Long.MAX_VALUE); } } diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/client/AtlasDbQosClient.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/client/AtlasDbQosClient.java index 997c75cce36..f1832ba2199 100644 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/client/AtlasDbQosClient.java +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/client/AtlasDbQosClient.java @@ -15,83 +15,67 @@ */ package com.palantir.atlasdb.qos.client; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Supplier; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Ticker; import com.palantir.atlasdb.qos.QosClient; -import com.palantir.atlasdb.qos.QosMetrics; -import com.palantir.atlasdb.qos.ratelimit.QosRateLimiter; +import com.palantir.atlasdb.qos.QueryWeight; +import com.palantir.atlasdb.qos.metrics.QosMetrics; +import com.palantir.atlasdb.qos.ratelimit.QosRateLimiters; public class AtlasDbQosClient implements QosClient { private static final Logger log = LoggerFactory.getLogger(AtlasDbQosClient.class); - private final QosRateLimiter rateLimiter; + private static final Void NO_RESULT = null; + + private final QosRateLimiters rateLimiters; private final QosMetrics metrics; private final Ticker ticker; - public static AtlasDbQosClient create(QosRateLimiter rateLimiter) { - return new AtlasDbQosClient(rateLimiter, new QosMetrics(), Ticker.systemTicker()); + public static AtlasDbQosClient create(QosRateLimiters rateLimiters) { + return new AtlasDbQosClient(rateLimiters, new QosMetrics(), Ticker.systemTicker()); } @VisibleForTesting - AtlasDbQosClient(QosRateLimiter rateLimiter, QosMetrics metrics, Ticker ticker) { + AtlasDbQosClient(QosRateLimiters rateLimiters, QosMetrics metrics, Ticker ticker) { this.metrics = metrics; - this.rateLimiter = rateLimiter; + this.rateLimiters = rateLimiters; this.ticker = ticker; } @Override - public T executeRead( - Supplier estimatedWeigher, - ReadQuery query, - Function weigher) throws E { - int estimatedWeight = getWeight(estimatedWeigher, 1); - rateLimiter.consumeWithBackoff(estimatedWeight); + public T executeRead(ReadQuery query, QueryWeigher weigher) throws E { + long estimatedNumBytes = weigher.estimate().numBytes(); + rateLimiters.read().consumeWithBackoff(estimatedNumBytes); // TODO(nziebart): decide what to do if we encounter a timeout exception long startTimeNanos = ticker.read(); T result = query.execute(); long totalTimeNanos = ticker.read() - startTimeNanos; - int actualWeight = getWeight(() -> weigher.apply(result), estimatedWeight); - metrics.updateReadCount(); - metrics.updateBytesRead(actualWeight); - metrics.updateReadTimeMicros(TimeUnit.NANOSECONDS.toMicros(totalTimeNanos)); - rateLimiter.recordAdjustment(actualWeight - estimatedWeight); + QueryWeight actualWeight = weigher.weigh(result, totalTimeNanos); + metrics.recordRead(actualWeight); + rateLimiters.read().recordAdjustment(actualWeight.numBytes() - estimatedNumBytes); return result; } @Override - public void executeWrite(Supplier weigher, WriteQuery query) throws E { - int weight = getWeight(weigher, 1); - rateLimiter.consumeWithBackoff(weight); + public void executeWrite(WriteQuery query, QueryWeigher weigher) throws E { + long estimatedNumBytes = weigher.estimate().numBytes(); + rateLimiters.write().consumeWithBackoff(estimatedNumBytes); // TODO(nziebart): decide what to do if we encounter a timeout exception long startTimeNanos = ticker.read(); query.execute(); long totalTimeNanos = ticker.read() - startTimeNanos; - metrics.updateWriteCount(); - metrics.updateWriteTimeMicros(TimeUnit.NANOSECONDS.toMicros(totalTimeNanos)); - metrics.updateBytesWritten(weight); - } - - // TODO(nziebart): error handling in the weight calculation should be responsibility of the caller - private Integer getWeight(Supplier weigher, int fallback) { - try { - return weigher.get(); - } catch (Exception e) { - log.warn("Exception while calculating response weight", e); - return fallback; - } + QueryWeight actualWeight = weigher.weigh(NO_RESULT, totalTimeNanos); + metrics.recordWrite(actualWeight); + rateLimiters.write().recordAdjustment(actualWeight.numBytes() - estimatedNumBytes); } } diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosClientConfig.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosClientConfig.java new file mode 100644 index 00000000000..313a02f3d7d --- /dev/null +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosClientConfig.java @@ -0,0 +1,47 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.qos.config; + +import java.util.Optional; + +import org.immutables.value.Value; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.palantir.remoting.api.config.service.HumanReadableDuration; +import com.palantir.remoting.api.config.service.ServiceConfiguration; + +@Value.Immutable +@JsonDeserialize(as = ImmutableQosClientConfig.class) +@JsonSerialize(as = ImmutableQosClientConfig.class) +public abstract class QosClientConfig { + + public static final QosClientConfig DEFAULT = ImmutableQosClientConfig.builder().build(); + + public abstract Optional qosService(); + + @Value.Default + public HumanReadableDuration maxBackoffSleepTime() { + return HumanReadableDuration.seconds(10); + } + + @Value.Default + public QosLimitsConfig limits() { + return QosLimitsConfig.DEFAULT_NO_LIMITS; + } + +} diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosLimitsConfig.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosLimitsConfig.java new file mode 100644 index 00000000000..e932632ddaa --- /dev/null +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosLimitsConfig.java @@ -0,0 +1,41 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.qos.config; + +import org.immutables.value.Value; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +@Value.Immutable +@JsonDeserialize(as = ImmutableQosLimitsConfig.class) +@JsonSerialize(as = ImmutableQosLimitsConfig.class) +public abstract class QosLimitsConfig { + + public static final QosLimitsConfig DEFAULT_NO_LIMITS = ImmutableQosLimitsConfig.builder().build(); + + @Value.Default + public int readBytesPerSecond() { + return Integer.MAX_VALUE; + } + + @Value.Default + public int writeBytesPerSecond() { + return Integer.MAX_VALUE; + } + +} diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosServiceRuntimeConfig.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosServiceRuntimeConfig.java index 0faa8222f34..8ce8305589d 100644 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosServiceRuntimeConfig.java +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosServiceRuntimeConfig.java @@ -27,5 +27,5 @@ @JsonSerialize(as = ImmutableQosServiceRuntimeConfig.class) @Value.Immutable public abstract class QosServiceRuntimeConfig { - public abstract Map clientLimits(); + public abstract Map clientLimits(); } diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/QosMetrics.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/metrics/QosMetrics.java similarity index 69% rename from qos-service-impl/src/main/java/com/palantir/atlasdb/qos/QosMetrics.java rename to qos-service-impl/src/main/java/com/palantir/atlasdb/qos/metrics/QosMetrics.java index d3d8f2f0573..d8ee536efb2 100644 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/QosMetrics.java +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/metrics/QosMetrics.java @@ -14,53 +14,51 @@ * limitations under the License. */ -package com.palantir.atlasdb.qos; +package com.palantir.atlasdb.qos.metrics; import com.codahale.metrics.Meter; +import com.palantir.atlasdb.qos.QueryWeight; import com.palantir.atlasdb.util.MetricsManager; +// TODO(nziebart): needs tests public class QosMetrics { private final MetricsManager metricsManager = new MetricsManager(); private final Meter readRequestCount; private final Meter bytesRead; private final Meter readTime; + private final Meter rowsRead; private final Meter writeRequestCount; private final Meter bytesWritten; private final Meter writeTime; + private final Meter rowsWritten; + public QosMetrics() { readRequestCount = metricsManager.registerMeter(QosMetrics.class, "numReadRequests"); bytesRead = metricsManager.registerMeter(QosMetrics.class, "bytesRead"); readTime = metricsManager.registerMeter(QosMetrics.class, "readTime"); + rowsRead = metricsManager.registerMeter(QosMetrics.class, "rowsRead"); writeRequestCount = metricsManager.registerMeter(QosMetrics.class, "numWriteRequests"); bytesWritten = metricsManager.registerMeter(QosMetrics.class, "bytesWritten"); writeTime = metricsManager.registerMeter(QosMetrics.class, "writeTime"); + rowsWritten = metricsManager.registerMeter(QosMetrics.class, "rowsWritten"); } - public void updateReadCount() { + public void recordRead(QueryWeight weight) { readRequestCount.mark(); + bytesRead.mark(weight.numBytes()); + readTime.mark(weight.timeTakenMicros()); + rowsRead.mark(weight.numDistinctRows()); } - public void updateWriteCount() { + public void recordWrite(QueryWeight weight) { writeRequestCount.mark(); + bytesWritten.mark(weight.numBytes()); + writeTime.mark(weight.timeTakenMicros()); + rowsWritten.mark(weight.numDistinctRows()); } - public void updateBytesRead(long numBytes) { - bytesRead.mark(numBytes); - } - - public void updateBytesWritten(long numBytes) { - bytesWritten.mark(numBytes); - } - - public void updateReadTimeMicros(long readTimeMicros) { - readTime.mark(readTimeMicros); - } - - public void updateWriteTimeMicros(long writeTimeMicros) { - readTime.mark(writeTimeMicros); - } } diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java index 4112e03d9e1..299a417f371 100644 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; +import com.google.common.primitives.Ints; /** * A rate limiter for database queries, based on "units" of expense. This limiter strives to maintain an upper limit on @@ -34,27 +35,28 @@ public class QosRateLimiter { private static final double MAX_BURST_SECONDS = 5; private static final double UNLIMITED_RATE = Double.MAX_VALUE; - private static final int MAX_WAIT_TIME_SECONDS = 10; + private final long maxBackoffTimeMillis; private RateLimiter rateLimiter; - public static QosRateLimiter create() { - return new QosRateLimiter(RateLimiter.SleepingStopwatch.createFromSystemTimer()); + public static QosRateLimiter create(long maxBackoffTimeMillis) { + return new QosRateLimiter(RateLimiter.SleepingStopwatch.createFromSystemTimer(), maxBackoffTimeMillis); } @VisibleForTesting - QosRateLimiter(RateLimiter.SleepingStopwatch stopwatch) { + QosRateLimiter(RateLimiter.SleepingStopwatch stopwatch, long maxBackoffTimeMillis) { rateLimiter = new SmoothRateLimiter.SmoothBursty( stopwatch, MAX_BURST_SECONDS); rateLimiter.setRate(UNLIMITED_RATE); + this.maxBackoffTimeMillis = maxBackoffTimeMillis; } /** * Update the allowed rate, in units per second. */ - public void updateRate(int unitsPerSecond) { + public void updateRate(double unitsPerSecond) { rateLimiter.setRate(unitsPerSecond); } @@ -64,11 +66,11 @@ public void updateRate(int unitsPerSecond) { * * @return the amount of time slept for, if any */ - public Duration consumeWithBackoff(int estimatedNumUnits) { + public Duration consumeWithBackoff(long estimatedNumUnits) { Optional waitTime = rateLimiter.tryAcquire( - estimatedNumUnits, - MAX_WAIT_TIME_SECONDS, - TimeUnit.SECONDS); + Ints.saturatedCast(estimatedNumUnits), // TODO(nziebart): deal with longs + maxBackoffTimeMillis, + TimeUnit.MILLISECONDS); if (!waitTime.isPresent()) { throw new RuntimeException("rate limited"); @@ -78,13 +80,13 @@ public Duration consumeWithBackoff(int estimatedNumUnits) { } /** - * Records an adjustment to the original estimate of units consumed passed to {@link #consumeWithBackoff(int)}. This + * Records an adjustment to the original estimate of units consumed passed to {@link #consumeWithBackoff}. This * should be called after a query returns, when the exact number of units consumed is known. This value may be - * positive (if the original estimate was too small) or negative (if the original estimate was too large. + * positive (if the original estimate was too small) or negative (if the original estimate was too large). */ - public void recordAdjustment(int adjustmentUnits) { + public void recordAdjustment(long adjustmentUnits) { if (adjustmentUnits > 0) { - rateLimiter.steal(adjustmentUnits); + rateLimiter.steal(Ints.saturatedCast(adjustmentUnits)); // TODO(nziebart): deal with longs } // TODO(nziebart): handle negative case } diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiters.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiters.java new file mode 100644 index 00000000000..a60696dc649 --- /dev/null +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiters.java @@ -0,0 +1,43 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.qos.ratelimit; + +import org.immutables.value.Value; + +import com.palantir.atlasdb.qos.config.QosLimitsConfig; + +@Value.Immutable +public interface QosRateLimiters { + + static QosRateLimiters create(QosLimitsConfig config, long maxBackoffSleepTimeMillis) { + QosRateLimiter readLimiter = QosRateLimiter.create(maxBackoffSleepTimeMillis); + readLimiter.updateRate(config.readBytesPerSecond()); + + QosRateLimiter writeLimiter = QosRateLimiter.create(maxBackoffSleepTimeMillis); + writeLimiter.updateRate(config.writeBytesPerSecond()); + + return ImmutableQosRateLimiters.builder() + .read(readLimiter) + .write(writeLimiter) + .build(); + } + + QosRateLimiter read(); + + QosRateLimiter write(); + +} diff --git a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/AtlasDbQosClientTest.java b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/AtlasDbQosClientTest.java deleted file mode 100644 index f6f1ff61c74..00000000000 --- a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/AtlasDbQosClientTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.qos; - -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -import org.junit.Before; -import org.junit.Test; - -import com.google.common.base.Ticker; -import com.palantir.atlasdb.qos.client.AtlasDbQosClient; -import com.palantir.atlasdb.qos.ratelimit.QosRateLimiter; - -public class AtlasDbQosClientTest { - - private static final int ESTIMATED_BYTES = 10; - private static final int ACTUAL_BYTES = 51; - private static final long START_NANOS = 1100L; - private static final long END_NANOS = 5500L; - private static final long TOTAL_TIME_MICROS = 4; - - private QosService qosService = mock(QosService.class); - private QosRateLimiter rateLimiter = mock(QosRateLimiter.class); - private QosMetrics metrics = mock(QosMetrics.class); - private Ticker ticker = mock(Ticker.class); - - private AtlasDbQosClient qosClient = new AtlasDbQosClient(rateLimiter, metrics, ticker); - - @Before - public void setUp() { - when(qosService.getLimit("test-client")).thenReturn(100); - - when(ticker.read()).thenReturn(START_NANOS).thenReturn(END_NANOS); - } - - @Test - public void consumesSpecifiedNumUnitsForReads() { - qosClient.executeRead(() -> ESTIMATED_BYTES, () -> "foo", ignored -> ACTUAL_BYTES); - - verify(rateLimiter).consumeWithBackoff(ESTIMATED_BYTES); - verify(rateLimiter).recordAdjustment(ACTUAL_BYTES - ESTIMATED_BYTES); - verifyNoMoreInteractions(rateLimiter); - } - - @Test - public void recordsReadMetrics() throws TestCheckedException { - qosClient.executeRead(() -> ESTIMATED_BYTES, () -> "foo", ignored -> ACTUAL_BYTES); - - verify(metrics).updateReadCount(); - verify(metrics).updateBytesRead(ACTUAL_BYTES); - verify(metrics).updateReadTimeMicros(TOTAL_TIME_MICROS); - verifyNoMoreInteractions(metrics); - } - - @Test - public void consumesSpecifiedNumUnitsForWrites() { - qosClient.executeWrite(() -> ACTUAL_BYTES, () -> { }); - - verify(rateLimiter).consumeWithBackoff(ACTUAL_BYTES); - verifyNoMoreInteractions(rateLimiter); - } - - @Test - public void recordsWriteMetrics() throws TestCheckedException { - qosClient.executeWrite(() -> ACTUAL_BYTES, () -> { }); - - verify(metrics).updateWriteCount(); - verify(metrics).updateBytesWritten(ACTUAL_BYTES); - verify(metrics).updateWriteTimeMicros(TOTAL_TIME_MICROS); - verifyNoMoreInteractions(metrics); - } - - @Test - public void propagatesCheckedExceptions() throws TestCheckedException { - assertThatThrownBy(() -> qosClient.executeRead(() -> 1, () -> { - throw new TestCheckedException(); - }, ignored -> 1)).isInstanceOf(TestCheckedException.class); - - assertThatThrownBy(() -> qosClient.executeWrite(() -> 1, () -> { - throw new TestCheckedException(); - })).isInstanceOf(TestCheckedException.class); - } - - static class TestCheckedException extends Exception { } -} diff --git a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosClientConfigDeserializationTest.java b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosClientConfigDeserializationTest.java new file mode 100644 index 00000000000..1edfe1e9b16 --- /dev/null +++ b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosClientConfigDeserializationTest.java @@ -0,0 +1,68 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.qos; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; + +import org.junit.Test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.datatype.guava.GuavaModule; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import com.palantir.atlasdb.qos.config.ImmutableQosClientConfig; +import com.palantir.atlasdb.qos.config.ImmutableQosLimitsConfig; +import com.palantir.atlasdb.qos.config.QosClientConfig; +import com.palantir.atlasdb.qos.config.QosServiceRuntimeConfig; +import com.palantir.remoting.api.config.service.HumanReadableDuration; +import com.palantir.remoting.api.config.service.ServiceConfiguration; +import com.palantir.remoting.api.config.ssl.SslConfiguration; +import com.palantir.remoting3.ext.jackson.ShimJdk7Module; + +public class QosClientConfigDeserializationTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new YAMLFactory()) + .registerModule(new GuavaModule()) + .registerModule(new ShimJdk7Module()) + .registerModule(new Jdk8Module()); + + @Test + public void canDeserializeFromYaml() throws IOException { + QosClientConfig expected = ImmutableQosClientConfig.builder() + .qosService( + ServiceConfiguration.builder() + .addUris("http://localhost:8080") + .security(SslConfiguration.of(Paths.get("trustStore.jks"))) + .build()) + .maxBackoffSleepTime(HumanReadableDuration.seconds(20)) + .limits(ImmutableQosLimitsConfig.builder() + .readBytesPerSecond(123) + .writeBytesPerSecond(456) + .build()) + .build(); + + File configFile = new File(QosServiceRuntimeConfig.class.getResource("/qos-client.yml").getPath()); + QosClientConfig config = OBJECT_MAPPER.readValue(configFile, QosClientConfig.class); + + assertThat(config).isEqualTo(expected); + } + +} diff --git a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosRuntimeConfigDeserializationTest.java b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosRuntimeConfigDeserializationTest.java index 2b7bb0449ae..9edcedafa2b 100644 --- a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosRuntimeConfigDeserializationTest.java +++ b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosRuntimeConfigDeserializationTest.java @@ -37,11 +37,11 @@ public class QosRuntimeConfigDeserializationTest { @Test public void canDeserializeQosServerConfiguration() throws IOException { - File testConfigFile = new File(QosServiceRuntimeConfig.class.getResource("/qos.yml").getPath()); + File testConfigFile = new File(QosServiceRuntimeConfig.class.getResource("/qos-server.yml").getPath()); QosServiceRuntimeConfig configuration = OBJECT_MAPPER.readValue(testConfigFile, QosServiceRuntimeConfig.class); assertThat(configuration).isEqualTo(ImmutableQosServiceRuntimeConfig.builder() - .clientLimits(ImmutableMap.of("test", 10, "test2", 20)) + .clientLimits(ImmutableMap.of("test", 10L, "test2", 20L)) .build()); } } diff --git a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosServiceRuntimeConfigTest.java b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosServiceRuntimeConfigTest.java index 129ac04db62..c5c81c11e23 100644 --- a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosServiceRuntimeConfigTest.java +++ b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosServiceRuntimeConfigTest.java @@ -30,14 +30,14 @@ public void canBuildFromEmptyClientLimits() { @Test public void canBuildFromSingleClientLimit() { ImmutableQosServiceRuntimeConfig.builder() - .clientLimits(ImmutableMap.of("test_client", 10)) + .clientLimits(ImmutableMap.of("test_client", 10L)) .build(); } @Test public void canBuildFromMultipleClientLimits() { ImmutableQosServiceRuntimeConfig.builder() - .clientLimits(ImmutableMap.of("test_client", 10, "test_client2", 100)) + .clientLimits(ImmutableMap.of("test_client", 10L, "test_client2", 100L)) .build(); } } diff --git a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosServiceTest.java b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosServiceTest.java index 15fdeb1b64b..006cafef984 100644 --- a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosServiceTest.java +++ b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosServiceTest.java @@ -39,20 +39,20 @@ public class QosServiceTest { public void defaultsToNoLimit() { when(config.get()).thenReturn(configWithLimits(ImmutableMap.of())); - assertThat(resource.getLimit("foo")).isEqualTo(Integer.MAX_VALUE); + assertThat(resource.getLimit("foo")).isEqualTo(Long.MAX_VALUE); } @Test public void canLiveReloadLimits() { when(config.get()) - .thenReturn(configWithLimits(ImmutableMap.of("foo", 10))) - .thenReturn(configWithLimits(ImmutableMap.of("foo", 20))); + .thenReturn(configWithLimits(ImmutableMap.of("foo", 10L))) + .thenReturn(configWithLimits(ImmutableMap.of("foo", 20L))); - assertEquals(10, resource.getLimit("foo")); - assertEquals(20, resource.getLimit("foo")); + assertEquals(10L, resource.getLimit("foo")); + assertEquals(20L, resource.getLimit("foo")); } - private QosServiceRuntimeConfig configWithLimits(Map limits) { + private QosServiceRuntimeConfig configWithLimits(Map limits) { return ImmutableQosServiceRuntimeConfig.builder().clientLimits(limits).build(); } } diff --git a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/client/AtlasDbQosClientTest.java b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/client/AtlasDbQosClientTest.java new file mode 100644 index 00000000000..b59df46e598 --- /dev/null +++ b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/client/AtlasDbQosClientTest.java @@ -0,0 +1,131 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.qos.client; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import org.junit.Before; +import org.junit.Test; + +import com.google.common.base.Ticker; +import com.palantir.atlasdb.qos.ImmutableQueryWeight; +import com.palantir.atlasdb.qos.QosClient; +import com.palantir.atlasdb.qos.QueryWeight; +import com.palantir.atlasdb.qos.metrics.QosMetrics; +import com.palantir.atlasdb.qos.ratelimit.ImmutableQosRateLimiters; +import com.palantir.atlasdb.qos.ratelimit.QosRateLimiter; +import com.palantir.atlasdb.qos.ratelimit.QosRateLimiters; + +public class AtlasDbQosClientTest { + + private static final int ESTIMATED_BYTES = 10; + private static final int ACTUAL_BYTES = 51; + private static final long START_NANOS = 1100L; + private static final long END_NANOS = 5500L; + private static final long TOTAL_NANOS = END_NANOS - START_NANOS; + + private static final QueryWeight ESTIMATED_WEIGHT = ImmutableQueryWeight.builder() + .numBytes(ESTIMATED_BYTES) + .numDistinctRows(1) + .timeTakenNanos((int) TOTAL_NANOS) + .build(); + + private static final QueryWeight ACTUAL_WEIGHT = ImmutableQueryWeight.builder() + .numBytes(ACTUAL_BYTES) + .numDistinctRows(10) + .timeTakenNanos((int) TOTAL_NANOS) + .build(); + + private QosClient.QueryWeigher weigher = mock(QosClient.QueryWeigher.class); + + private QosRateLimiter readLimiter = mock(QosRateLimiter.class); + private QosRateLimiter writeLimiter = mock(QosRateLimiter.class); + private QosRateLimiters rateLimiters = ImmutableQosRateLimiters.builder() + .read(readLimiter).write(writeLimiter).build(); + private QosMetrics metrics = mock(QosMetrics.class); + private Ticker ticker = mock(Ticker.class); + + private AtlasDbQosClient qosClient = new AtlasDbQosClient(rateLimiters, metrics, ticker); + + @Before + public void setUp() { + when(ticker.read()).thenReturn(START_NANOS).thenReturn(END_NANOS); + + when(weigher.estimate()).thenReturn(ESTIMATED_WEIGHT); + when(weigher.weigh(any(), anyLong())).thenReturn(ACTUAL_WEIGHT); + } + + @Test + public void consumesSpecifiedNumUnitsForReads() { + qosClient.executeRead(() -> "foo", weigher); + + verify(readLimiter).consumeWithBackoff(ESTIMATED_BYTES); + verify(readLimiter).recordAdjustment(ACTUAL_BYTES - ESTIMATED_BYTES); + verifyNoMoreInteractions(readLimiter, writeLimiter); + } + + @Test + public void recordsReadMetrics() throws TestCheckedException { + qosClient.executeRead(() -> "foo", weigher); + + verify(metrics).recordRead(ACTUAL_WEIGHT); + verifyNoMoreInteractions(metrics); + } + + @Test + public void passesResultAndTimeToReadWeigher() throws TestCheckedException { + qosClient.executeRead(() -> "foo", weigher); + + verify(weigher).weigh("foo", TOTAL_NANOS); + } + + @Test + public void consumesSpecifiedNumUnitsForWrites() { + qosClient.executeWrite(() -> { }, weigher); + + verify(writeLimiter).consumeWithBackoff(ESTIMATED_BYTES); + verify(writeLimiter).recordAdjustment(ACTUAL_BYTES - ESTIMATED_BYTES); + verifyNoMoreInteractions(readLimiter, writeLimiter); + } + + @Test + public void recordsWriteMetrics() throws TestCheckedException { + qosClient.executeWrite(() -> { }, weigher); + + verify(metrics).recordWrite(ACTUAL_WEIGHT); + verifyNoMoreInteractions(metrics); + } + + @Test + public void propagatesCheckedExceptions() throws TestCheckedException { + assertThatThrownBy(() -> qosClient.executeRead(() -> { + throw new TestCheckedException(); + }, weigher)).isInstanceOf(TestCheckedException.class); + + assertThatThrownBy(() -> qosClient.executeWrite(() -> { + throw new TestCheckedException(); + }, weigher)).isInstanceOf(TestCheckedException.class); + } + + static class TestCheckedException extends Exception {} +} diff --git a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiterTest.java b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiterTest.java index 8d88ad58b21..c68278e43d3 100644 --- a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiterTest.java +++ b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiterTest.java @@ -30,9 +30,10 @@ public class QosRateLimiterTest { private static final long START_TIME_MICROS = 0L; + private static final long MAX_BACKOFF_TIME_MILLIS = 10_000; RateLimiter.SleepingStopwatch stopwatch = mock(RateLimiter.SleepingStopwatch.class); - QosRateLimiter limiter = new QosRateLimiter(stopwatch); + QosRateLimiter limiter = new QosRateLimiter(stopwatch, MAX_BACKOFF_TIME_MILLIS); @Before public void before() { @@ -63,6 +64,15 @@ public void limitsByThrowingIfSleepTimeIsTooGreat() { .hasMessageContaining("rate limited"); } + @Test + public void doesNotThrowIfMaxBackoffTimeIsVeryLarge() { + QosRateLimiter limiterWithLargeBackoffLimit = new QosRateLimiter(stopwatch, Long.MAX_VALUE); + limiterWithLargeBackoffLimit.updateRate(10); + + limiterWithLargeBackoffLimit.consumeWithBackoff(1_000_000_000); + limiterWithLargeBackoffLimit.consumeWithBackoff(1_000_000_000); + } + @Test public void consumingAdditionalUnitsPenalizesFutureCallers() { limiter.updateRate(10); diff --git a/qos-service-impl/src/test/resources/qos-client.yml b/qos-service-impl/src/test/resources/qos-client.yml new file mode 100644 index 00000000000..163da250c04 --- /dev/null +++ b/qos-service-impl/src/test/resources/qos-client.yml @@ -0,0 +1,10 @@ +qosService: + uris: + - http://localhost:8080 + security: + trustStorePath: trustStore.jks +maxBackoffSleepTime: 20 seconds +limits: + readBytesPerSecond: 123 + writeBytesPerSecond: 456 + diff --git a/qos-service-impl/src/test/resources/qos.yml b/qos-service-impl/src/test/resources/qos-server.yml similarity index 100% rename from qos-service-impl/src/test/resources/qos.yml rename to qos-service-impl/src/test/resources/qos-server.yml