Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

[QoS] handle exceptions thrown by queries #2706

Merged
merged 1 commit into from
Nov 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Suppliers;
import com.palantir.atlasdb.keyvalue.cassandra.CassandraClient;
import com.palantir.atlasdb.qos.ImmutableQueryWeight;
Expand All @@ -42,7 +43,8 @@ public final class ThriftQueryWeighers {

private static final Logger log = LoggerFactory.getLogger(CassandraClient.class);

public static final QueryWeight DEFAULT_ESTIMATED_WEIGHT = ImmutableQueryWeight.builder()
@VisibleForTesting
static final QueryWeight DEFAULT_ESTIMATED_WEIGHT = ImmutableQueryWeight.builder()
.numBytes(100)
.numDistinctRows(1)
.timeTakenNanos(TimeUnit.MILLISECONDS.toNanos(2))
Expand Down Expand Up @@ -86,13 +88,21 @@ public QueryWeight estimate() {
}

@Override
public QueryWeight weigh(T result, long timeTakenNanos) {
public QueryWeight weighSuccess(T result, long timeTakenNanos) {
return ImmutableQueryWeight.builder()
.numBytes(safeGetNumBytesOrDefault(() -> bytesRead.apply(result)))
.timeTakenNanos(timeTakenNanos)
.numDistinctRows(numRows.apply(result))
.build();
}

@Override
public QueryWeight weighFailure(Exception error, long timeTakenNanos) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems a bit weird to pass in error and then not use it. (here and below)

return ImmutableQueryWeight.builder()
.from(estimate())
.timeTakenNanos(timeTakenNanos)
.build();
}
};
}

Expand All @@ -110,7 +120,15 @@ public QueryWeight estimate() {
}

