From 7665ace51d4d9eb2eea80c1269486bc5481e5a01 Mon Sep 17 00:00:00 2001 From: Nathan Ziebart Date: Fri, 17 Nov 2017 16:00:36 +0000 Subject: [PATCH] cas metrics --- .../cassandra/qos/QosCassandraClient.java | 12 ++++-- .../cassandra/qos/ThriftObjectSizeUtils.java | 6 +++ .../cassandra/qos/ThriftQueryWeighers.java | 9 ++++ .../atlasdb/ThriftObjectSizeUtilsTest.java | 10 +++++ .../qos/ThriftQueryWeighersTest.java | 19 +++++++-- .../palantir/atlasdb/qos/FakeQosClient.java | 9 ++-- .../com/palantir/atlasdb/qos/QosClient.java | 15 +++---- .../atlasdb/qos/client/AtlasDbQosClient.java | 41 +++++++++---------- .../qos/client/AtlasDbQosClientTest.java | 4 +- 9 files changed, 81 insertions(+), 44 deletions(-) diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/QosCassandraClient.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/QosCassandraClient.java index be0a912be7f..cf15f923cec 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/QosCassandraClient.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/QosCassandraClient.java @@ -86,7 +86,10 @@ public void batch_mutate(String kvsMethodName, Map client.batch_mutate(kvsMethodName, mutation_map, consistency_level), + () -> { + client.batch_mutate(kvsMethodName, mutation_map, consistency_level); + return null; + }, ThriftQueryWeighers.batchMutate(mutation_map)); } @@ -103,9 +106,10 @@ public ColumnOrSuperColumn get(TableReference tableReference, ByteBuffer key, by public CASResult cas(TableReference tableReference, ByteBuffer key, List expected, List updates, ConsistencyLevel serial_consistency_level, ConsistencyLevel commit_consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException { - // TODO(nziebart): should this be considered as a write or do we need to treat is as both read and write? - return client.cas(tableReference, key, expected, updates, serial_consistency_level, - commit_consistency_level); + return qosClient.executeWrite( + () -> client.cas(tableReference, key, expected, updates, serial_consistency_level, + commit_consistency_level), + ThriftQueryWeighers.cas(updates)); } @Override diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftObjectSizeUtils.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftObjectSizeUtils.java index 42afbd151a6..85146ed117e 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftObjectSizeUtils.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftObjectSizeUtils.java @@ -64,6 +64,12 @@ public static long getApproximateReadByteCount(List slices) { return getCollectionSize(slices, ThriftObjectSizeUtils::getKeySliceSize); } + public static long getCasByteCount(List updates) { + // TODO(nziebart): CAS actually writes more bytes than this, because the associated Paxos negotations must + // be persisted + return getCollectionSize(updates, ThriftObjectSizeUtils::getColumnSize); + } + public static long getColumnOrSuperColumnSize(ColumnOrSuperColumn columnOrSuperColumn) { if (columnOrSuperColumn == null) { return getNullSize(); diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighers.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighers.java index c9a018d9e69..9ba4503b4d3 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighers.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighers.java @@ -23,6 +23,8 @@ import java.util.function.Function; import java.util.function.Supplier; +import org.apache.cassandra.thrift.CASResult; +import org.apache.cassandra.thrift.Column; import org.apache.cassandra.thrift.ColumnOrSuperColumn; import org.apache.cassandra.thrift.CqlResult; import org.apache.cassandra.thrift.KeySlice; @@ -69,6 +71,13 @@ public static QosClient.QueryWeigher batchMutate( return writeWeigher(numRows, () -> ThriftObjectSizeUtils.getApproximateWriteByteCount(mutationMap)); } + public static QosClient.QueryWeigher cas(List updates) { + // TODO(nziebart): technically CAS involves both reads and writes; currently we are just counting it as a read + // Also, it should probably be counted as multiple rows, since Paxos negotiations trigger serial writes + long numRows = 1; + return writeWeigher(numRows, () -> ThriftObjectSizeUtils.getCasByteCount(updates)); + } + public static QosClient.QueryWeigher readWeigher(Function bytesRead, Function numRows) { return new QosClient.QueryWeigher() { @Override diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/ThriftObjectSizeUtilsTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/ThriftObjectSizeUtilsTest.java index a55b655be25..3018385043f 100644 --- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/ThriftObjectSizeUtilsTest.java +++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/ThriftObjectSizeUtilsTest.java @@ -247,7 +247,17 @@ public void getKeySlicesSize() { + EMPTY_COLUMN_OR_SUPERCOLUMN_SIZE; assertThat(ThriftObjectSizeUtils.getApproximateReadByteCount(slices)).isEqualTo(expectedSize); + } + + @Test + public void getCasSize() { + List columns = ImmutableList.of( + TEST_COLUMN, + TEST_COLUMN); + + long expectedSize = TEST_COLUMN_SIZE * 2; + assertThat(ThriftObjectSizeUtils.getCasByteCount(columns)).isEqualTo(expectedSize); } } diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighersTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighersTest.java index c19d01e508d..6f09405b058 100644 --- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighersTest.java +++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighersTest.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.Map; +import org.apache.cassandra.thrift.CASResult; +import org.apache.cassandra.thrift.Column; import org.apache.cassandra.thrift.ColumnOrSuperColumn; import org.apache.cassandra.thrift.CqlResult; import org.apache.cassandra.thrift.KeySlice; @@ -35,7 +37,8 @@ public class ThriftQueryWeighersTest { private static final ByteBuffer BYTES1 = ByteBuffer.allocate(3); private static final ByteBuffer BYTES2 = ByteBuffer.allocate(7); - private static final ColumnOrSuperColumn COLUMN = new ColumnOrSuperColumn(); + private static final ColumnOrSuperColumn COLUMN_OR_SUPER = new ColumnOrSuperColumn(); + private static final Column COLUMN = new Column(); private static final KeySlice KEY_SLICE = new KeySlice(); private static final Mutation MUTATION = new Mutation(); @@ -44,8 +47,8 @@ public class ThriftQueryWeighersTest { @Test public void multigetSliceWeigherReturnsCorrectNumRows() { Map> result = ImmutableMap.of( - BYTES1, ImmutableList.of(COLUMN, COLUMN), - BYTES2, ImmutableList.of(COLUMN)); + BYTES1, ImmutableList.of(COLUMN_OR_SUPER, COLUMN_OR_SUPER), + BYTES2, ImmutableList.of(COLUMN_OR_SUPER)); long actualNumRows = ThriftQueryWeighers.MULTIGET_SLICE.weigh(result, UNIMPORTANT_ARG).numDistinctRows(); @@ -63,7 +66,7 @@ public void rangeSlicesWeigherReturnsCorrectNumRows() { @Test public void getWeigherReturnsCorrectNumRows() { - long actualNumRows = ThriftQueryWeighers.GET.weigh(COLUMN, UNIMPORTANT_ARG).numDistinctRows(); + long actualNumRows = ThriftQueryWeighers.GET.weigh(COLUMN_OR_SUPER, UNIMPORTANT_ARG).numDistinctRows(); assertThat(actualNumRows).isEqualTo(1); } @@ -76,6 +79,14 @@ public void executeCql3QueryWeigherReturnsOneRowAlways() { assertThat(actualNumRows).isEqualTo(1); } + @Test + public void casQueryWeigherReturnsOneRowAlways() { + long actualNumRows = ThriftQueryWeighers.cas(ImmutableList.of(COLUMN, COLUMN)).weigh(new CASResult(true), + UNIMPORTANT_ARG).numDistinctRows(); + + assertThat(actualNumRows).isEqualTo(1); + } + @Test public void batchMutateWeigherReturnsCorrectNumRows() { Map>> mutations = ImmutableMap.of( diff --git a/qos-service-api/src/main/java/com/palantir/atlasdb/qos/FakeQosClient.java b/qos-service-api/src/main/java/com/palantir/atlasdb/qos/FakeQosClient.java index 7aaa05293e0..7ab0ab4a89d 100644 --- a/qos-service-api/src/main/java/com/palantir/atlasdb/qos/FakeQosClient.java +++ b/qos-service-api/src/main/java/com/palantir/atlasdb/qos/FakeQosClient.java @@ -21,14 +21,15 @@ public class FakeQosClient implements QosClient { public static final FakeQosClient INSTANCE = new FakeQosClient(); @Override - public T executeRead(ReadQuery query, QueryWeigher weigher) + public T executeRead(Query query, QueryWeigher weigher) throws E { return query.execute(); } @Override - public void executeWrite(WriteQuery query, - QueryWeigher weigher) throws E { - query.execute(); + public T executeWrite(Query query, QueryWeigher weigher) + throws E { + return query.execute(); } + } diff --git a/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QosClient.java b/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QosClient.java index 656be55b66b..775448b96a8 100644 --- a/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QosClient.java +++ b/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QosClient.java @@ -18,24 +18,21 @@ public interface QosClient { - interface ReadQuery { + interface Query { T execute() throws E; } - interface WriteQuery { - void execute() throws E; - } - interface QueryWeigher { QueryWeight estimate(); QueryWeight weigh(T result, long timeTakenNanos); } T executeRead( - ReadQuery query, + Query query, + QueryWeigher weigher) throws E; + + T executeWrite( + Query query, QueryWeigher weigher) throws E; - void executeWrite( - WriteQuery query, - QueryWeigher weigher) throws E; } diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/client/AtlasDbQosClient.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/client/AtlasDbQosClient.java index f1832ba2199..6732368940f 100644 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/client/AtlasDbQosClient.java +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/client/AtlasDbQosClient.java @@ -15,6 +15,8 @@ */ package com.palantir.atlasdb.qos.client; +import java.util.function.Consumer; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,14 +25,13 @@ import com.palantir.atlasdb.qos.QosClient; import com.palantir.atlasdb.qos.QueryWeight; import com.palantir.atlasdb.qos.metrics.QosMetrics; +import com.palantir.atlasdb.qos.ratelimit.QosRateLimiter; import com.palantir.atlasdb.qos.ratelimit.QosRateLimiters; public class AtlasDbQosClient implements QosClient { private static final Logger log = LoggerFactory.getLogger(AtlasDbQosClient.class); - private static final Void NO_RESULT = null; - private final QosRateLimiters rateLimiters; private final QosMetrics metrics; private final Ticker ticker; @@ -47,9 +48,22 @@ public static AtlasDbQosClient create(QosRateLimiters rateLimiters) { } @Override - public T executeRead(ReadQuery query, QueryWeigher weigher) throws E { + public T executeRead(Query query, QueryWeigher weigher) throws E { + return execute(query, weigher, rateLimiters.read(), metrics::recordRead); + } + + @Override + public T executeWrite(Query query, QueryWeigher weigher) throws E { + return execute(query, weigher, rateLimiters.write(), metrics::recordWrite); + } + + private T execute( + Query query, + QueryWeigher weigher, + QosRateLimiter rateLimiter, + Consumer weightMetric) throws E { long estimatedNumBytes = weigher.estimate().numBytes(); - rateLimiters.read().consumeWithBackoff(estimatedNumBytes); + rateLimiter.consumeWithBackoff(estimatedNumBytes); // TODO(nziebart): decide what to do if we encounter a timeout exception long startTimeNanos = ticker.read(); @@ -57,25 +71,10 @@ public T executeRead(ReadQuery query, QueryWeighe long totalTimeNanos = ticker.read() - startTimeNanos; QueryWeight actualWeight = weigher.weigh(result, totalTimeNanos); - metrics.recordRead(actualWeight); - rateLimiters.read().recordAdjustment(actualWeight.numBytes() - estimatedNumBytes); + weightMetric.accept(actualWeight); + rateLimiter.recordAdjustment(actualWeight.numBytes() - estimatedNumBytes); return result; } - @Override - public void executeWrite(WriteQuery query, QueryWeigher weigher) throws E { - long estimatedNumBytes = weigher.estimate().numBytes(); - rateLimiters.write().consumeWithBackoff(estimatedNumBytes); - - // TODO(nziebart): decide what to do if we encounter a timeout exception - long startTimeNanos = ticker.read(); - query.execute(); - long totalTimeNanos = ticker.read() - startTimeNanos; - - QueryWeight actualWeight = weigher.weigh(NO_RESULT, totalTimeNanos); - metrics.recordWrite(actualWeight); - rateLimiters.write().recordAdjustment(actualWeight.numBytes() - estimatedNumBytes); - } - } diff --git a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/client/AtlasDbQosClientTest.java b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/client/AtlasDbQosClientTest.java index b59df46e598..fc5d6f94791 100644 --- a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/client/AtlasDbQosClientTest.java +++ b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/client/AtlasDbQosClientTest.java @@ -101,7 +101,7 @@ public void passesResultAndTimeToReadWeigher() throws TestCheckedException { @Test public void consumesSpecifiedNumUnitsForWrites() { - qosClient.executeWrite(() -> { }, weigher); + qosClient.executeWrite(() -> null, weigher); verify(writeLimiter).consumeWithBackoff(ESTIMATED_BYTES); verify(writeLimiter).recordAdjustment(ACTUAL_BYTES - ESTIMATED_BYTES); @@ -110,7 +110,7 @@ public void consumesSpecifiedNumUnitsForWrites() { @Test public void recordsWriteMetrics() throws TestCheckedException { - qosClient.executeWrite(() -> { }, weigher); + qosClient.executeWrite(() -> null, weigher); verify(metrics).recordWrite(ACTUAL_WEIGHT); verifyNoMoreInteractions(metrics);