Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Feature/qos service api #2629

Merged
merged 47 commits into from
Dec 2, 2017
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
f5664a2
Extremely basic QosServiceResource
gsheasby Nov 2, 2017
6beb734
Make resource an interface
gsheasby Nov 2, 2017
45cc7fc
Add client PathParam
gsheasby Nov 2, 2017
f0345e9
Clean up javax.ws.rs dependencies
gsheasby Nov 2, 2017
013582c
Create stub for AtlasDbQosClient
gsheasby Nov 3, 2017
a303b1e
Calls to checkLimit use up a credit; throw when out of credits
gsheasby Nov 3, 2017
139a208
Add QosServiceResourceImpl + test
gsheasby Nov 3, 2017
fb925d3
AutoDelegate for Cassandra.Client
gsheasby Nov 3, 2017
50cc876
Rename QosService stuff
gsheasby Nov 3, 2017
bcbde47
Pass AtlasDbQosClient to CassandraClient
gsheasby Nov 3, 2017
9be8291
Check limit on multiget_slice
gsheasby Nov 3, 2017
e2efe14
Check limit on batch_mutate
gsheasby Nov 3, 2017
47de8d5
Don't test we aren't soft-limited while we can never be soft-limited
gsheasby Nov 3, 2017
c7cfa3d
Check limit on remaining CassandraClient methods
gsheasby Nov 6, 2017
d5fa2db
Scheduled refresh of AtlasDbQosClient.credits
gsheasby Nov 6, 2017
9666ee0
Refresh every second
gsheasby Nov 6, 2017
7717ee4
Mount qos-service on Timelock
hsaraogi Nov 6, 2017
db0037b
Checkstyle
hsaraogi Nov 7, 2017
5ccf451
Update dependency locks
gsheasby Nov 7, 2017
c191cb7
Merge branch 'develop' into feature/qos-service-api
hsaraogi Nov 7, 2017
364ae22
Dont throw limitExceededException
hsaraogi Nov 7, 2017
5515d31
Move client param around
hsaraogi Nov 8, 2017
32ed6bc
Comment
hsaraogi Nov 8, 2017
ffa5e5a
Qos Service config (#2644)
hsaraogi Nov 8, 2017
f8d687f
[QoS] qos ete test (#2652)
nziebart Nov 8, 2017
1613da4
[QoS] rate limiter (#2653)
nziebart Nov 10, 2017
3873586
[QoS] Feature/qos client (#2650)
hsaraogi Nov 10, 2017
103fd97
deps
Nov 10, 2017
22b1e7a
fix tests
Nov 10, 2017
3d1f43f
[QoS] Feature/qos meters (#2640)
hsaraogi Nov 10, 2017
3797765
[QoS] QosClient with ratelimiter (#2667)
hsaraogi Nov 13, 2017
509f3e9
locks
hsaraogi Nov 13, 2017
34dcd28
[QoS] Create a jaxrs-client for the integ tests (#2675)
hsaraogi Nov 14, 2017
b8f8312
Nziebart/merge develop into qos (#2683)
nziebart Nov 15, 2017
4fcef06
qos rate limiting (#2709)
fsamuel-bs Nov 20, 2017
d0b9ae0
[QoS] Estimate the number of read bytes w/ number of rows (#2717)
fsamuel-bs Nov 21, 2017
8998354
weight estimates (#2725)
nziebart Nov 21, 2017
bbd272e
[QoS] Fix exceptions thrown on CqlExecutor (#2696)
fsamuel-bs Nov 22, 2017
d2d7b18
[QoS] Qos ete test (#2708)
hsaraogi Nov 24, 2017
be45487
[QoS] Fix/qos system table rate limiting (#2739)
hsaraogi Nov 24, 2017
bf251e1
Merge develop to the feature branch (#2741)
hsaraogi Nov 27, 2017
11192dd
Nziebart/cell timestamps qos (#2745)
nziebart Nov 28, 2017
d4c2c67
Merge branch 'develop' into feature/qos-service-api
hsaraogi Nov 28, 2017
ae1a3d5
Differentiate between read and write limits when logging (#2751)
hsaraogi Nov 29, 2017
b5caa16
Merge branch 'develop' into feature/qos-service-api
nziebart Dec 1, 2017
da8e513
Use longs in the rate limiter and handle negative adjustments. (#2758)
hsaraogi Dec 1, 2017
92aa59c
Merge branch 'develop' into feature/qos-service-api
hsaraogi Dec 1, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion atlasdb-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ apply from: "../gradle/shared.gradle"
dependencies {
compile project(":atlasdb-commons")
compile project(":timestamp-api")
compile 'javax.ws.rs:javax.ws.rs-api:2.0.1'
compile group: 'javax.ws.rs', name: 'javax.ws.rs-api'
compile group: 'org.apache.commons', name: 'commons-lang3'
compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations'
compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
Expand Down
2 changes: 0 additions & 2 deletions atlasdb-api/versions.lock
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
},
"javax.ws.rs:javax.ws.rs-api": {
"locked": "2.0.1",
"requested": "2.0.1",
"transitive": [
"com.palantir.atlasdb:atlasdb-commons",
"com.palantir.atlasdb:timestamp-api"
Expand Down Expand Up @@ -170,7 +169,6 @@
},
"javax.ws.rs:javax.ws.rs-api": {
"locked": "2.0.1",
"requested": "2.0.1",
"transitive": [
"com.palantir.atlasdb:atlasdb-commons",
"com.palantir.atlasdb:timestamp-api"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2017 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the BSD-3 License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.keyvalue.cassandra;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

import org.apache.cassandra.thrift.AutoDelegate_Client;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.KeySlice;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.SchemaDisagreementException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.thrift.TException;

import com.palantir.atlasdb.qos.AtlasDbQosClient;
import com.palantir.common.base.Throwables;
import com.palantir.processors.AutoDelegate;

/**
* Wrapper for Cassandra.Client.
*/
@AutoDelegate(typeToExtend = Cassandra.Client.class)
@SuppressWarnings({"checkstyle:all", "DuplicateThrows"}) // :'(
public class CassandraClient extends AutoDelegate_Client {
private final Cassandra.Client delegate;
private final AtlasDbQosClient qosClient;

public CassandraClient(Cassandra.Client delegate, AtlasDbQosClient qosClient) {
super(delegate.getInputProtocol());
this.delegate = delegate;
this.qosClient = qosClient;
}

@Override
public Cassandra.Client delegate() {
return delegate;
}

@Override
public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(List<ByteBuffer> keys, ColumnParent column_parent,
SlicePredicate predicate, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
return checkLimitAndCall(() -> super.multiget_slice(keys, column_parent, predicate, consistency_level));
}

@Override
public void batch_mutate(Map<ByteBuffer, Map<String, List<Mutation>>> mutation_map,
ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
checkLimitAndCall(() -> {
super.batch_mutate(mutation_map, consistency_level);
return null;
});
}

@Override
public CqlResult execute_cql3_query(ByteBuffer query, Compression compression, ConsistencyLevel consistency)
throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException,
TException {
return checkLimitAndCall(() -> super.execute_cql3_query(query, compression, consistency));
}

@Override
public List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range,
ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
return checkLimitAndCall(() -> super.get_range_slices(column_parent, predicate, range, consistency_level));
}

private <T> T checkLimitAndCall(Callable<T> callable) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the purpose of accepting a callable here? why not just checkLimit() before each call?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got rid of this method.

try {
qosClient.checkLimit();
return callable.call();
} catch (Exception ex) {
throw Throwables.throwUncheckedException(ex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we probably dont want to wrap these exceptions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.common.collect.Maps;
import com.palantir.atlasdb.cassandra.CassandraCredentialsConfig;
import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig;
import com.palantir.atlasdb.qos.AtlasDbQosClient;
import com.palantir.common.exception.AtlasDbDependencyException;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
Expand All @@ -66,18 +67,22 @@ public SSLSocketFactory load(InetSocketAddress host) throws Exception {
}
});

private final AtlasDbQosClient qosClient;
private final InetSocketAddress addr;
private final CassandraKeyValueServiceConfig config;

public CassandraClientFactory(InetSocketAddress addr, CassandraKeyValueServiceConfig config) {
public CassandraClientFactory(AtlasDbQosClient qosClient,
InetSocketAddress addr,
CassandraKeyValueServiceConfig config) {
this.qosClient = qosClient;
this.addr = addr;
this.config = config;
}

@Override
public Client create() throws Exception {
try {
return getClient(addr, config);
return getClient(qosClient, addr, config);
} catch (Exception e) {
String message = String.format("Failed to construct client for %s/%s", addr, config.getKeyspaceOrThrow());
if (config.usingSsl()) {
Expand All @@ -87,9 +92,10 @@ public Client create() throws Exception {
}
}

private static Cassandra.Client getClient(InetSocketAddress addr,
CassandraKeyValueServiceConfig config) throws Exception {
Client ret = getClientInternal(addr, config);
private static Cassandra.Client getClient(AtlasDbQosClient qosClient,
InetSocketAddress addr,
CassandraKeyValueServiceConfig config) throws Exception {
Client ret = getWrappedClient(qosClient, addr, config);
try {
ret.set_keyspace(config.getKeyspaceOrThrow());
log.debug("Created new client for {}/{}{}{}",
Expand All @@ -105,6 +111,13 @@ private static Cassandra.Client getClient(InetSocketAddress addr,
}
}

private static Client getWrappedClient(AtlasDbQosClient qosClient,
InetSocketAddress addr,
CassandraKeyValueServiceConfig config)
throws TException {
return new CassandraClient(getClientInternal(addr, config), qosClient);
}

static Cassandra.Client getClientInternal(InetSocketAddress addr, CassandraKeyValueServiceConfig config)
throws TException {
TSocket thriftSocket = new TSocket(addr.getHostString(), addr.getPort(), config.socketTimeoutMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
Expand All @@ -50,6 +51,7 @@
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.MessageFormatter;

import com.codahale.metrics.InstrumentedScheduledExecutorService;
import com.codahale.metrics.Meter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand All @@ -72,6 +74,10 @@
import com.palantir.atlasdb.keyvalue.api.InsufficientConsistencyException;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.keyvalue.cassandra.CassandraClientFactory.ClientCreationFailedException;
import com.palantir.atlasdb.qos.AtlasDbQosClient;
import com.palantir.atlasdb.qos.QosService;
import com.palantir.atlasdb.qos.QosServiceResource;
import com.palantir.atlasdb.util.AtlasDbMetrics;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.common.base.FunctionCheckedException;
import com.palantir.common.base.Throwables;
Expand Down Expand Up @@ -150,6 +156,7 @@ public void shutdown() {
private final CassandraKeyValueServiceConfig config;
private final Map<InetSocketAddress, CassandraClientPoolingContainer> currentPools = Maps.newConcurrentMap();
private final StartupChecks startupChecks;
private final AtlasDbQosClient qosClient;
private final ScheduledExecutorService refreshDaemon;
private final MetricsManager metricsManager = new MetricsManager();
private final RequestMetrics aggregateMetrics = new RequestMetrics(null);
Expand Down Expand Up @@ -177,14 +184,24 @@ public static CassandraClientPool create(CassandraKeyValueServiceConfig config,

private static CassandraClientPoolImpl create(CassandraKeyValueServiceConfig config,
StartupChecks startupChecks, boolean initializeAsync) {
CassandraClientPoolImpl cassandraClientPool = new CassandraClientPoolImpl(config, startupChecks);
// TODO eventually we'll want to pass this in from somewhere
QosService qosResource = new QosServiceResource();

ScheduledExecutorService scheduler = new InstrumentedScheduledExecutorService(
Executors.newSingleThreadScheduledExecutor(),
AtlasDbMetrics.getMetricRegistry(),
"qos-client-executor");
AtlasDbQosClient qosClient = AtlasDbQosClient.create(scheduler, qosResource, config.getKeyspaceOrThrow());
CassandraClientPoolImpl cassandraClientPool = new CassandraClientPoolImpl(config, startupChecks, qosClient);
cassandraClientPool.wrapper.initialize(initializeAsync);
return cassandraClientPool;
}

private CassandraClientPoolImpl(CassandraKeyValueServiceConfig config, StartupChecks startupChecks) {
private CassandraClientPoolImpl(CassandraKeyValueServiceConfig config, StartupChecks startupChecks,
AtlasDbQosClient qosClient) {
this.config = config;
this.startupChecks = startupChecks;
this.qosClient = qosClient;
this.refreshDaemon = Tracers.wrap(PTExecutors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("CassandraClientPoolRefresh-%d")
Expand Down Expand Up @@ -287,7 +304,7 @@ private synchronized void refreshPool() {
@VisibleForTesting
void addPool(InetSocketAddress server) {
int currentPoolNumber = cassandraHosts.indexOf(server) + 1;
addPool(server, new CassandraClientPoolingContainer(server, config, currentPoolNumber));
addPool(server, new CassandraClientPoolingContainer(qosClient, server, config, currentPoolNumber));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig;
import com.palantir.atlasdb.qos.AtlasDbQosClient;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.common.base.FunctionCheckedException;
import com.palantir.common.pooling.PoolingContainer;
Expand All @@ -49,17 +50,19 @@
public class CassandraClientPoolingContainer implements PoolingContainer<Client> {
private static final Logger log = LoggerFactory.getLogger(CassandraClientPoolingContainer.class);

private final AtlasDbQosClient qosClient;
private final InetSocketAddress host;
private CassandraKeyValueServiceConfig config;
private final CassandraKeyValueServiceConfig config;
private final MetricsManager metricsManager = new MetricsManager();
private final AtomicLong count = new AtomicLong();
private final AtomicInteger openRequests = new AtomicInteger();
private final GenericObjectPool<Client> clientPool;

public CassandraClientPoolingContainer(
public CassandraClientPoolingContainer(AtlasDbQosClient qosClient,
InetSocketAddress host,
CassandraKeyValueServiceConfig config,
int poolNumber) {
this.qosClient = qosClient;
this.host = host;
this.config = config;
this.clientPool = createClientPool(poolNumber);
Expand Down Expand Up @@ -235,7 +238,7 @@ public String toString() {
* @param poolNumber number of the pool for metric registration.
*/
private GenericObjectPool<Client> createClientPool(int poolNumber) {
CassandraClientFactory cassandraClientFactory = new CassandraClientFactory(host, config);
CassandraClientFactory cassandraClientFactory = new CassandraClientFactory(qosClient, host, config);
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();

poolConfig.setMinIdle(config.poolSize());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2017 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the BSD-3 License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.keyvalue.cassandra;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import javax.naming.LimitExceededException;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.thrift.TException;
import org.junit.Before;
import org.junit.Test;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.palantir.atlasdb.encoding.PtBytes;
import com.palantir.atlasdb.keyvalue.cassandra.thrift.SlicePredicates;
import com.palantir.atlasdb.qos.AtlasDbQosClient;

public class CassandraClientTest {
private static final ByteBuffer ROW_KEY = ByteBuffer.wrap(PtBytes.toBytes("key"));

private final Cassandra.Client mockClient = mock(Cassandra.Client.class);
private final AtlasDbQosClient qosClient = mock(AtlasDbQosClient.class);

private static final ColumnParent TEST_TABLE = new ColumnParent("table");
private static final SlicePredicate SLICE_PREDICATE = SlicePredicates.create(SlicePredicates.Range.ALL,
SlicePredicates.Limit.ONE);

private CassandraClient client;

@Before
public void setUp() {
client = new CassandraClient(mockClient, qosClient);
}

@Test
public void multigetSliceChecksLimit() throws TException, LimitExceededException {
client.multiget_slice(ImmutableList.of(ROW_KEY), TEST_TABLE, SLICE_PREDICATE, ConsistencyLevel.ANY);

verify(qosClient, times(1)).checkLimit();
verifyNoMoreInteractions(qosClient);
}

@Test
public void batchMutateChecksLimit() throws TException, LimitExceededException {
client.batch_mutate(ImmutableMap.of(), ConsistencyLevel.ANY);

verify(qosClient, times(1)).checkLimit();
verifyNoMoreInteractions(qosClient);
}

@Test
public void executeCqlQueryChecksLimit() throws TException, LimitExceededException {
ByteBuffer query = ByteBuffer.wrap("SELECT * FROM test_table LIMIT 1".getBytes(StandardCharsets.UTF_8));
client.execute_cql3_query(query, Compression.NONE, ConsistencyLevel.ANY);

verify(qosClient, times(1)).checkLimit();
verifyNoMoreInteractions(qosClient);
}

@Test
public void getRangeSlicesChecksLimit() throws TException, LimitExceededException {
client.get_range_slices(TEST_TABLE, SLICE_PREDICATE, new KeyRange(), ConsistencyLevel.ANY);

verify(qosClient, times(1)).checkLimit();
verifyNoMoreInteractions(qosClient);
}
}
Loading