Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
rate limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Ziebart committed Nov 15, 2017
1 parent a45809d commit e3bd685
Show file tree
Hide file tree
Showing 40 changed files with 679 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public interface AtlasDbFactory {
default KeyValueService createRawKeyValueService(
KeyValueServiceConfig config, Optional<LeaderConfig> leaderConfig) {
return createRawKeyValueService(config, leaderConfig, Optional.empty(), DEFAULT_INITIALIZE_ASYNC,
FakeQosClient.getDefault());
FakeQosClient.INSTANCE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,11 @@ public void shutdown() {
@VisibleForTesting
static CassandraClientPoolImpl createImplForTest(CassandraKeyValueServiceConfig config,
StartupChecks startupChecks) {
return create(config, startupChecks, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC, FakeQosClient.getDefault());
return create(config, startupChecks, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC, FakeQosClient.INSTANCE);
}

public static CassandraClientPool create(CassandraKeyValueServiceConfig config) {
return create(config, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC, FakeQosClient.getDefault());
return create(config, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC, FakeQosClient.INSTANCE);
}

public static CassandraClientPool create(CassandraKeyValueServiceConfig config, boolean initializeAsync,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private CassandraExpiringKeyValueService(
Optional<LeaderConfig> leaderConfig,
boolean initializeAsync) {
super(LoggerFactory.getLogger(CassandraKeyValueService.class), configManager, compactionManager, leaderConfig,
initializeAsync, FakeQosClient.getDefault());
initializeAsync, FakeQosClient.INSTANCE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public static CassandraKeyValueService create(
CassandraKeyValueServiceConfigManager configManager,
Optional<LeaderConfig> leaderConfig,
boolean initializeAsync) {
return create(configManager, leaderConfig, initializeAsync, FakeQosClient.getDefault());
return create(configManager, leaderConfig, initializeAsync, FakeQosClient.INSTANCE);
}

public static CassandraKeyValueService create(
Expand All @@ -245,7 +245,7 @@ static CassandraKeyValueService create(
Optional<LeaderConfig> leaderConfig,
Logger log) {
return create(configManager, leaderConfig, log, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC,
FakeQosClient.getDefault());
FakeQosClient.INSTANCE);
}

private static CassandraKeyValueService create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.cassandra.thrift.CASResult;
import org.apache.cassandra.thrift.Cassandra;
Expand All @@ -48,16 +47,17 @@

@SuppressWarnings({"all"}) // thrift variable names.
public class QosCassandraClient implements CassandraClient {
private final Logger log = LoggerFactory.getLogger(CassandraClient.class);

private static final int DEFAULT_ESTIMATED_READ_BYTES = 100;

private static final Logger log = LoggerFactory.getLogger(CassandraClient.class);

private final CassandraClient client;
private final QosMetrics qosMetrics;
private final QosClient qosClient;

public QosCassandraClient(CassandraClient client, QosClient qosClient) {
this.client = client;
this.qosClient = qosClient;
qosMetrics = new QosMetrics();
}

@Override
Expand All @@ -69,15 +69,14 @@ public Cassandra.Client rawClient() {
public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(String kvsMethodName, TableReference tableRef,
List<ByteBuffer> keys, SlicePredicate predicate, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
qosClient.checkLimit();

Map<ByteBuffer, List<ColumnOrSuperColumn>> result = client.multiget_slice(kvsMethodName, tableRef, keys,
predicate, consistency_level);
recordBytesRead(() -> getApproximateReadByteCount(result));
return result;
return qosClient.executeRead(
() -> DEFAULT_ESTIMATED_READ_BYTES,
() -> client.multiget_slice(kvsMethodName, tableRef, keys,
predicate, consistency_level),
this::getApproximateReadByteCount);
}

private long getApproximateReadByteCount(Map<ByteBuffer, List<ColumnOrSuperColumn>> result) {
private int getApproximateReadByteCount(Map<ByteBuffer, List<ColumnOrSuperColumn>> result) {
return getCollectionSize(result.entrySet(),
rowResult -> ThriftObjectSizeUtils.getByteBufferSize(rowResult.getKey())
+ getCollectionSize(rowResult.getValue(),
Expand All @@ -88,26 +87,24 @@ private long getApproximateReadByteCount(Map<ByteBuffer, List<ColumnOrSuperColum
public List<KeySlice> get_range_slices(String kvsMethodName, TableReference tableRef, SlicePredicate predicate,
KeyRange range, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
qosClient.checkLimit();

List<KeySlice> result = client.get_range_slices(kvsMethodName, tableRef, predicate, range, consistency_level);
recordBytesRead(() -> getCollectionSize(result, ThriftObjectSizeUtils::getKeySliceSize));
return result;
return qosClient.executeRead(
() -> DEFAULT_ESTIMATED_READ_BYTES,
() -> client.get_range_slices(kvsMethodName, tableRef, predicate, range, consistency_level),
result -> getCollectionSize(result, ThriftObjectSizeUtils::getKeySliceSize));
}

@Override
public void batch_mutate(String kvsMethodName, Map<ByteBuffer, Map<String, List<Mutation>>> mutation_map,
ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
qosClient.checkLimit();

client.batch_mutate(kvsMethodName, mutation_map, consistency_level);
recordBytesWritten(() -> getApproximateWriteByteCount(mutation_map));
qosClient.executeWrite(
() -> getApproximateWriteByteCount(mutation_map),
() -> client.batch_mutate(kvsMethodName, mutation_map, consistency_level));
}

private long getApproximateWriteByteCount(Map<ByteBuffer, Map<String, List<Mutation>>> batchMutateMap) {
long approxBytesForKeys = getCollectionSize(batchMutateMap.keySet(), ThriftObjectSizeUtils::getByteBufferSize);
long approxBytesForValues = getCollectionSize(batchMutateMap.values(), currentMap ->
private int getApproximateWriteByteCount(Map<ByteBuffer, Map<String, List<Mutation>>> batchMutateMap) {
int approxBytesForKeys = getCollectionSize(batchMutateMap.keySet(), ThriftObjectSizeUtils::getByteBufferSize);
int approxBytesForValues = getCollectionSize(batchMutateMap.values(), currentMap ->
getCollectionSize(currentMap.keySet(), ThriftObjectSizeUtils::getStringSize)
+ getCollectionSize(currentMap.values(),
mutations -> getCollectionSize(mutations, ThriftObjectSizeUtils::getMutationSize)));
Expand All @@ -118,56 +115,33 @@ private long getApproximateWriteByteCount(Map<ByteBuffer, Map<String, List<Mutat
public ColumnOrSuperColumn get(TableReference tableReference, ByteBuffer key, byte[] column,
ConsistencyLevel consistency_level)
throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException {
qosClient.checkLimit();

ColumnOrSuperColumn result = client.get(tableReference, key, column, consistency_level);
recordBytesRead(() -> ThriftObjectSizeUtils.getColumnOrSuperColumnSize(result));
return result;
return qosClient.executeRead(
() -> DEFAULT_ESTIMATED_READ_BYTES,
() -> client.get(tableReference, key, column, consistency_level),
ThriftObjectSizeUtils::getColumnOrSuperColumnSize);
}

@Override
public CASResult cas(TableReference tableReference, ByteBuffer key, List<Column> expected, List<Column> updates,
ConsistencyLevel serial_consistency_level, ConsistencyLevel commit_consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
qosClient.checkLimit();

CASResult result = client.cas(tableReference, key, expected, updates, serial_consistency_level,
// TODO(nziebart): should this be considered as a write, or do we need to treat it as both a read and a write?
return client.cas(tableReference, key, expected, updates, serial_consistency_level,
commit_consistency_level);
recordBytesWritten(() -> getCollectionSize(updates, ThriftObjectSizeUtils::getColumnSize));
recordBytesRead(() -> getCollectionSize(updates, ThriftObjectSizeUtils::getColumnSize));
return result;
}

@Override
public CqlResult execute_cql3_query(CqlQuery cqlQuery, Compression compression, ConsistencyLevel consistency)
throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException,
TException {
qosClient.checkLimit();

CqlResult cqlResult = client.execute_cql3_query(cqlQuery, compression, consistency);
recordBytesRead(() -> ThriftObjectSizeUtils.getCqlResultSize(cqlResult));
return cqlResult;
}

private void recordBytesRead(Supplier<Long> numBytesRead) {
try {
qosMetrics.updateReadCount();
qosMetrics.updateBytesRead(numBytesRead.get());
} catch (Exception e) {
log.warn("Encountered an exception when recording read metrics.", e);
}
}

private void recordBytesWritten(Supplier<Long> numBytesWritten) {
try {
qosMetrics.updateWriteCount();
qosMetrics.updateBytesWritten(numBytesWritten.get());
} catch (Exception e) {
log.warn("Encountered an exception when recording write metrics.", e);
}
return qosClient.executeRead(
() -> DEFAULT_ESTIMATED_READ_BYTES,
() -> client.execute_cql3_query(cqlQuery, compression, consistency),
ThriftObjectSizeUtils::getCqlResultSize);
}

private <T> long getCollectionSize(Collection<T> collection, Function<T, Long> singleObjectSizeFunction) {
private <T> int getCollectionSize(Collection<T> collection, Function<T, Integer> singleObjectSizeFunction) {
return ThriftObjectSizeUtils.getCollectionSize(collection, singleObjectSizeFunction);
}
}
Loading

0 comments on commit e3bd685

Please sign in to comment.