This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

[QoS] Feature/qos meters #2640

Merged (16 commits), Nov 10, 2017
File: com/palantir/atlasdb/keyvalue/cassandra/CassandraClient.java
@@ -17,8 +17,10 @@
package com.palantir.atlasdb.keyvalue.cassandra;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import org.apache.cassandra.thrift.AutoDelegate_Client;
import org.apache.cassandra.thrift.Cassandra;
@@ -36,6 +38,8 @@
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.palantir.atlasdb.qos.AtlasDbQosClient;
import com.palantir.processors.AutoDelegate;
@@ -46,13 +50,17 @@
@AutoDelegate(typeToExtend = Cassandra.Client.class)
@SuppressWarnings({"checkstyle:all", "DuplicateThrows"}) // :'(
public class CassandraClient extends AutoDelegate_Client {
private final Logger log = LoggerFactory.getLogger(CassandraClient.class);

private final Cassandra.Client delegate;
private final AtlasDbQosClient qosClient;
private final QosMetrics qosMetrics;

public CassandraClient(Cassandra.Client delegate, AtlasDbQosClient qosClient) {
super(delegate.getInputProtocol());
this.delegate = delegate;
this.qosClient = qosClient;
this.qosMetrics = new QosMetrics();
}

@Override
@@ -65,30 +73,84 @@ public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(List<ByteBuffer
SlicePredicate predicate, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
qosClient.checkLimit();
-return delegate.multiget_slice(keys, column_parent, predicate, consistency_level);
+Map<ByteBuffer, List<ColumnOrSuperColumn>> result = delegate.multiget_slice(keys, column_parent,
+predicate, consistency_level);
try {
recordBytesRead(getApproximateReadByteCount(result));
} catch (Exception e) {
Contributor: As before, I don't think there should be any need to catch exceptions from the metrics code, so we can remove all of these.

Contributor (author): What if there is a NullPointerException or something else wrong in our metrics calculation? We would end up failing Cassandra operations if we don't catch these. Ideally, recording metrics should never cause the operation itself to throw.

Contributor: Let's move the try/catch into the recordBytesRead method (see the sketch below).

log.warn("Encountered an exception when recording write metrics for multiget_slice.", e);
}
return result;
}

private long getApproximateReadByteCount(Map<ByteBuffer, List<ColumnOrSuperColumn>> result) {
return getCollectionSize(result.entrySet(),
rowResult ->
ThriftObjectSizeUtils.getByteBufferSize(rowResult.getKey())
+ getCollectionSize(rowResult.getValue(), ThriftObjectSizeUtils::getColumnOrSuperColumnSize));
}
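A sketch of the refactor suggested in the review thread above (hypothetical, not part of this diff): recordBytesRead would own the try/catch, so each Thrift override could record metrics in one line without its own exception handling.

// Hypothetical version of recordBytesRead per the review comment: metrics
// failures are logged and swallowed here, so callers such as multiget_slice
// no longer need their own try/catch around the recording call.
private void recordBytesRead(long numBytesRead) {
    try {
        qosMetrics.updateReadCount();
        qosMetrics.updateBytesRead(numBytesRead);
    } catch (Exception e) {
        log.warn("Encountered an exception when recording read metrics.", e);
    }
}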

@Override
-public void batch_mutate(Map<ByteBuffer, Map<String, List<Mutation>>> mutation_map,
+public void batch_mutate(Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap,
ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
qosClient.checkLimit();
-delegate.batch_mutate(mutation_map, consistency_level);
+delegate.batch_mutate(mutationMap, consistency_level);
try {
recordBytesWritten(getApproximateWriteByteCount(mutationMap));
} catch (Exception e) {
log.warn("Encountered an exception when recording write metrics for batch_mutate.", e);
}
}

private long getApproximateWriteByteCount(Map<ByteBuffer, Map<String, List<Mutation>>> batchMutateMap) {
long approxBytesForKeys = getCollectionSize(batchMutateMap.keySet(), ThriftObjectSizeUtils::getByteBufferSize);
long approxBytesForValues = getCollectionSize(batchMutateMap.values(), currentMap ->
getCollectionSize(currentMap.keySet(), ThriftObjectSizeUtils::getStringSize)
+ getCollectionSize(currentMap.values(),
mutations -> getCollectionSize(mutations, ThriftObjectSizeUtils::getMutationSize)));
return approxBytesForKeys + approxBytesForValues;
}

@Override
public CqlResult execute_cql3_query(ByteBuffer query, Compression compression, ConsistencyLevel consistency)
throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException,
TException {
qosClient.checkLimit();
-return delegate.execute_cql3_query(query, compression, consistency);
+CqlResult cqlResult = delegate.execute_cql3_query(query, compression, consistency);
try {
recordBytesRead(ThriftObjectSizeUtils.getCqlResultSize(cqlResult));
} catch (Exception e) {
log.warn("Encountered an exception when recording read metrics for execute_cql3_query.", e);
}
return cqlResult;
}

@Override
public List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range,
ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
qosClient.checkLimit();
-return delegate.get_range_slices(column_parent, predicate, range, consistency_level);
+List<KeySlice> result = super.get_range_slices(column_parent, predicate, range, consistency_level);
try {
recordBytesRead(getCollectionSize(result, ThriftObjectSizeUtils::getKeySliceSize));
} catch (Exception e) {
Contributor: I would really hope that recording metrics shouldn't throw an exception for any reason. Why do we need this?

log.warn("Encountered an exception when recording read metrics for get_range_slices.", e);
}
return result;
}

private void recordBytesRead(long numBytesRead) {
qosMetrics.updateReadCount();
qosMetrics.updateBytesRead(numBytesRead);
}

private void recordBytesWritten(long numBytesWritten) {
qosMetrics.updateWriteCount();
qosMetrics.updateBytesWritten(numBytesWritten);
}

private <T> long getCollectionSize(Collection<T> collection, Function<T, Long> singleObjectSizeFunction) {
return collection.stream().mapToLong(singleObjectSizeFunction::apply).sum();
}
}
File: com/palantir/atlasdb/keyvalue/cassandra/QosMetrics.java
@@ -0,0 +1,53 @@
/*
* Copyright 2017 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the BSD-3 License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.keyvalue.cassandra;

import com.codahale.metrics.Meter;
import com.palantir.atlasdb.util.MetricsManager;

public class QosMetrics {
Contributor: Immutable?

private final MetricsManager metricsManager = new MetricsManager();

private final Meter readRequestCount;
private final Meter writeRequestCount;
private final Meter bytesRead;
private final Meter bytesWritten;

public QosMetrics() {
readRequestCount = metricsManager.registerMeter(QosMetrics.class, "numReadRequests");
writeRequestCount = metricsManager.registerMeter(QosMetrics.class, "numWriteRequests");

bytesRead = metricsManager.registerMeter(QosMetrics.class, "bytesRead");
bytesWritten = metricsManager.registerMeter(QosMetrics.class, "bytesWritten");
}

public void updateReadCount() {
readRequestCount.mark();
}

public void updateWriteCount() {
writeRequestCount.mark();
}

public void updateBytesRead(long numBytes) {
bytesRead.mark(numBytes);
}

public void updateBytesWritten(long numBytes) {
bytesWritten.mark(numBytes);
}
}
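For illustration, a usage sketch (assuming standard Dropwizard naming, where MetricRegistry.name skips empty parts): the meters above surface under names like "com.palantir.atlasdb.keyvalue.cassandra.QosMetrics.numReadRequests".

// Hypothetical caller; CassandraClient does effectively this after each read.
QosMetrics metrics = new QosMetrics();
metrics.updateReadCount();      // one read request observed
metrics.updateBytesRead(1024L); // approximate payload size in bytes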
File: com/palantir/atlasdb/keyvalue/cassandra/ThriftObjectSizeUtils.java
@@ -0,0 +1,229 @@
/*
* Copyright 2017 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the BSD-3 License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.keyvalue.cassandra;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.function.Function;

import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.CounterColumn;
import org.apache.cassandra.thrift.CounterSuperColumn;
import org.apache.cassandra.thrift.CqlMetadata;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.CqlRow;
import org.apache.cassandra.thrift.Deletion;
import org.apache.cassandra.thrift.KeySlice;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.thrift.SuperColumn;

public final class ThriftObjectSizeUtils {

private static final long ONE_BYTE = 1L;

private ThriftObjectSizeUtils() {
// utility class
}

public static long getColumnOrSuperColumnSize(ColumnOrSuperColumn columnOrSuperColumn) {
if (columnOrSuperColumn == null) {
return getNullSize();
}
return getColumnSize(columnOrSuperColumn.getColumn())
+ getSuperColumnSize(columnOrSuperColumn.getSuper_column())
+ getCounterColumnSize(columnOrSuperColumn.getCounter_column())
+ getCounterSuperColumnSize(columnOrSuperColumn.getCounter_super_column());
}

public static long getByteBufferSize(ByteBuffer byteBuffer) {
if (byteBuffer == null) {
return getNullSize();
}

// remaining() is the readable byte count; position() would be 0 for a freshly wrapped buffer
return byteBuffer.remaining();
}

public static long getMutationSize(Mutation mutation) {
if (mutation == null) {
return getNullSize();
}

return getColumnOrSuperColumnSize(mutation.getColumn_or_supercolumn())
+ getDeletionSize(mutation.getDeletion());
}

public static long getCqlResultSize(CqlResult cqlResult) {
if (cqlResult == null) {
return getNullSize();
}
return getThriftEnumSize()
+ getCollectionSize(cqlResult.getRows(), ThriftObjectSizeUtils::getCqlRowSize)
+ Integer.BYTES
+ getCqlMetadataSize(cqlResult.getSchema());
}

public static long getKeySliceSize(KeySlice keySlice) {
if (keySlice == null) {
return getNullSize();
}

return getByteArraySize(keySlice.getKey())
+ getCollectionSize(keySlice.getColumns(), ThriftObjectSizeUtils::getColumnOrSuperColumnSize);
}

public static long getStringSize(String string) {
if (string == null) {
return getNullSize();
}

return string.length() * Character.BYTES;
}

private static long getCounterSuperColumnSize(CounterSuperColumn counterSuperColumn) {
if (counterSuperColumn == null) {
return getNullSize();
}

return getByteArraySize(counterSuperColumn.getName())
+ getCollectionSize(counterSuperColumn.getColumns(), ThriftObjectSizeUtils::getCounterColumnSize);
}

private static long getCounterColumnSize(CounterColumn counterColumn) {
if (counterColumn == null) {
return getNullSize();
}

return getByteArraySize(counterColumn.getName()) + getCounterValueSize();
}

private static long getSuperColumnSize(SuperColumn superColumn) {
if (superColumn == null) {
return getNullSize();
}

return getByteArraySize(superColumn.getName())
+ getCollectionSize(superColumn.getColumns(), ThriftObjectSizeUtils::getColumnSize);
}

private static long getColumnSize(Column column) {
if (column == null) {
return getNullSize();
}

return getByteArraySize(column.getValue())
+ getByteArraySize(column.getName())
+ getTtlSize()
+ getTimestampSize();
}

private static long getDeletionSize(Deletion deletion) {
if (deletion == null) {
return getNullSize();
}

return getTimestampSize()
+ getByteArraySize(deletion.getSuper_column())
+ getSlicePredicateSize(deletion.getPredicate());
}

private static long getSlicePredicateSize(SlicePredicate predicate) {
if (predicate == null) {
return getNullSize();
}

return getCollectionSize(predicate.getColumn_names(), ThriftObjectSizeUtils::getByteBufferSize)
+ getSliceRangeSize(predicate.getSlice_range());
}

private static long getSliceRangeSize(SliceRange sliceRange) {
if (sliceRange == null) {
return getNullSize();
}

return getByteArraySize(sliceRange.getStart())
+ getByteArraySize(sliceRange.getFinish())
+ getReversedBooleanSize()
+ getSliceRangeCountSize();
}

private static long getCqlMetadataSize(CqlMetadata schema) {
Contributor: Good to see we're being thorough here. However, some of these sizes could probably be removed for simplicity. I think we are mainly interested in the sizes of the column names and values we read and write, and not so much in the extra metadata included in the request/response objects. (Fine to merge as is for now, though.)

Contributor (author): Merging as is for now, with most code paths tested.

if (schema == null) {
return getNullSize();
}

return getByteBufferStringMapSize(schema.getName_types())
+ getByteBufferStringMapSize(schema.getValue_types())
+ getStringSize(schema.getDefault_name_type())
+ getStringSize(schema.getDefault_value_type());
}

private static long getByteBufferStringMapSize(Map<ByteBuffer, String> nameTypes) {
return getCollectionSize(nameTypes.entrySet(),
entry -> ThriftObjectSizeUtils.getByteBufferSize(entry.getKey())
+ ThriftObjectSizeUtils.getStringSize(entry.getValue()));
}

private static long getCqlRowSize(CqlRow cqlRow) {
if (cqlRow == null) {
return getNullSize();
}
return getByteArraySize(cqlRow.getKey())
+ getCollectionSize(cqlRow.getColumns(), ThriftObjectSizeUtils::getColumnSize);
}

private static long getThriftEnumSize() {
return Integer.BYTES;
}

private static long getByteArraySize(byte[] byteArray) {
if (byteArray == null) {
return getNullSize();
}
return byteArray.length;
}

private static long getTimestampSize() {
return Long.BYTES;
}

private static long getTtlSize() {
return Integer.BYTES;
}

private static long getCounterValueSize() {
return Long.BYTES;
}

private static long getReversedBooleanSize() {
return ONE_BYTE;
}

private static long getSliceRangeCountSize() {
return Integer.BYTES;
}

private static long getNullSize() {
return Integer.BYTES;
}

private static <T> long getCollectionSize(Collection<T> collection, Function<T, Long> sizeFunction) {
return collection.stream().mapToLong(sizeFunction::apply).sum();
}
}
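As a worked example of this accounting (illustrative only, assuming java.nio.charset.StandardCharsets is imported): a ColumnOrSuperColumn wrapping a single column with a 4-byte name and a 5-byte value counts as 33 bytes.

// 5 (value) + 4 (name) + 4 (ttl) + 8 (timestamp) + 3 * 4 (null counter/super fields) = 33.
Column column = new Column(ByteBuffer.wrap("name".getBytes(StandardCharsets.UTF_8)));
column.setValue("value".getBytes(StandardCharsets.UTF_8));
long size = ThriftObjectSizeUtils.getColumnOrSuperColumnSize(
        new ColumnOrSuperColumn().setColumn(column));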
File: com/palantir/atlasdb/ThriftObjectSizeUtilsTest.java
@@ -0,0 +1,75 @@
/*
* Copyright 2017 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the BSD-3 License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb;

import static org.assertj.core.api.Java6Assertions.assertThat;

import java.nio.ByteBuffer;

import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.SuperColumn;
import org.junit.Test;

import com.google.common.collect.ImmutableList;
import com.palantir.atlasdb.keyvalue.cassandra.ThriftObjectSizeUtils;

public class ThriftObjectSizeUtilsTest {

private static final String TEST_NAME = "test";
private static final Column TEST_COLUMN = new Column(ByteBuffer.wrap(TEST_NAME.getBytes()));
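// TEST_COLUMN has a name but no value; getColumnSize counts it as
// 4 (null value) + name bytes + 4 (ttl) + 8 (timestamp), matching TEST_COLUMN_SIZE below.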


private static final long TEST_COLUMN_SIZE = 4L + TEST_NAME.getBytes().length + 4L + 8L;


@Test
public void returnFourForNullColumnOrSuperColumn() throws Exception {
assertThat(ThriftObjectSizeUtils.getColumnOrSuperColumnSize(null)).isEqualTo(Integer.BYTES);
}

@Test
public void getSizeForEmptyColumnOrSuperColumn() throws Exception {
assertThat(ThriftObjectSizeUtils.getColumnOrSuperColumnSize(new ColumnOrSuperColumn())).isEqualTo(
Integer.BYTES * 4);
}

@Test
public void getSizeForColumnOrSuperColumnWithAnEmptyColumn() throws Exception {
ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
columnOrSuperColumn.setColumn(new Column());
assertThat(ThriftObjectSizeUtils.getColumnOrSuperColumnSize(columnOrSuperColumn)).isEqualTo(
Integer.BYTES * 8);
}

@Test
public void getSizeForColumnOrSuperColumnWithANonEmptyColumn() throws Exception {
assertThat(ThriftObjectSizeUtils.getColumnOrSuperColumnSize(new ColumnOrSuperColumn().setColumn(TEST_COLUMN)))
.isEqualTo(Integer.BYTES * 3 + TEST_COLUMN_SIZE);
}

@Test
public void getSizeForColumnOrSuperColumnWithANonEmptyColumnAndSuperColumn() throws Exception {
ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
columnOrSuperColumn.setColumn(TEST_COLUMN);
columnOrSuperColumn.setSuper_column(
new SuperColumn(ByteBuffer.wrap(TEST_NAME.getBytes()), ImmutableList.of(TEST_COLUMN)));
assertThat(ThriftObjectSizeUtils.getColumnOrSuperColumnSize(columnOrSuperColumn)).isEqualTo(
Integer.BYTES * 2 + TEST_COLUMN_SIZE + TEST_NAME.getBytes().length + TEST_COLUMN_SIZE);
}
}
File: com/palantir/atlasdb/util/MetricsManager.java
@@ -23,6 +23,7 @@
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
@@ -65,6 +66,16 @@ public void registerMetric(Class clazz, String metricName, Metric metric) {
registerMetricWithFqn(MetricRegistry.name(clazz, metricName), metric);
}

public Histogram registerHistogram(Class clazz, String metricName) {
return registerHistogram(MetricRegistry.name(clazz, metricName));
}

private Histogram registerHistogram(String fullyQualifiedHistogramName) {
Histogram histogram = metricRegistry.histogram(fullyQualifiedHistogramName);
registeredMetrics.add(fullyQualifiedHistogramName);
return histogram;
}

private synchronized void registerMetricWithFqn(String fullyQualifiedMetricName, Metric metric) {
try {
metricRegistry.register(fullyQualifiedMetricName, metric);
@@ -75,6 +86,10 @@ private synchronized void registerMetricWithFqn(String fullyQualifiedMetricName,
}
}

public Meter registerMeter(Class clazz, String meterName) {
return registerMeter(MetricRegistry.name(clazz, "", meterName));
}

public Meter registerMeter(Class clazz, String metricPrefix, String meterName) {
return registerMeter(MetricRegistry.name(clazz, metricPrefix, meterName));
}
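Worth noting for the new two-argument overload (standard Dropwizard behaviour): MetricRegistry.name drops null and empty name parts, so the empty prefix is simply omitted from the resulting metric name.

// Both produce "com.palantir.atlasdb.keyvalue.cassandra.QosMetrics.numReadRequests".
MetricRegistry.name(QosMetrics.class, "", "numReadRequests");
MetricRegistry.name(QosMetrics.class, "numReadRequests");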