@Override
public QueryWeight weigh(T result, long timeTakenNanos) {
public QueryWeight weighSuccess(T result, long timeTakenNanos) {
return ImmutableQueryWeight.builder()
.from(estimate())
.timeTakenNanos(timeTakenNanos)
.build();
}

@Override
public QueryWeight weighFailure(Exception error, long timeTakenNanos) {
return ImmutableQueryWeight.builder()
.from(estimate())
.timeTakenNanos(timeTakenNanos)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.palantir.atlasdb.qos.ImmutableQueryWeight;
import com.palantir.atlasdb.qos.QosClient;
import com.palantir.atlasdb.qos.QueryWeight;

public class ThriftQueryWeighersTest {

Expand All @@ -42,15 +45,20 @@ public class ThriftQueryWeighersTest {
private static final KeySlice KEY_SLICE = new KeySlice();
private static final Mutation MUTATION = new Mutation();

private static final long UNIMPORTANT_ARG = 123L;
private static final long TIME_TAKEN = 123L;

private static final QueryWeight DEFAULT_WEIGHT = ImmutableQueryWeight.builder()
.from(ThriftQueryWeighers.DEFAULT_ESTIMATED_WEIGHT)
.timeTakenNanos(TIME_TAKEN)
.build();

@Test
public void multigetSliceWeigherReturnsCorrectNumRows() {
Map<ByteBuffer, List<ColumnOrSuperColumn>> result = ImmutableMap.of(
BYTES1, ImmutableList.of(COLUMN_OR_SUPER, COLUMN_OR_SUPER),
BYTES2, ImmutableList.of(COLUMN_OR_SUPER));

long actualNumRows = ThriftQueryWeighers.MULTIGET_SLICE.weigh(result, UNIMPORTANT_ARG).numDistinctRows();
long actualNumRows = ThriftQueryWeighers.MULTIGET_SLICE.weighSuccess(result, TIME_TAKEN).numDistinctRows();

assertThat(actualNumRows).isEqualTo(2);
}
Expand All @@ -59,30 +67,30 @@ public void multigetSliceWeigherReturnsCorrectNumRows() {
public void rangeSlicesWeigherReturnsCorrectNumRows() {
List<KeySlice> result = ImmutableList.of(KEY_SLICE, KEY_SLICE, KEY_SLICE);

long actualNumRows = ThriftQueryWeighers.GET_RANGE_SLICES.weigh(result, UNIMPORTANT_ARG).numDistinctRows();
long actualNumRows = ThriftQueryWeighers.GET_RANGE_SLICES.weighSuccess(result, TIME_TAKEN).numDistinctRows();

assertThat(actualNumRows).isEqualTo(3);
}

@Test
public void getWeigherReturnsCorrectNumRows() {
long actualNumRows = ThriftQueryWeighers.GET.weigh(COLUMN_OR_SUPER, UNIMPORTANT_ARG).numDistinctRows();
long actualNumRows = ThriftQueryWeighers.GET.weighSuccess(COLUMN_OR_SUPER, TIME_TAKEN).numDistinctRows();

assertThat(actualNumRows).isEqualTo(1);
}

@Test
public void executeCql3QueryWeigherReturnsOneRowAlways() {
long actualNumRows = ThriftQueryWeighers.EXECUTE_CQL3_QUERY.weigh(new CqlResult(),
UNIMPORTANT_ARG).numDistinctRows();
long actualNumRows = ThriftQueryWeighers.EXECUTE_CQL3_QUERY.weighSuccess(new CqlResult(),
TIME_TAKEN).numDistinctRows();

assertThat(actualNumRows).isEqualTo(1);
}

@Test
public void casQueryWeigherReturnsOneRowAlways() {
long actualNumRows = ThriftQueryWeighers.cas(ImmutableList.of(COLUMN, COLUMN)).weigh(new CASResult(true),
UNIMPORTANT_ARG).numDistinctRows();
long actualNumRows = ThriftQueryWeighers.cas(ImmutableList.of(COLUMN, COLUMN)).weighSuccess(new CASResult(true),
TIME_TAKEN).numDistinctRows();

assertThat(actualNumRows).isEqualTo(1);
}
Expand All @@ -96,10 +104,68 @@ public void batchMutateWeigherReturnsCorrectNumRows() {
BYTES2, ImmutableMap.of(
"baz", ImmutableList.of(MUTATION)));

long actualNumRows = ThriftQueryWeighers.batchMutate(mutations).weigh(null, UNIMPORTANT_ARG)
long actualNumRows = ThriftQueryWeighers.batchMutate(mutations).weighSuccess(null, TIME_TAKEN)
.numDistinctRows();

assertThat(actualNumRows).isEqualTo(3);
}

@Test
public void multigetSliceWeigherReturnsDefaultEstimateForFailure() {
QueryWeight weight = ThriftQueryWeighers.MULTIGET_SLICE.weighFailure(new RuntimeException(), TIME_TAKEN);

assertThat(weight).isEqualTo(DEFAULT_WEIGHT);
}

@Test
public void getWeigherReturnsDefaultEstimateForFailure() {
QueryWeight weight = ThriftQueryWeighers.GET.weighFailure(new RuntimeException(), TIME_TAKEN);

assertThat(weight).isEqualTo(DEFAULT_WEIGHT);
}

@Test
public void getRangeSlicesWeigherReturnsDefaultEstimateForFailure() {
QueryWeight weight = ThriftQueryWeighers.GET_RANGE_SLICES.weighFailure(new RuntimeException(), TIME_TAKEN);

assertThat(weight).isEqualTo(DEFAULT_WEIGHT);
}

@Test
public void batchMutateWeigherReturnsEstimateForFailure() {
Map<ByteBuffer, Map<String, List<Mutation>>> mutations = ImmutableMap.of(
BYTES1, ImmutableMap.of("foo", ImmutableList.of(MUTATION, MUTATION)));

QosClient.QueryWeigher<Void> weigher = ThriftQueryWeighers.batchMutate(mutations);

QueryWeight expected = ImmutableQueryWeight.builder()
.from(weigher.estimate())
.timeTakenNanos(TIME_TAKEN)
.build();
QueryWeight actual = weigher.weighFailure(new RuntimeException(), TIME_TAKEN);

assertThat(actual).isEqualTo(expected);
}

@Test
public void casWeigherReturnsEstimateForFailure() {
QosClient.QueryWeigher<CASResult> weigher = ThriftQueryWeighers.cas(ImmutableList.of(COLUMN, COLUMN));

QueryWeight expected = ImmutableQueryWeight.builder()
.from(weigher.estimate())
.timeTakenNanos(TIME_TAKEN)
.build();
QueryWeight actual = weigher.weighFailure(new RuntimeException(), TIME_TAKEN);

assertThat(actual).isEqualTo(expected);
}

@Test
public void cql3QueryWeigherReturnsDefaultEstimateForFailure() {
QueryWeight weight = ThriftQueryWeighers.EXECUTE_CQL3_QUERY.weighFailure(new RuntimeException(),
TIME_TAKEN);

assertThat(weight).isEqualTo(DEFAULT_WEIGHT);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ interface Query<T, E extends Exception> {

interface QueryWeigher<T> {
QueryWeight estimate();
QueryWeight weigh(T result, long timeTakenNanos);
QueryWeight weighSuccess(T result, long timeTakenNanos);
QueryWeight weighFailure(Exception error, long timeTakenNanos);
}

<T, E extends Exception> T executeRead(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
*/
package com.palantir.atlasdb.qos.client;

import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.palantir.atlasdb.qos.QosClient;
import com.palantir.atlasdb.qos.QueryWeight;
Expand Down Expand Up @@ -65,16 +67,20 @@ private <T, E extends Exception> T execute(
long estimatedNumBytes = weigher.estimate().numBytes();
rateLimiter.consumeWithBackoff(estimatedNumBytes);

// TODO(nziebart): decide what to do if we encounter a timeout exception
long startTimeNanos = ticker.read();
T result = query.execute();
long totalTimeNanos = ticker.read() - startTimeNanos;
Stopwatch timer = Stopwatch.createStarted(ticker);

QueryWeight actualWeight = weigher.weigh(result, totalTimeNanos);
weightMetric.accept(actualWeight);
rateLimiter.recordAdjustment(actualWeight.numBytes() - estimatedNumBytes);

return result;
QueryWeight actualWeight = null;
try {
T result = query.execute();
actualWeight = weigher.weighSuccess(result, timer.elapsed(TimeUnit.NANOSECONDS));
return result;
} catch (Exception ex) {
actualWeight = weigher.weighFailure(ex, timer.elapsed(TimeUnit.NANOSECONDS));
throw ex;
} finally {
weightMetric.accept(actualWeight);
rateLimiter.recordAdjustment(actualWeight.numBytes() - estimatedNumBytes);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -72,7 +73,9 @@ public void setUp() {
when(ticker.read()).thenReturn(START_NANOS).thenReturn(END_NANOS);

when(weigher.estimate()).thenReturn(ESTIMATED_WEIGHT);
when(weigher.weigh(any(), anyLong())).thenReturn(ACTUAL_WEIGHT);
when(weigher.weighSuccess(any(), anyLong())).thenReturn(ACTUAL_WEIGHT);
when(weigher.weighFailure(any(), anyLong())).thenReturn(ACTUAL_WEIGHT);

}

@Test
Expand All @@ -96,7 +99,7 @@ public void recordsReadMetrics() throws TestCheckedException {
public void passesResultAndTimeToReadWeigher() throws TestCheckedException {
qosClient.executeRead(() -> "foo", weigher);

verify(weigher).weigh("foo", TOTAL_NANOS);
verify(weigher).weighSuccess("foo", TOTAL_NANOS);
}

@Test
Expand All @@ -116,6 +119,39 @@ public void recordsWriteMetrics() throws TestCheckedException {
verifyNoMoreInteractions(metrics);
}

@Test
public void recordsReadMetricsOnFailure() throws TestCheckedException {
TestCheckedException error = new TestCheckedException();
assertThatThrownBy(() -> qosClient.executeRead(() -> {
throw error;
}, weigher)).isInstanceOf(TestCheckedException.class);

verify(metrics).recordRead(ACTUAL_WEIGHT);
verifyNoMoreInteractions(metrics);
}

@Test
public void recordsWriteMetricsOnFailure() throws TestCheckedException {
TestCheckedException error = new TestCheckedException();
assertThatThrownBy(() -> qosClient.executeWrite(() -> {
throw error;
}, weigher)).isInstanceOf(TestCheckedException.class);

verify(metrics).recordWrite(ACTUAL_WEIGHT);
verifyNoMoreInteractions(metrics);
}

@Test
public void passesExceptionToWeigherOnFailure() throws TestCheckedException {
TestCheckedException error = new TestCheckedException();
assertThatThrownBy(() -> qosClient.executeRead(() -> {
throw error;
}, weigher)).isInstanceOf(TestCheckedException.class);

verify(weigher).weighFailure(error, TOTAL_NANOS);
verify(weigher, never()).weighSuccess(any(), anyLong());
}

@Test
public void propagatesCheckedExceptions() throws TestCheckedException {
assertThatThrownBy(() -> qosClient.executeRead(() -> {
Expand Down