diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/QosCassandraClient.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/QosCassandraClient.java new file mode 100644 index 00000000000..bfaec55afda --- /dev/null +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/QosCassandraClient.java @@ -0,0 +1,173 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.keyvalue.cassandra; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.apache.cassandra.thrift.CASResult; +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.Column; +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.Compression; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.CqlResult; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.KeyRange; +import org.apache.cassandra.thrift.KeySlice; +import org.apache.cassandra.thrift.Mutation; +import org.apache.cassandra.thrift.NotFoundException; +import org.apache.cassandra.thrift.SchemaDisagreementException; +import org.apache.cassandra.thrift.SlicePredicate; +import org.apache.cassandra.thrift.TimedOutException; +import org.apache.cassandra.thrift.UnavailableException; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.palantir.atlasdb.keyvalue.api.TableReference; +import com.palantir.atlasdb.qos.QosClient; + +@SuppressWarnings({"all"}) // thrift variable names. +public class QosCassandraClient implements CassandraClient { + private final Logger log = LoggerFactory.getLogger(CassandraClient.class); + + private final CassandraClient client; + private final QosMetrics qosMetrics; + private final QosClient qosClient; + + public QosCassandraClient(CassandraClient client, QosClient qosClient) { + this.client = client; + this.qosClient = qosClient; + qosMetrics = new QosMetrics(); + } + + @Override + public Cassandra.Client rawClient() { + return client.rawClient(); + } + + @Override + public Map> multiget_slice(String kvsMethodName, TableReference tableRef, + List keys, SlicePredicate predicate, ConsistencyLevel consistency_level) + throws InvalidRequestException, UnavailableException, TimedOutException, TException { + qosClient.checkLimit(); + + Map> result = client.multiget_slice(kvsMethodName, tableRef, keys, + predicate, consistency_level); + recordBytesRead(() -> getApproximateReadByteCount(result)); + return result; + } + + private long getApproximateReadByteCount(Map> result) { + return getCollectionSize(result.entrySet(), + rowResult -> ThriftObjectSizeUtils.getByteBufferSize(rowResult.getKey()) + + getCollectionSize(rowResult.getValue(), + ThriftObjectSizeUtils::getColumnOrSuperColumnSize)); + } + + @Override + public List get_range_slices(String kvsMethodName, TableReference tableRef, SlicePredicate predicate, + KeyRange range, ConsistencyLevel consistency_level) + throws InvalidRequestException, UnavailableException, TimedOutException, TException { + qosClient.checkLimit(); + + List result = client.get_range_slices(kvsMethodName, tableRef, predicate, range, consistency_level); + recordBytesRead(() -> getCollectionSize(result, ThriftObjectSizeUtils::getKeySliceSize)); + return result; + } + + @Override + public void batch_mutate(String kvsMethodName, Map>> mutation_map, + ConsistencyLevel consistency_level) + throws InvalidRequestException, UnavailableException, TimedOutException, TException { + qosClient.checkLimit(); + + client.batch_mutate(kvsMethodName, mutation_map, consistency_level); + recordBytesWritten(() -> getApproximateWriteByteCount(mutation_map)); + } + + private long getApproximateWriteByteCount(Map>> batchMutateMap) { + long approxBytesForKeys = getCollectionSize(batchMutateMap.keySet(), ThriftObjectSizeUtils::getByteBufferSize); + long approxBytesForValues = getCollectionSize(batchMutateMap.values(), currentMap -> + getCollectionSize(currentMap.keySet(), ThriftObjectSizeUtils::getStringSize) + + getCollectionSize(currentMap.values(), + mutations -> getCollectionSize(mutations, ThriftObjectSizeUtils::getMutationSize))); + return approxBytesForKeys + approxBytesForValues; + } + + @Override + public ColumnOrSuperColumn get(TableReference tableReference, ByteBuffer key, byte[] column, + ConsistencyLevel consistency_level) + throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException { + qosClient.checkLimit(); + + ColumnOrSuperColumn result = client.get(tableReference, key, column, consistency_level); + recordBytesRead(() -> ThriftObjectSizeUtils.getColumnOrSuperColumnSize(result)); + return result; + } + + @Override + public CASResult cas(TableReference tableReference, ByteBuffer key, List expected, List updates, + ConsistencyLevel serial_consistency_level, ConsistencyLevel commit_consistency_level) + throws InvalidRequestException, UnavailableException, TimedOutException, TException { + qosClient.checkLimit(); + + CASResult result = client.cas(tableReference, key, expected, updates, serial_consistency_level, + commit_consistency_level); + recordBytesWritten(() -> getCollectionSize(updates, ThriftObjectSizeUtils::getColumnSize)); + recordBytesRead(() -> getCollectionSize(updates, ThriftObjectSizeUtils::getColumnSize)); + return result; + } + + @Override + public CqlResult execute_cql3_query(CqlQuery cqlQuery, Compression compression, ConsistencyLevel consistency) + throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, + TException { + qosClient.checkLimit(); + + CqlResult cqlResult = client.execute_cql3_query(cqlQuery, compression, consistency); + recordBytesRead(() -> ThriftObjectSizeUtils.getCqlResultSize(cqlResult)); + return cqlResult; + } + + private void recordBytesRead(Supplier numBytesRead) { + try { + qosMetrics.updateReadCount(); + qosMetrics.updateBytesRead(numBytesRead.get()); + } catch (Exception e) { + log.warn("Encountered an exception when recording read metrics.", e); + } + } + + private void recordBytesWritten(Supplier numBytesWritten) { + try { + qosMetrics.updateWriteCount(); + qosMetrics.updateBytesWritten(numBytesWritten.get()); + } catch (Exception e) { + log.warn("Encountered an exception when recording write metrics.", e); + } + } + + private long getCollectionSize(Collection collection, Function singleObjectSizeFunction) { + return ThriftObjectSizeUtils.getCollectionSize(collection, singleObjectSizeFunction); + } +} diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftObjectSizeUtils.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftObjectSizeUtils.java index 42afbd151a6..b18901d95c8 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftObjectSizeUtils.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftObjectSizeUtils.java @@ -128,6 +128,17 @@ public static long getColumnSize(Column column) { + getTimestampSize(); } + public static long getColumnSize(Column column) { + if (column == null) { + return getNullSize(); + } + + return getByteArraySize(column.getValue()) + + getByteArraySize(column.getName()) + + getTtlSize() + + getTimestampSize(); + } + private static long getCounterSuperColumnSize(CounterSuperColumn counterSuperColumn) { if (counterSuperColumn == null) { return getNullSize(); diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/config/AtlasDbRuntimeConfig.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/config/AtlasDbRuntimeConfig.java index df5fbf01319..6dd4b6998e0 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/config/AtlasDbRuntimeConfig.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/config/AtlasDbRuntimeConfig.java @@ -74,6 +74,14 @@ public QosClientConfig qos() { */ public abstract Optional timelockRuntime(); + /** + * Runtime live-reloadable parameters for communicating with TimeLock. + * + * This value is ignored if the install config does not specify usage of TimeLock. + * We do not currently support live reloading from a leader block or using embedded services to using TimeLock. + */ + public abstract Optional timelockRuntime(); + public static ImmutableAtlasDbRuntimeConfig defaultRuntimeConfig() { return ImmutableAtlasDbRuntimeConfig.builder().build(); }