Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[QoS] Client config (#2690)
Browse files Browse the repository at this point in the history
* qos config

* respect max backoff itme

* [QoS] [Refactor] Query Weights (#2697)

* query weights

* extra tests

* [QoS] Number of rows per query (#2698)

* num rows

* checkstyle

* fix tests

* no int casting

* fix numRows calculation on batch_mutate
  • Loading branch information
nziebart authored Nov 17, 2017
1 parent dd55403 commit 5089bca
Show file tree
Hide file tree
Showing 33 changed files with 828 additions and 305 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.common.collect.Maps;
import com.palantir.atlasdb.cassandra.CassandraCredentialsConfig;
import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig;
import com.palantir.atlasdb.keyvalue.cassandra.qos.QosCassandraClient;
import com.palantir.atlasdb.qos.QosClient;
import com.palantir.atlasdb.util.AtlasDbMetrics;
import com.palantir.common.exception.AtlasDbDependencyException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@
* limitations under the License.
*/

package com.palantir.atlasdb.keyvalue.cassandra;
package com.palantir.atlasdb.keyvalue.cassandra.qos;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import org.apache.cassandra.thrift.CASResult;
import org.apache.cassandra.thrift.Cassandra;
Expand All @@ -43,13 +41,13 @@
import org.slf4j.LoggerFactory;

import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.keyvalue.cassandra.CassandraClient;
import com.palantir.atlasdb.keyvalue.cassandra.CqlQuery;
import com.palantir.atlasdb.qos.QosClient;

@SuppressWarnings({"all"}) // thrift variable names.
public class QosCassandraClient implements CassandraClient {

private static final int DEFAULT_ESTIMATED_READ_BYTES = 100;

private static final Logger log = LoggerFactory.getLogger(CassandraClient.class);

private final CassandraClient client;
Expand All @@ -70,55 +68,35 @@ public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(String kvsMetho
List<ByteBuffer> keys, SlicePredicate predicate, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
return qosClient.executeRead(
() -> DEFAULT_ESTIMATED_READ_BYTES,
() -> client.multiget_slice(kvsMethodName, tableRef, keys,
predicate, consistency_level),
this::getApproximateReadByteCount);
}

private int getApproximateReadByteCount(Map<ByteBuffer, List<ColumnOrSuperColumn>> result) {
return getCollectionSize(result.entrySet(),
rowResult -> ThriftObjectSizeUtils.getByteBufferSize(rowResult.getKey())
+ getCollectionSize(rowResult.getValue(),
ThriftObjectSizeUtils::getColumnOrSuperColumnSize));
() -> client.multiget_slice(kvsMethodName, tableRef, keys, predicate, consistency_level),
ThriftQueryWeighers.MULTIGET_SLICE);
}

@Override
public List<KeySlice> get_range_slices(String kvsMethodName, TableReference tableRef, SlicePredicate predicate,
KeyRange range, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
return qosClient.executeRead(
() -> DEFAULT_ESTIMATED_READ_BYTES,
() -> client.get_range_slices(kvsMethodName, tableRef, predicate, range, consistency_level),
result -> getCollectionSize(result, ThriftObjectSizeUtils::getKeySliceSize));
ThriftQueryWeighers.GET_RANGE_SLICES);
}

@Override
public void batch_mutate(String kvsMethodName, Map<ByteBuffer, Map<String, List<Mutation>>> mutation_map,
ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
qosClient.executeWrite(
() -> getApproximateWriteByteCount(mutation_map),
() -> client.batch_mutate(kvsMethodName, mutation_map, consistency_level));
}

private int getApproximateWriteByteCount(Map<ByteBuffer, Map<String, List<Mutation>>> batchMutateMap) {
int approxBytesForKeys = getCollectionSize(batchMutateMap.keySet(), ThriftObjectSizeUtils::getByteBufferSize);
int approxBytesForValues = getCollectionSize(batchMutateMap.values(), currentMap ->
getCollectionSize(currentMap.keySet(), ThriftObjectSizeUtils::getStringSize)
+ getCollectionSize(currentMap.values(),
mutations -> getCollectionSize(mutations, ThriftObjectSizeUtils::getMutationSize)));
return approxBytesForKeys + approxBytesForValues;
() -> client.batch_mutate(kvsMethodName, mutation_map, consistency_level),
ThriftQueryWeighers.batchMutate(mutation_map));
}

@Override
public ColumnOrSuperColumn get(TableReference tableReference, ByteBuffer key, byte[] column,
ConsistencyLevel consistency_level)
throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException {
return qosClient.executeRead(
() -> DEFAULT_ESTIMATED_READ_BYTES,
() -> client.get(tableReference, key, column, consistency_level),
ThriftObjectSizeUtils::getColumnOrSuperColumnSize);
ThriftQueryWeighers.GET);
}

