Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

[QoS] rate limiting #2684

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
2e8c960
Fix SweepBatchConfig values to properly decrease to 1 with each failu…
tboam Nov 8, 2017
f116645
SweeperService logging improvements (#2618)
tboam Nov 8, 2017
c1e21ee
Refactor TracingKVS (#2643)
fsamuel-bs Nov 8, 2017
53afd91
Delete docs (#2657)
hsaraogi Nov 9, 2017
e25d806
[20 minute tasks] Add test for when a batch is full (#2655)
tboam Nov 9, 2017
88e3ffe
MetricRegistry log level downgrade + multiple timestamp tracker tests…
jeremyk-91 Nov 9, 2017
8b67855
Extract interface for Cassandra client (#2660)
fsamuel-bs Nov 10, 2017
c0c05f6
client -> namespace [no release notes] (#2654)
hsaraogi Nov 10, 2017
fede1c3
0.65.2 and 0.66.0 release notes (#2663)
fsamuel-bs Nov 10, 2017
a2be749
[QoS] Add getNamespace to AtlasDBConfig (#2661)
hsaraogi Nov 10, 2017
18d35b6
Live Reloading the TimeLock Block, Part 1: Pull to Push (#2621)
jeremyk-91 Nov 10, 2017
a953328
Live Reloading the TimeLock Block, Part 2: TransactionManagers Plumbi…
jeremyk-91 Nov 10, 2017
8fdd50b
[TTT] [no release notes] Document behaviour regarding index rows (#2658)
jeremyk-91 Nov 13, 2017
6fed36f
Refactor and Instrument CassandraClient api (#2665)
fsamuel-bs Nov 13, 2017
e8e85f9
Live Reloading the TimeLock Block, Part 3: Working with 0 Nodes (#2647)
jeremyk-91 Nov 13, 2017
74180cf
check immutable ts (#2406)
nziebart Nov 13, 2017
d4bf805
Propagate top-level KVS method names to CassandraClient (#2669)
fsamuel-bs Nov 13, 2017
247f60c
Extract cql executor interface (#2670)
fsamuel-bs Nov 13, 2017
913e5e7
bump awaitility (#2668)
hsaraogi Nov 13, 2017
cafc479
Bump Atlas on Tritium 0.8.4 to fix dependency conflicts (#2662)
Nov 14, 2017
7fb4d17
Correctly log Paxos events (#2674)
fsamuel-bs Nov 14, 2017
14216ac
Slow log and tracing (#2673)
fsamuel-bs Nov 14, 2017
78791cf
Refactor cassandra client (#2676)
fsamuel-bs Nov 14, 2017
22e129a
use supplier for object size [no release notes]
Nov 14, 2017
757a282
fix merge
Nov 14, 2017
a45809d
fix merge in AtlasDbConfig
Nov 14, 2017
e3bd685
rate limiting
Nov 15, 2017
2564543
[QoS] total time spent talking to Cassandra (#2687)
nziebart Nov 17, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public interface AtlasDbFactory {
default KeyValueService createRawKeyValueService(
KeyValueServiceConfig config, Optional<LeaderConfig> leaderConfig) {
return createRawKeyValueService(config, leaderConfig, Optional.empty(), DEFAULT_INITIALIZE_ASYNC,
FakeQosClient.getDefault());
FakeQosClient.INSTANCE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Map.Entry;
import java.util.NavigableMap;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.TokenRange;
import org.apache.thrift.TException;
Expand Down Expand Up @@ -99,7 +98,8 @@ public void testTokenMapping() {
public void testSanitiseReplicationFactorPassesForTheKeyspace() {
clientPool.run(client -> {
try {
CassandraVerifier.currentRfOnKeyspaceMatchesDesiredRf(client, CassandraContainer.KVS_CONFIG);
CassandraVerifier.currentRfOnKeyspaceMatchesDesiredRf(client.rawClient(),
CassandraContainer.KVS_CONFIG);
} catch (TException e) {
fail("currentRf On Keyspace does not Match DesiredRf");
}
Expand All @@ -111,7 +111,7 @@ public void testSanitiseReplicationFactorPassesForTheKeyspace() {
public void testSanitiseReplicationFactorFailsAfterManipulatingReplicationFactorInConfig() {
clientPool.run(client -> {
try {
CassandraVerifier.currentRfOnKeyspaceMatchesDesiredRf(client,
CassandraVerifier.currentRfOnKeyspaceMatchesDesiredRf(client.rawClient(),
ImmutableCassandraKeyValueServiceConfig.copyOf(
CassandraContainer.KVS_CONFIG).withReplicationFactor(
MODIFIED_REPLICATION_FACTOR));
Expand All @@ -128,7 +128,8 @@ public void testSanitiseReplicationFactorFailsAfterManipulatingReplicationFactor
changeReplicationFactor(MODIFIED_REPLICATION_FACTOR);
clientPool.run(client -> {
try {
CassandraVerifier.currentRfOnKeyspaceMatchesDesiredRf(client, CassandraContainer.KVS_CONFIG);
CassandraVerifier.currentRfOnKeyspaceMatchesDesiredRf(client.rawClient(),
CassandraContainer.KVS_CONFIG);
fail("currentRf On Keyspace Matches DesiredRf after manipulating the cassandra keyspace");
} catch (Exception e) {
assertReplicationFactorMismatchError(e);
Expand All @@ -145,13 +146,14 @@ private void assertReplicationFactorMismatchError(Exception ex) {
}

private void changeReplicationFactor(int replicationFactor) throws TException {
clientPool.run((FunctionCheckedException<Cassandra.Client, Void, TException>) client -> {
KsDef originalKsDef = client.describe_keyspace(CassandraContainer.KVS_CONFIG.getKeyspaceOrThrow());
clientPool.run((FunctionCheckedException<CassandraClient, Void, TException>) client -> {
KsDef originalKsDef = client.rawClient().describe_keyspace(
CassandraContainer.KVS_CONFIG.getKeyspaceOrThrow());
KsDef modifiedKsDef = originalKsDef.deepCopy();
modifiedKsDef.setStrategy_class(CassandraConstants.NETWORK_STRATEGY);
modifiedKsDef.setStrategy_options(ImmutableMap.of("dc1", Integer.toString(replicationFactor)));
modifiedKsDef.setCf_defs(ImmutableList.of());
client.system_update_keyspace(modifiedKsDef);
client.rawClient().system_update_keyspace(modifiedKsDef);
return null;
});
}
Expand All @@ -169,8 +171,8 @@ public void testPoolGivenNoOptionTalksToBlacklistedHosts() {
clientPool.blacklistedHosts.clear();
}

private FunctionCheckedException<Cassandra.Client, List<TokenRange>, Exception> describeRing =
client -> client.describe_ring("atlasdb");
private FunctionCheckedException<CassandraClient, List<TokenRange>, Exception> describeRing =
client -> client.rawClient().describe_ring("atlasdb");

@Test
public void testWeightedHostsWithUniformActivity() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void testGcGraceSecondsUpgradeIsApplied() throws TException {

private void assertThatGcGraceSecondsIs(CassandraKeyValueService kvs, int gcGraceSeconds) throws TException {
List<CfDef> knownCfs = kvs.getClientPool().runWithRetry(client ->
client.describe_keyspace("atlasdb").getCf_defs());
client.rawClient().describe_keyspace("atlasdb").getCf_defs());
CfDef clusterSideCf = Iterables.getOnlyElement(knownCfs.stream()
.filter(cf -> cf.getName().equals(getInternalTestTableName()))
.collect(Collectors.toList()));
Expand All @@ -185,7 +185,7 @@ public void testCfEqualityChecker() throws TException {
kvs.createTable(testTable, tableMetadata);

List<CfDef> knownCfs = kvs.getClientPool().runWithRetry(client ->
client.describe_keyspace("atlasdb").getCf_defs());
client.rawClient().describe_keyspace("atlasdb").getCf_defs());
CfDef clusterSideCf = Iterables.getOnlyElement(knownCfs.stream()
.filter(cf -> cf.getName().equals(getInternalTestTableName()))
.collect(Collectors.toList()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.joda.time.Duration;

import com.jayway.awaitility.Awaitility;
import com.jayway.awaitility.core.ConditionTimeoutException;
import com.palantir.common.base.Throwables;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
*/
package com.palantir.atlasdb.keyvalue.cassandra;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.ConsistencyLevel;
Expand All @@ -29,6 +27,7 @@

import com.google.common.collect.Iterables;
import com.google.common.primitives.Longs;
import com.palantir.logsafe.SafeArg;

public final class SchemaMutationLockTestTools {
private final CassandraClientPool clientPool;
Expand All @@ -51,20 +50,21 @@ public CqlResult setLocksTableValue(long lockId, int heartbeatCount) throws TExc

public CqlResult truncateLocksTable() throws TException {
return clientPool.run(client -> {
String truncateCql = String.format("TRUNCATE \"%s\";", lockTable.getOnlyTable().getQualifiedName());
return runCqlQuery(truncateCql, client, ConsistencyLevel.ALL);
CqlQuery truncateQuery = new CqlQuery("TRUNCATE \"%s\";",
SafeArg.of("lockTable", lockTable.getOnlyTable().getQualifiedName()));
return runCqlQuery(truncateQuery, client, ConsistencyLevel.ALL);
});
}

public CqlResult readLocksTable() throws TException {
return clientPool.run(client -> {
String lockRowName = getHexEncodedBytes(CassandraConstants.GLOBAL_DDL_LOCK_ROW_NAME);
String lockColName = getHexEncodedBytes(CassandraConstants.GLOBAL_DDL_LOCK_COLUMN_NAME);
String selectCql = String.format(
CqlQuery selectCql = new CqlQuery(
"SELECT \"value\" FROM \"%s\" WHERE key = %s AND column1 = %s AND column2 = -1;",
lockTable.getOnlyTable().getQualifiedName(),
lockRowName,
lockColName);
SafeArg.of("lockTable", lockTable.getOnlyTable().getQualifiedName()),
SafeArg.of("lockRow", lockRowName),
SafeArg.of("lockColumn", lockColName));
return runCqlQuery(selectCql, client, ConsistencyLevel.LOCAL_QUORUM);
});
}
Expand All @@ -86,12 +86,12 @@ private CqlResult setLocksTableValueInternal(String hexLockValue) throws TExcept
String lockRowName = getHexEncodedBytes(CassandraConstants.GLOBAL_DDL_LOCK_ROW_NAME);
String lockColName = getHexEncodedBytes(CassandraConstants.GLOBAL_DDL_LOCK_COLUMN_NAME);
String lockTableName = lockTable.getOnlyTable().getQualifiedName();
String updateCql = String.format(
CqlQuery updateCql = new CqlQuery(
"UPDATE \"%s\" SET value = %s WHERE key = %s AND column1 = %s AND column2 = -1;",
lockTableName,
hexLockValue,
lockRowName,
lockColName);
SafeArg.of("lockTable", lockTableName),
SafeArg.of("hexLockValue", hexLockValue),
SafeArg.of("lockRow", lockRowName),
SafeArg.of("lockCol", lockColName));
return runCqlQuery(updateCql, client, ConsistencyLevel.EACH_QUORUM);
});
}
Expand All @@ -106,9 +106,8 @@ private static String getHexEncodedBytes(String str) {
return CassandraKeyValueServices.encodeAsHex(str.getBytes(StandardCharsets.UTF_8));
}

private static CqlResult runCqlQuery(String query, Cassandra.Client client, ConsistencyLevel consistency)
private static CqlResult runCqlQuery(CqlQuery cqlQuery, CassandraClient client, ConsistencyLevel consistency)
throws TException {
ByteBuffer queryBuffer = ByteBuffer.wrap(query.getBytes(StandardCharsets.UTF_8));
return client.execute_cql3_query(queryBuffer, Compression.NONE, consistency);
return client.execute_cql3_query(cqlQuery, Compression.NONE, consistency);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.ClassRule;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.jayway.awaitility.Awaitility;
import com.palantir.atlasdb.AtlasDbConstants;
import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig;
import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfigManager;
Expand Down
1 change: 1 addition & 0 deletions atlasdb-cassandra/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ dependencies {
testCompile group: 'org.mockito', name: 'mockito-core'
testCompile group: 'org.assertj', name: 'assertj-core'
testCompile group: 'org.hamcrest', name: 'hamcrest-library'
testCompile group: 'org.awaitility', name: 'awaitility'

processor group: 'org.immutables', name: 'value'
processor 'com.google.auto.service:auto-service:1.0-rc2'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,132 +17,69 @@
package com.palantir.atlasdb.keyvalue.cassandra;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import org.apache.cassandra.thrift.AutoDelegate_Client;
import org.apache.cassandra.thrift.CASResult;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.KeySlice;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SchemaDisagreementException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.palantir.atlasdb.qos.QosClient;
import com.palantir.processors.AutoDelegate;

/**
* Wrapper for Cassandra.Client.
*/
@AutoDelegate(typeToExtend = Cassandra.Client.class)
@SuppressWarnings({"checkstyle:all", "DuplicateThrows"}) // :'(
public class CassandraClient extends AutoDelegate_Client {
private final Logger log = LoggerFactory.getLogger(CassandraClient.class);

private final Cassandra.Client delegate;
private final QosMetrics qosMetrics;
private final QosClient qosClient;

public CassandraClient(Cassandra.Client delegate, QosClient qosClient) {
super(delegate.getInputProtocol());
this.delegate = delegate;
this.qosClient = qosClient;
this.qosMetrics = new QosMetrics();
}
import com.palantir.atlasdb.keyvalue.api.TableReference;

@Override
public Cassandra.Client delegate() {
return delegate;
}
@SuppressWarnings({"all"}) // thrift variable names.
public interface CassandraClient {
Cassandra.Client rawClient();

@Override
public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(List<ByteBuffer> keys, ColumnParent column_parent,
Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(String kvsMethodName,
TableReference tableRef,
List<ByteBuffer> keys,
SlicePredicate predicate, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
qosClient.checkLimit();
Map<ByteBuffer, List<ColumnOrSuperColumn>> result = delegate.multiget_slice(keys, column_parent,
predicate, consistency_level);
recordBytesRead(getApproximateReadByteCount(result));
return result;
}
throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException;

private long getApproximateReadByteCount(Map<ByteBuffer, List<ColumnOrSuperColumn>> result) {
return getCollectionSize(result.entrySet(),
rowResult ->
ThriftObjectSizeUtils.getByteBufferSize(rowResult.getKey())
+ getCollectionSize(rowResult.getValue(), ThriftObjectSizeUtils::getColumnOrSuperColumnSize));
}

@Override
public void batch_mutate(Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap,
List<KeySlice> get_range_slices(String kvsMethodName,
TableReference tableRef,
SlicePredicate predicate,
KeyRange range,
ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
qosClient.checkLimit();
delegate.batch_mutate(mutationMap, consistency_level);
recordBytesWritten(getApproximateWriteByteCount(mutationMap));
}

private long getApproximateWriteByteCount(Map<ByteBuffer, Map<String, List<Mutation>>> batchMutateMap) {
long approxBytesForKeys = getCollectionSize(batchMutateMap.keySet(), ThriftObjectSizeUtils::getByteBufferSize);
long approxBytesForValues = getCollectionSize(batchMutateMap.values(), currentMap ->
getCollectionSize(currentMap.keySet(), ThriftObjectSizeUtils::getStringSize)
+ getCollectionSize(currentMap.values(),
mutations -> getCollectionSize(mutations, ThriftObjectSizeUtils::getMutationSize)));
return approxBytesForKeys + approxBytesForValues;
}
throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException;

@Override
public CqlResult execute_cql3_query(ByteBuffer query, Compression compression, ConsistencyLevel consistency)
throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException,
TException {
qosClient.checkLimit();
CqlResult cqlResult = delegate.execute_cql3_query(query, compression, consistency);
recordBytesRead(ThriftObjectSizeUtils.getCqlResultSize(cqlResult));
return cqlResult;
}

@Override
public List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range,
void batch_mutate(String kvsMethodName,
Map<ByteBuffer, Map<String, List<Mutation>>> mutation_map,
ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
qosClient.checkLimit();
List<KeySlice> result = super.get_range_slices(column_parent, predicate, range, consistency_level);
recordBytesRead(getCollectionSize(result, ThriftObjectSizeUtils::getKeySliceSize));
return result;
}

private void recordBytesRead(long numBytesRead) {
try {
qosMetrics.updateReadCount();
qosMetrics.updateBytesRead(numBytesRead);
} catch (Exception e) {
log.warn("Encountered an exception when recording read metrics.", e);
}
}

private void recordBytesWritten(long numBytesWritten) {
try {
qosMetrics.updateWriteCount();
qosMetrics.updateBytesWritten(numBytesWritten);
} catch (Exception e) {
log.warn("Encountered an exception when recording write metrics.", e);
}
}
throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException;

private <T> long getCollectionSize(Collection<T> collection, Function<T, Long> singleObjectSizeFunction) {
return collection.stream().mapToLong(singleObjectSizeFunction::apply).sum();
}
ColumnOrSuperColumn get(TableReference tableReference,
ByteBuffer key,
byte[] column,
ConsistencyLevel consistency_level)
throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException,
org.apache.thrift.TException;

CASResult cas(TableReference tableReference,
ByteBuffer key,
List<Column> expected,
List<Column> updates,
ConsistencyLevel serial_consistency_level,
ConsistencyLevel commit_consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException;

CqlResult execute_cql3_query(CqlQuery cqlQuery,
Compression compression,
ConsistencyLevel consistency)
throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException,
org.apache.thrift.TException;
}
Loading