Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Don't NPE on null cqlResults (#3158)
Browse files Browse the repository at this point in the history
* Handle null cql responses

* release notes

* make kvsprofilinglogger contract more explicit

* add profiling client test
  • Loading branch information
jeremyk-91 authored May 11, 2018
1 parent 9d0eb83 commit 17225d1
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,16 @@ public CqlResult execute_cql3_query(CqlQuery cqlQuery, Compression compression,
(KvsProfilingLogger.CallableCheckedException<CqlResult, TException>)
() -> client.execute_cql3_query(cqlQuery, compression, consistency),
(logger, timer) -> cqlQuery.logSlowResult(logger, timer),
(logger, cqlResult) -> logger.log("and returned {} rows, at time {}",
SafeArg.of("numRows", cqlResult.getRows().size()),
LoggingArgs.startTimeMillis(startTime)));
(logger, cqlResult) -> {
if (cqlResult.getRows() == null) {
// different from an empty list
logger.log("and returned null rows. The query was started at time {}",
LoggingArgs.startTimeMillis(startTime));
} else {
logger.log("and returned {} rows. The query was started at time {}",
SafeArg.of("numRows", cqlResult.getRows().size()),
LoggingArgs.startTimeMillis(startTime));
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2018 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the BSD-3 License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.keyvalue.cassandra;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import java.nio.ByteBuffer;
import java.util.List;

import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.CqlResultType;
import org.apache.cassandra.thrift.CqlRow;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Test;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.palantir.atlasdb.encoding.PtBytes;
import com.palantir.atlasdb.keyvalue.api.TableReference;

public class ProfilingCassandraClientTest {
private static final CqlQuery CQL_QUERY = new CqlQuery("SELECT * FROM atlasdb.foo LIMIT 1;");

private final CassandraClient delegate = mock(CassandraClient.class);
private final CassandraClient profilingClient = new ProfilingCassandraClient(delegate);

@After
public void noMoreInteractions() {
verifyNoMoreInteractions(delegate);
}

@Test
public void passesCqlQueriesAndResultsToAndFromDelegate() throws TException {
ByteBuffer byteBuffer = ByteBuffer.allocate(1);
CqlResult standardCqlResult = new CqlResult(CqlResultType.ROWS)
.setRows(ImmutableList.of(new CqlRow(byteBuffer, ImmutableList.of(new Column(byteBuffer)))));
setDelegateResponseToCqlQuery(standardCqlResult);

assertThat(executeCqlQuery()).isEqualTo(standardCqlResult);

verifyCqlQueryWasExecuted();
}

@Test
public void handlesNullCqlResponseFromDelegate() throws TException {
setDelegateResponseToCqlQuery(null);

assertThat(executeCqlQuery()).isNull();

verifyCqlQueryWasExecuted();
}

@Test
public void handlesCqlResponseWithVoidTypeFromDelegate() throws TException {
setDelegateResponseToCqlQuery(new CqlResult(CqlResultType.VOID));

assertThat(executeCqlQuery()).isEqualTo(new CqlResult(CqlResultType.VOID));

verifyCqlQueryWasExecuted();
}

@Test
public void handlesNullMultigetSliceResponseFromDelegate() throws TException {
ByteBuffer byteBuffer = ByteBuffer.wrap(PtBytes.toBytes("foo"));
List<ColumnOrSuperColumn> columns = ImmutableList.of(
new ColumnOrSuperColumn().setColumn(new Column(byteBuffer)));
ImmutableMap<ByteBuffer, List<ColumnOrSuperColumn>> resultMap = ImmutableMap.of(byteBuffer, columns);

when(delegate.multiget_slice(any(), any(), any(), any(), any())).thenReturn(resultMap);

assertThat(delegate.multiget_slice(
"getRows",
TableReference.createFromFullyQualifiedName("a.b"),
ImmutableList.of(byteBuffer),
new SlicePredicate(),
ConsistencyLevel.QUORUM)).isEqualTo(resultMap);

verify(delegate).multiget_slice(
"getRows",
TableReference.createFromFullyQualifiedName("a.b"),
ImmutableList.of(byteBuffer),
new SlicePredicate(),
ConsistencyLevel.QUORUM);
}

private void setDelegateResponseToCqlQuery(CqlResult result) throws TException {
when(delegate.execute_cql3_query(CQL_QUERY, Compression.NONE, ConsistencyLevel.QUORUM)).thenReturn(result);
}

private CqlResult executeCqlQuery() throws TException {
return profilingClient.execute_cql3_query(CQL_QUERY, Compression.NONE, ConsistencyLevel.QUORUM);
}

private void verifyCqlQueryWasExecuted() throws TException {
verify(delegate).execute_cql3_query(CQL_QUERY, Compression.NONE, ConsistencyLevel.QUORUM);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,19 @@ public static <T, E extends Exception> T maybeLog(CallableCheckedException<T, E>
return maybeLog(action, primaryLogger, ((loggingFunction, result) -> {}));
}

/**
* Runs an action (which is a CallableCheckedException) through a {@link Monitor}, registering results on
* successful operations and exceptions on unsuccessful operations, as well as logging operations.
*
* Please see the documentation of {@link Monitor} for more information on how the logging functions are invoked.
*/
public static <T, E extends Exception> T maybeLog(CallableCheckedException<T, E> action,
BiConsumer<LoggingFunction, Stopwatch> primaryLogger,
BiConsumer<LoggingFunction, T> additonalLoggerWithAccessToResult) throws E {
BiConsumer<LoggingFunction, T> additionalLoggerWithAccessToResult) throws E {
if (log.isTraceEnabled() || slowlogger.isWarnEnabled()) {
Monitor<T> monitor = Monitor.createMonitor(
primaryLogger,
additonalLoggerWithAccessToResult,
additionalLoggerWithAccessToResult,
slowLogPredicate);
try {
T res = action.call();
Expand Down Expand Up @@ -167,6 +173,16 @@ void registerException(Exception ex) {
this.exception = ex;
}

/**
* Submits the outcome of an operation to log processors, accumulating their results into a single downstream
* logger call. Specifically:
*
* - the primary logger (which can use a stopwatch) is always invoked
* - if a non-null result has been registered, call the additionalLoggerWithAccessToResult on it
* - if an exception has been thrown, log it
*
* The additionalLoggerWithAccessToResult may assume that its argument is non-null.
*/
void log() {
stopwatch.stop();
Consumer<LoggingFunction> logger = (loggingMethod) -> {
Expand Down
5 changes: 5 additions & 0 deletions docs/source/release_notes/release-notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ develop
- If we make a successful request to a Cassandra client, we now remove it from the overall Cassandra service's blacklist.
(`Pull Request <https://github.com/palantir/atlasdb/pull/3156>`__)

* - |fixed|
- The (Thrift-backed) ``CassandraKeyValueService`` now returns correctly for CQL queries that return null.
Previously, they would throw an exception when we attempted to log information about the response.
(`Pull Request <https://github.com/palantir/atlasdb/pull/3158>`__)

=======
v0.83.0
=======
Expand Down

0 comments on commit 17225d1

Please sign in to comment.