@Override
Expand All @@ -134,14 +112,10 @@ public CASResult cas(TableReference tableReference, ByteBuffer key, List<Column>
public CqlResult execute_cql3_query(CqlQuery cqlQuery, Compression compression, ConsistencyLevel consistency)
throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException,
TException {

return qosClient.executeRead(
() -> DEFAULT_ESTIMATED_READ_BYTES,
() -> client.execute_cql3_query(cqlQuery, compression, consistency),
ThriftObjectSizeUtils::getCqlResultSize);
ThriftQueryWeighers.EXECUTE_CQL3_QUERY);
}

private <T> int getCollectionSize(Collection<T> collection, Function<T, Integer> singleObjectSizeFunction) {
return ThriftObjectSizeUtils.getCollectionSize(collection, singleObjectSizeFunction);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
* limitations under the License.
*/

package com.palantir.atlasdb.keyvalue.cassandra;
package com.palantir.atlasdb.keyvalue.cassandra.qos;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

Expand All @@ -37,13 +38,33 @@

public final class ThriftObjectSizeUtils {

private static final int ONE_BYTE = 1;
private static final long ONE_BYTE = 1;

private ThriftObjectSizeUtils() {
// utility class
}

public static int getColumnOrSuperColumnSize(ColumnOrSuperColumn columnOrSuperColumn) {
public static long getApproximateWriteByteCount(Map<ByteBuffer, Map<String, List<Mutation>>> batchMutateMap) {
long approxBytesForKeys = getCollectionSize(batchMutateMap.keySet(), ThriftObjectSizeUtils::getByteBufferSize);
long approxBytesForValues = getCollectionSize(batchMutateMap.values(),
currentMap -> getCollectionSize(currentMap.keySet(), ThriftObjectSizeUtils::getStringSize)
+ getCollectionSize(currentMap.values(),
mutations -> getCollectionSize(mutations, ThriftObjectSizeUtils::getMutationSize)));
return approxBytesForKeys + approxBytesForValues;
}

public static long getApproximateReadByteCount(Map<ByteBuffer, List<ColumnOrSuperColumn>> result) {
return getCollectionSize(result.entrySet(),
rowResult -> ThriftObjectSizeUtils.getByteBufferSize(rowResult.getKey())
+ getCollectionSize(rowResult.getValue(),
ThriftObjectSizeUtils::getColumnOrSuperColumnSize));
}

public static long getApproximateReadByteCount(List<KeySlice> slices) {
return getCollectionSize(slices, ThriftObjectSizeUtils::getKeySliceSize);
}

public static long getColumnOrSuperColumnSize(ColumnOrSuperColumn columnOrSuperColumn) {
if (columnOrSuperColumn == null) {
return getNullSize();
}
Expand All @@ -53,14 +74,14 @@ public static int getColumnOrSuperColumnSize(ColumnOrSuperColumn columnOrSuperCo
+ getCounterSuperColumnSize(columnOrSuperColumn.getCounter_super_column());
}

public static int getByteBufferSize(ByteBuffer byteBuffer) {
public static long getByteBufferSize(ByteBuffer byteBuffer) {
if (byteBuffer == null) {
return getNullSize();
}
return byteBuffer.remaining();
}

public static int getMutationSize(Mutation mutation) {
public static long getMutationSize(Mutation mutation) {
if (mutation == null) {
return getNullSize();
}
Expand All @@ -69,7 +90,7 @@ public static int getMutationSize(Mutation mutation) {
mutation.getDeletion());
}

public static int getCqlResultSize(CqlResult cqlResult) {
public static long getCqlResultSize(CqlResult cqlResult) {
if (cqlResult == null) {
return getNullSize();
}
Expand All @@ -79,7 +100,7 @@ public static int getCqlResultSize(CqlResult cqlResult) {
+ getCqlMetadataSize(cqlResult.getSchema());
}

public static int getKeySliceSize(KeySlice keySlice) {
public static long getKeySliceSize(KeySlice keySlice) {
if (keySlice == null) {
return getNullSize();
}
Expand All @@ -88,15 +109,15 @@ public static int getKeySliceSize(KeySlice keySlice) {
+ getCollectionSize(keySlice.getColumns(), ThriftObjectSizeUtils::getColumnOrSuperColumnSize);
}

public static int getStringSize(String string) {
public static long getStringSize(String string) {
if (string == null) {
return getNullSize();
}

return string.length() * Character.SIZE;
return string.length();
}

public static int getColumnSize(Column column) {
public static long getColumnSize(Column column) {
if (column == null) {
return getNullSize();
}
Expand All @@ -107,7 +128,7 @@ public static int getColumnSize(Column column) {
+ getTimestampSize();
}

private static int getCounterSuperColumnSize(CounterSuperColumn counterSuperColumn) {
private static long getCounterSuperColumnSize(CounterSuperColumn counterSuperColumn) {
if (counterSuperColumn == null) {
return getNullSize();
}
Expand All @@ -116,15 +137,15 @@ private static int getCounterSuperColumnSize(CounterSuperColumn counterSuperColu
+ getCollectionSize(counterSuperColumn.getColumns(), ThriftObjectSizeUtils::getCounterColumnSize);
}

private static int getCounterColumnSize(CounterColumn counterColumn) {
private static long getCounterColumnSize(CounterColumn counterColumn) {
if (counterColumn == null) {
return getNullSize();
}

return getByteArraySize(counterColumn.getName()) + getCounterValueSize();
}

private static int getSuperColumnSize(SuperColumn superColumn) {
private static long getSuperColumnSize(SuperColumn superColumn) {
if (superColumn == null) {
return getNullSize();
}
Expand All @@ -133,7 +154,7 @@ private static int getSuperColumnSize(SuperColumn superColumn) {
+ getCollectionSize(superColumn.getColumns(), ThriftObjectSizeUtils::getColumnSize);
}

private static int getDeletionSize(Deletion deletion) {
private static long getDeletionSize(Deletion deletion) {
if (deletion == null) {
return getNullSize();
}
Expand All @@ -143,7 +164,7 @@ private static int getDeletionSize(Deletion deletion) {
+ getSlicePredicateSize(deletion.getPredicate());
}

private static int getSlicePredicateSize(SlicePredicate predicate) {
private static long getSlicePredicateSize(SlicePredicate predicate) {
if (predicate == null) {
return getNullSize();
}
Expand All @@ -152,7 +173,7 @@ private static int getSlicePredicateSize(SlicePredicate predicate) {
+ getSliceRangeSize(predicate.getSlice_range());
}

private static int getSliceRangeSize(SliceRange sliceRange) {
private static long getSliceRangeSize(SliceRange sliceRange) {
if (sliceRange == null) {
return getNullSize();
}
Expand All @@ -163,7 +184,7 @@ private static int getSliceRangeSize(SliceRange sliceRange) {
+ getSliceRangeCountSize();
}

private static int getCqlMetadataSize(CqlMetadata schema) {
private static long getCqlMetadataSize(CqlMetadata schema) {
if (schema == null) {
return getNullSize();
}
Expand All @@ -174,61 +195,61 @@ private static int getCqlMetadataSize(CqlMetadata schema) {
+ getStringSize(schema.getDefault_value_type());
}

private static int getByteBufferStringMapSize(Map<ByteBuffer, String> nameTypes) {
private static long getByteBufferStringMapSize(Map<ByteBuffer, String> nameTypes) {
return getCollectionSize(nameTypes.entrySet(),
entry -> ThriftObjectSizeUtils.getByteBufferSize(entry.getKey())
+ ThriftObjectSizeUtils.getStringSize(entry.getValue()));
}

private static int getCqlRowSize(CqlRow cqlRow) {
private static long getCqlRowSize(CqlRow cqlRow) {
if (cqlRow == null) {
return getNullSize();
}
return getByteArraySize(cqlRow.getKey())
+ getCollectionSize(cqlRow.getColumns(), ThriftObjectSizeUtils::getColumnSize);
}

private static int getThriftEnumSize() {
private static long getThriftEnumSize() {
return Integer.BYTES;
}

private static int getByteArraySize(byte[] byteArray) {
private static long getByteArraySize(byte[] byteArray) {
if (byteArray == null) {
return getNullSize();
}
return byteArray.length;
}

private static int getTimestampSize() {
private static long getTimestampSize() {
return Long.BYTES;
}

private static int getTtlSize() {
private static long getTtlSize() {
return Integer.BYTES;
}

private static int getCounterValueSize() {
private static long getCounterValueSize() {
return Long.BYTES;
}

private static int getReversedBooleanSize() {
private static long getReversedBooleanSize() {
return ONE_BYTE;
}

private static int getSliceRangeCountSize() {
private static long getSliceRangeCountSize() {
return Integer.BYTES;
}

private static int getNullSize() {
private static long getNullSize() {
return Integer.BYTES;
}

public static <T> int getCollectionSize(Collection<T> collection, Function<T, Integer> sizeFunction) {
public static <T> long getCollectionSize(Collection<T> collection, Function<T, Long> sizeFunction) {
if (collection == null) {
return getNullSize();
}

int sum = 0;
long sum = 0;
for (T item : collection) {
sum += sizeFunction.apply(item);
}
Expand Down
Loading

0 comments on commit 5089bca

Please sign in to comment.