Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
cas metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Ziebart committed Nov 17, 2017
1 parent b1251be commit 7665ace
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ public void batch_mutate(String kvsMethodName, Map<ByteBuffer, Map<String, List<
ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
qosClient.executeWrite(
() -> client.batch_mutate(kvsMethodName, mutation_map, consistency_level),
() -> {
client.batch_mutate(kvsMethodName, mutation_map, consistency_level);
return null;
},
ThriftQueryWeighers.batchMutate(mutation_map));
}

Expand All @@ -103,9 +106,10 @@ public ColumnOrSuperColumn get(TableReference tableReference, ByteBuffer key, by
public CASResult cas(TableReference tableReference, ByteBuffer key, List<Column> expected, List<Column> updates,
ConsistencyLevel serial_consistency_level, ConsistencyLevel commit_consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
// TODO(nziebart): should this be considered as a write or do we need to treat is as both read and write?
return client.cas(tableReference, key, expected, updates, serial_consistency_level,
commit_consistency_level);
return qosClient.executeWrite(
() -> client.cas(tableReference, key, expected, updates, serial_consistency_level,
commit_consistency_level),
ThriftQueryWeighers.cas(updates));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ public static long getApproximateReadByteCount(List<KeySlice> slices) {
return getCollectionSize(slices, ThriftObjectSizeUtils::getKeySliceSize);
}

public static long getCasByteCount(List<Column> updates) {
// TODO(nziebart): CAS actually writes more bytes than this, because the associated Paxos negotations must
// be persisted
return getCollectionSize(updates, ThriftObjectSizeUtils::getColumnSize);
}

public static long getColumnOrSuperColumnSize(ColumnOrSuperColumn columnOrSuperColumn) {
if (columnOrSuperColumn == null) {
return getNullSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.cassandra.thrift.CASResult;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.KeySlice;
Expand Down Expand Up @@ -69,6 +71,13 @@ public static QosClient.QueryWeigher<Void> batchMutate(
return writeWeigher(numRows, () -> ThriftObjectSizeUtils.getApproximateWriteByteCount(mutationMap));
}

public static QosClient.QueryWeigher<CASResult> cas(List<Column> updates) {
// TODO(nziebart): technically CAS involves both reads and writes; currently we are just counting it as a read
// Also, it should probably be counted as multiple rows, since Paxos negotiations trigger serial writes
long numRows = 1;
return writeWeigher(numRows, () -> ThriftObjectSizeUtils.getCasByteCount(updates));
}

public static <T> QosClient.QueryWeigher<T> readWeigher(Function<T, Long> bytesRead, Function<T, Integer> numRows) {
return new QosClient.QueryWeigher<T>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,17 @@ public void getKeySlicesSize() {
+ EMPTY_COLUMN_OR_SUPERCOLUMN_SIZE;

assertThat(ThriftObjectSizeUtils.getApproximateReadByteCount(slices)).isEqualTo(expectedSize);
}

@Test
public void getCasSize() {
List<Column> columns = ImmutableList.of(
TEST_COLUMN,
TEST_COLUMN);

long expectedSize = TEST_COLUMN_SIZE * 2;

assertThat(ThriftObjectSizeUtils.getCasByteCount(columns)).isEqualTo(expectedSize);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.List;
import java.util.Map;

import org.apache.cassandra.thrift.CASResult;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.KeySlice;
Expand All @@ -35,7 +37,8 @@ public class ThriftQueryWeighersTest {

private static final ByteBuffer BYTES1 = ByteBuffer.allocate(3);
private static final ByteBuffer BYTES2 = ByteBuffer.allocate(7);
private static final ColumnOrSuperColumn COLUMN = new ColumnOrSuperColumn();
private static final ColumnOrSuperColumn COLUMN_OR_SUPER = new ColumnOrSuperColumn();
private static final Column COLUMN = new Column();
private static final KeySlice KEY_SLICE = new KeySlice();
private static final Mutation MUTATION = new Mutation();

Expand All @@ -44,8 +47,8 @@ public class ThriftQueryWeighersTest {
@Test
public void multigetSliceWeigherReturnsCorrectNumRows() {
Map<ByteBuffer, List<ColumnOrSuperColumn>> result = ImmutableMap.of(
BYTES1, ImmutableList.of(COLUMN, COLUMN),
BYTES2, ImmutableList.of(COLUMN));
BYTES1, ImmutableList.of(COLUMN_OR_SUPER, COLUMN_OR_SUPER),
BYTES2, ImmutableList.of(COLUMN_OR_SUPER));

long actualNumRows = ThriftQueryWeighers.MULTIGET_SLICE.weigh(result, UNIMPORTANT_ARG).numDistinctRows();

Expand All @@ -63,7 +66,7 @@ public void rangeSlicesWeigherReturnsCorrectNumRows() {

@Test
public void getWeigherReturnsCorrectNumRows() {
long actualNumRows = ThriftQueryWeighers.GET.weigh(COLUMN, UNIMPORTANT_ARG).numDistinctRows();
long actualNumRows = ThriftQueryWeighers.GET.weigh(COLUMN_OR_SUPER, UNIMPORTANT_ARG).numDistinctRows();

assertThat(actualNumRows).isEqualTo(1);
}
Expand All @@ -76,6 +79,14 @@ public void executeCql3QueryWeigherReturnsOneRowAlways() {
assertThat(actualNumRows).isEqualTo(1);
}

@Test
public void casQueryWeigherReturnsOneRowAlways() {
long actualNumRows = ThriftQueryWeighers.cas(ImmutableList.of(COLUMN, COLUMN)).weigh(new CASResult(true),
UNIMPORTANT_ARG).numDistinctRows();

assertThat(actualNumRows).isEqualTo(1);
}

@Test
public void batchMutateWeigherReturnsCorrectNumRows() {
Map<ByteBuffer, Map<String, List<Mutation>>> mutations = ImmutableMap.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ public class FakeQosClient implements QosClient {
public static final FakeQosClient INSTANCE = new FakeQosClient();

@Override
public <T, E extends Exception> T executeRead(ReadQuery<T, E> query, QueryWeigher<T> weigher)
public <T, E extends Exception> T executeRead(Query<T, E> query, QueryWeigher<T> weigher)
throws E {
return query.execute();
}

@Override
public <T, E extends Exception> void executeWrite(WriteQuery<E> query,
QueryWeigher<Void> weigher) throws E {
query.execute();
public <T, E extends Exception> T executeWrite(Query<T, E> query, QueryWeigher<T> weigher)
throws E {
return query.execute();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,21 @@

public interface QosClient {

interface ReadQuery<T, E extends Exception> {
interface Query<T, E extends Exception> {
T execute() throws E;
}

interface WriteQuery<E extends Exception> {
void execute() throws E;
}

interface QueryWeigher<T> {
QueryWeight estimate();
QueryWeight weigh(T result, long timeTakenNanos);
}

<T, E extends Exception> T executeRead(
ReadQuery<T, E> query,
Query<T, E> query,
QueryWeigher<T> weigher) throws E;

<T, E extends Exception> T executeWrite(
Query<T, E> query,
QueryWeigher<T> weigher) throws E;

<T, E extends Exception> void executeWrite(
WriteQuery<E> query,
QueryWeigher<Void> weigher) throws E;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.palantir.atlasdb.qos.client;

import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -23,14 +25,13 @@
import com.palantir.atlasdb.qos.QosClient;
import com.palantir.atlasdb.qos.QueryWeight;
import com.palantir.atlasdb.qos.metrics.QosMetrics;
import com.palantir.atlasdb.qos.ratelimit.QosRateLimiter;
import com.palantir.atlasdb.qos.ratelimit.QosRateLimiters;

public class AtlasDbQosClient implements QosClient {

private static final Logger log = LoggerFactory.getLogger(AtlasDbQosClient.class);

private static final Void NO_RESULT = null;

private final QosRateLimiters rateLimiters;
private final QosMetrics metrics;
private final Ticker ticker;
Expand All @@ -47,35 +48,33 @@ public static AtlasDbQosClient create(QosRateLimiters rateLimiters) {
}

@Override
public <T, E extends Exception> T executeRead(ReadQuery<T, E> query, QueryWeigher<T> weigher) throws E {
public <T, E extends Exception> T executeRead(Query<T, E> query, QueryWeigher<T> weigher) throws E {
return execute(query, weigher, rateLimiters.read(), metrics::recordRead);
}

@Override
public <T, E extends Exception> T executeWrite(Query<T, E> query, QueryWeigher<T> weigher) throws E {
return execute(query, weigher, rateLimiters.write(), metrics::recordWrite);
}

private <T, E extends Exception> T execute(
Query<T, E> query,
QueryWeigher<T> weigher,
QosRateLimiter rateLimiter,
Consumer<QueryWeight> weightMetric) throws E {
long estimatedNumBytes = weigher.estimate().numBytes();
rateLimiters.read().consumeWithBackoff(estimatedNumBytes);
rateLimiter.consumeWithBackoff(estimatedNumBytes);

// TODO(nziebart): decide what to do if we encounter a timeout exception
long startTimeNanos = ticker.read();
T result = query.execute();
long totalTimeNanos = ticker.read() - startTimeNanos;

QueryWeight actualWeight = weigher.weigh(result, totalTimeNanos);
metrics.recordRead(actualWeight);
rateLimiters.read().recordAdjustment(actualWeight.numBytes() - estimatedNumBytes);
weightMetric.accept(actualWeight);
rateLimiter.recordAdjustment(actualWeight.numBytes() - estimatedNumBytes);

return result;
}

@Override
public <T, E extends Exception> void executeWrite(WriteQuery<E> query, QueryWeigher<Void> weigher) throws E {
long estimatedNumBytes = weigher.estimate().numBytes();
rateLimiters.write().consumeWithBackoff(estimatedNumBytes);

// TODO(nziebart): decide what to do if we encounter a timeout exception
long startTimeNanos = ticker.read();
query.execute();
long totalTimeNanos = ticker.read() - startTimeNanos;

QueryWeight actualWeight = weigher.weigh(NO_RESULT, totalTimeNanos);
metrics.recordWrite(actualWeight);
rateLimiters.write().recordAdjustment(actualWeight.numBytes() - estimatedNumBytes);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void passesResultAndTimeToReadWeigher() throws TestCheckedException {

@Test
public void consumesSpecifiedNumUnitsForWrites() {
qosClient.executeWrite(() -> { }, weigher);
qosClient.executeWrite(() -> null, weigher);

verify(writeLimiter).consumeWithBackoff(ESTIMATED_BYTES);
verify(writeLimiter).recordAdjustment(ACTUAL_BYTES - ESTIMATED_BYTES);
Expand All @@ -110,7 +110,7 @@ public void consumesSpecifiedNumUnitsForWrites() {

@Test
public void recordsWriteMetrics() throws TestCheckedException {
qosClient.executeWrite(() -> { }, weigher);
qosClient.executeWrite(() -> null, weigher);

verify(metrics).recordWrite(ACTUAL_WEIGHT);
verifyNoMoreInteractions(metrics);
Expand Down

0 comments on commit 7665ace

Please sign in to comment.