diff --git a/atlasdb-api/build.gradle b/atlasdb-api/build.gradle index b182b2d9386..51fef02ad23 100644 --- a/atlasdb-api/build.gradle +++ b/atlasdb-api/build.gradle @@ -13,6 +13,7 @@ dependencies { compile group: 'javax.validation', name: 'validation-api' compile group: 'com.palantir.remoting-api', name: 'ssl-config' + compile group: 'com.palantir.remoting3', name: 'jaxrs-clients' processor group: 'org.immutables', name: 'value' diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/QosClientBuilder.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/QosClientBuilder.java new file mode 100644 index 00000000000..4042d5e30a3 --- /dev/null +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/QosClientBuilder.java @@ -0,0 +1,33 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.keyvalue.api; + +import java.util.Optional; + +import org.immutables.value.Value; + +import com.palantir.remoting.api.config.service.ServiceConfiguration; + +@Value.Immutable +public abstract class QosClientBuilder { + public abstract Optional qosServiceConfiguration(); + + @Value.Default + public String qosUserAgent() { + return "unknown-qos-agent"; + } +} diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/spi/AtlasDbFactory.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/spi/AtlasDbFactory.java index e44549f3584..5901327dfd2 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/spi/AtlasDbFactory.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/spi/AtlasDbFactory.java @@ -21,7 +21,9 @@ import org.slf4j.LoggerFactory; import com.palantir.atlasdb.config.LeaderConfig; +import com.palantir.atlasdb.keyvalue.api.ImmutableQosClientBuilder; import com.palantir.atlasdb.keyvalue.api.KeyValueService; +import com.palantir.atlasdb.keyvalue.api.QosClientBuilder; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.timestamp.TimestampService; import com.palantir.timestamp.TimestampStoreInvalidator; @@ -34,7 +36,8 @@ public interface AtlasDbFactory { default KeyValueService createRawKeyValueService( KeyValueServiceConfig config, Optional leaderConfig) { - return createRawKeyValueService(config, leaderConfig, Optional.empty(), DEFAULT_INITIALIZE_ASYNC); + return createRawKeyValueService(config, leaderConfig, Optional.empty(), DEFAULT_INITIALIZE_ASYNC, + ImmutableQosClientBuilder.builder().build()); } /** @@ -52,7 +55,8 @@ KeyValueService createRawKeyValueService( KeyValueServiceConfig config, Optional leaderConfig, Optional namespace, - boolean initializeAsync); + boolean initializeAsync, + QosClientBuilder qosClientBuilder); default TimestampService createTimestampService(KeyValueService rawKvs) { return createTimestampService(rawKvs, Optional.empty(), DEFAULT_INITIALIZE_ASYNC); diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/CassandraAtlasDbFactory.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/CassandraAtlasDbFactory.java index 6fbb85950a6..da536fc3126 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/CassandraAtlasDbFactory.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/CassandraAtlasDbFactory.java @@ -23,6 +23,7 @@ import com.palantir.atlasdb.AtlasDbConstants; import com.palantir.atlasdb.config.LeaderConfig; import com.palantir.atlasdb.keyvalue.api.KeyValueService; +import com.palantir.atlasdb.keyvalue.api.QosClientBuilder; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.keyvalue.cassandra.CassandraKeyValueService; import com.palantir.atlasdb.keyvalue.cassandra.CassandraKeyValueServiceImpl; @@ -43,13 +44,15 @@ public KeyValueService createRawKeyValueService( KeyValueServiceConfig config, Optional leaderConfig, Optional namespace, - boolean initializeAsync) { + boolean initializeAsync, + QosClientBuilder qosClientBuilder) { AtlasDbVersion.ensureVersionReported(); CassandraKeyValueServiceConfig preprocessedConfig = preprocessKvsConfig(config, namespace); return CassandraKeyValueServiceImpl.create( CassandraKeyValueServiceConfigManager.createSimpleManager(preprocessedConfig), leaderConfig, - initializeAsync); + initializeAsync, + qosClientBuilder); } @VisibleForTesting diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClient.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClient.java index ba3b05045fa..2fa3578d295 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClient.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClient.java @@ -37,7 +37,7 @@ import org.apache.cassandra.thrift.UnavailableException; import org.apache.thrift.TException; -import com.palantir.atlasdb.qos.AtlasDbQosClient; +import com.palantir.atlasdb.qos.QosClient; import com.palantir.processors.AutoDelegate; /** @@ -47,9 +47,9 @@ @SuppressWarnings({"checkstyle:all", "DuplicateThrows"}) // :'( public class CassandraClient extends AutoDelegate_Client { private final Cassandra.Client delegate; - private final AtlasDbQosClient qosClient; + private final QosClient qosClient; - public CassandraClient(Cassandra.Client delegate, AtlasDbQosClient qosClient) { + public CassandraClient(Cassandra.Client delegate, QosClient qosClient) { super(delegate.getInputProtocol()); this.delegate = delegate; this.qosClient = qosClient; diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientFactory.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientFactory.java index c77f4e52e59..18cfe8fef20 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientFactory.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientFactory.java @@ -46,7 +46,7 @@ import com.google.common.collect.Maps; import com.palantir.atlasdb.cassandra.CassandraCredentialsConfig; import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; -import com.palantir.atlasdb.qos.AtlasDbQosClient; +import com.palantir.atlasdb.qos.QosClient; import com.palantir.common.exception.AtlasDbDependencyException; import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.UnsafeArg; @@ -67,11 +67,11 @@ public SSLSocketFactory load(InetSocketAddress host) throws Exception { } }); - private final AtlasDbQosClient qosClient; + private final QosClient qosClient; private final InetSocketAddress addr; private final CassandraKeyValueServiceConfig config; - public CassandraClientFactory(AtlasDbQosClient qosClient, + public CassandraClientFactory(QosClient qosClient, InetSocketAddress addr, CassandraKeyValueServiceConfig config) { this.qosClient = qosClient; @@ -92,7 +92,7 @@ public Client create() throws Exception { } } - private static Cassandra.Client getClient(AtlasDbQosClient qosClient, + private static Cassandra.Client getClient(QosClient qosClient, InetSocketAddress addr, CassandraKeyValueServiceConfig config) throws Exception { Client ret = getWrappedClient(qosClient, addr, config); @@ -111,7 +111,7 @@ private static Cassandra.Client getClient(AtlasDbQosClient qosClient, } } - private static Client getWrappedClient(AtlasDbQosClient qosClient, + private static Client getWrappedClient(QosClient qosClient, InetSocketAddress addr, CassandraKeyValueServiceConfig config) throws TException { diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolImpl.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolImpl.java index 20a87312c79..214b6b4c6d0 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolImpl.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolImpl.java @@ -31,7 +31,6 @@ import java.util.Optional; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadLocalRandom; @@ -51,7 +50,6 @@ import org.slf4j.LoggerFactory; import org.slf4j.helpers.MessageFormatter; -import com.codahale.metrics.InstrumentedScheduledExecutorService; import com.codahale.metrics.Meter; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -74,11 +72,7 @@ import com.palantir.atlasdb.keyvalue.api.InsufficientConsistencyException; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.keyvalue.cassandra.CassandraClientFactory.ClientCreationFailedException; -import com.palantir.atlasdb.qos.AtlasDbQosClient; -import com.palantir.atlasdb.qos.ImmutableQosServiceRuntimeConfig; -import com.palantir.atlasdb.qos.QosService; -import com.palantir.atlasdb.qos.QosServiceResource; -import com.palantir.atlasdb.util.AtlasDbMetrics; +import com.palantir.atlasdb.qos.QosClient; import com.palantir.atlasdb.util.MetricsManager; import com.palantir.common.base.FunctionCheckedException; import com.palantir.common.base.Throwables; @@ -157,7 +151,7 @@ public void shutdown() { private final CassandraKeyValueServiceConfig config; private final Map currentPools = Maps.newConcurrentMap(); private final StartupChecks startupChecks; - private final AtlasDbQosClient qosClient; + private final QosClient qosClient; private final ScheduledExecutorService refreshDaemon; private final MetricsManager metricsManager = new MetricsManager(); private final RequestMetrics aggregateMetrics = new RequestMetrics(null); @@ -170,36 +164,29 @@ public void shutdown() { @VisibleForTesting static CassandraClientPoolImpl createImplForTest(CassandraKeyValueServiceConfig config, StartupChecks startupChecks) { - return create(config, startupChecks, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC); + return create(config, startupChecks, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC, new FakeQosClient()); } public static CassandraClientPool create(CassandraKeyValueServiceConfig config) { - return create(config, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC); + return create(config, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC, new FakeQosClient()); } - public static CassandraClientPool create(CassandraKeyValueServiceConfig config, boolean initializeAsync) { - CassandraClientPoolImpl cassandraClientPool = create(config, - StartupChecks.RUN, initializeAsync); + public static CassandraClientPool create(CassandraKeyValueServiceConfig config, boolean initializeAsync, + QosClient qosClient) { + CassandraClientPoolImpl cassandraClientPool = create(config, StartupChecks.RUN, initializeAsync, qosClient); return cassandraClientPool.wrapper.isInitialized() ? cassandraClientPool : cassandraClientPool.wrapper; } private static CassandraClientPoolImpl create(CassandraKeyValueServiceConfig config, - StartupChecks startupChecks, boolean initializeAsync) { - // TODO eventually we'll want to pass this in from somewhere - QosService qosResource = new QosServiceResource(ImmutableQosServiceRuntimeConfig.builder().build()); - - ScheduledExecutorService scheduler = new InstrumentedScheduledExecutorService( - Executors.newSingleThreadScheduledExecutor(), - AtlasDbMetrics.getMetricRegistry(), - "qos-client-executor"); - AtlasDbQosClient qosClient = AtlasDbQosClient.create(scheduler, qosResource, config.getKeyspaceOrThrow()); + StartupChecks startupChecks, boolean initializeAsync, QosClient qosClient) { CassandraClientPoolImpl cassandraClientPool = new CassandraClientPoolImpl(config, startupChecks, qosClient); cassandraClientPool.wrapper.initialize(initializeAsync); return cassandraClientPool; } + private CassandraClientPoolImpl(CassandraKeyValueServiceConfig config, StartupChecks startupChecks, - AtlasDbQosClient qosClient) { + QosClient qosClient) { this.config = config; this.startupChecks = startupChecks; this.qosClient = qosClient; diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolingContainer.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolingContainer.java index dd48bebb3cd..e9e6b206a15 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolingContainer.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolingContainer.java @@ -40,7 +40,7 @@ import com.google.common.base.Function; import com.google.common.base.MoreObjects; import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; -import com.palantir.atlasdb.qos.AtlasDbQosClient; +import com.palantir.atlasdb.qos.QosClient; import com.palantir.atlasdb.util.MetricsManager; import com.palantir.common.base.FunctionCheckedException; import com.palantir.common.pooling.PoolingContainer; @@ -50,7 +50,7 @@ public class CassandraClientPoolingContainer implements PoolingContainer { private static final Logger log = LoggerFactory.getLogger(CassandraClientPoolingContainer.class); - private final AtlasDbQosClient qosClient; + private final QosClient qosClient; private final InetSocketAddress host; private final CassandraKeyValueServiceConfig config; private final MetricsManager metricsManager = new MetricsManager(); @@ -58,7 +58,7 @@ public class CassandraClientPoolingContainer implements PoolingContainer private final AtomicInteger openRequests = new AtomicInteger(); private final GenericObjectPool clientPool; - public CassandraClientPoolingContainer(AtlasDbQosClient qosClient, + public CassandraClientPoolingContainer(QosClient qosClient, InetSocketAddress host, CassandraKeyValueServiceConfig config, int poolNumber) { diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraExpiringKeyValueService.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraExpiringKeyValueService.java index 49058f401cb..dd2cd4d73a9 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraExpiringKeyValueService.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraExpiringKeyValueService.java @@ -36,6 +36,7 @@ import com.palantir.atlasdb.config.LeaderConfig; import com.palantir.atlasdb.keyvalue.api.Cell; import com.palantir.atlasdb.keyvalue.api.ExpiringKeyValueService; +import com.palantir.atlasdb.keyvalue.api.ImmutableQosClientBuilder; import com.palantir.atlasdb.keyvalue.api.KeyAlreadyExistsException; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.keyvalue.api.Value; @@ -68,7 +69,7 @@ private CassandraExpiringKeyValueService( Optional leaderConfig, boolean initializeAsync) { super(LoggerFactory.getLogger(CassandraKeyValueService.class), configManager, compactionManager, leaderConfig, - initializeAsync); + initializeAsync, ImmutableQosClientBuilder.builder().build()); } @Override diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceImpl.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceImpl.java index cfdf0b7b871..964ec371d20 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceImpl.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceImpl.java @@ -31,7 +31,9 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -53,6 +55,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.codahale.metrics.InstrumentedScheduledExecutorService; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; @@ -94,8 +97,10 @@ import com.palantir.atlasdb.keyvalue.api.ClusterAvailabilityStatus; import com.palantir.atlasdb.keyvalue.api.ColumnSelection; import com.palantir.atlasdb.keyvalue.api.ImmutableCandidateCellForSweepingRequest; +import com.palantir.atlasdb.keyvalue.api.ImmutableQosClientBuilder; import com.palantir.atlasdb.keyvalue.api.InsufficientConsistencyException; import com.palantir.atlasdb.keyvalue.api.KeyAlreadyExistsException; +import com.palantir.atlasdb.keyvalue.api.QosClientBuilder; import com.palantir.atlasdb.keyvalue.api.RangeRequest; import com.palantir.atlasdb.keyvalue.api.RangeRequests; import com.palantir.atlasdb.keyvalue.api.RowColumnRangeIterator; @@ -121,8 +126,12 @@ import com.palantir.atlasdb.keyvalue.impl.KeyValueServices; import com.palantir.atlasdb.keyvalue.impl.LocalRowColumnRangeIterator; import com.palantir.atlasdb.logging.LoggingArgs; +import com.palantir.atlasdb.qos.AtlasDbQosClient; +import com.palantir.atlasdb.qos.QosClient; +import com.palantir.atlasdb.qos.QosService; import com.palantir.atlasdb.util.AnnotatedCallable; import com.palantir.atlasdb.util.AnnotationType; +import com.palantir.atlasdb.util.AtlasDbMetrics; import com.palantir.common.annotation.Idempotent; import com.palantir.common.base.ClosableIterator; import com.palantir.common.base.ClosableIterators; @@ -132,6 +141,8 @@ import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.UnsafeArg; import com.palantir.processors.AutoDelegate; +import com.palantir.remoting3.clients.ClientConfigurations; +import com.palantir.remoting3.jaxrs.JaxRsClient; import com.palantir.util.paging.AbstractPagingIterable; import com.palantir.util.paging.SimpleTokenBackedResultsPage; import com.palantir.util.paging.TokenBackedBasicResultsPage; @@ -222,8 +233,16 @@ public static CassandraKeyValueService create( CassandraKeyValueServiceConfigManager configManager, Optional leaderConfig, boolean initializeAsync) { + return create(configManager, leaderConfig, initializeAsync, ImmutableQosClientBuilder.builder().build()); + } + + public static CassandraKeyValueService create( + CassandraKeyValueServiceConfigManager configManager, + Optional leaderConfig, + boolean initializeAsync, + QosClientBuilder qosClientBuilder) { return create(configManager, leaderConfig, LoggerFactory.getLogger(CassandraKeyValueService.class), - initializeAsync); + initializeAsync, qosClientBuilder); } @VisibleForTesting @@ -231,18 +250,21 @@ static CassandraKeyValueService create( CassandraKeyValueServiceConfigManager configManager, Optional leaderConfig, Logger log) { - return create(configManager, leaderConfig, log, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC); + return create(configManager, leaderConfig, log, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC, + ImmutableQosClientBuilder.builder().build()); } private static CassandraKeyValueService create( CassandraKeyValueServiceConfigManager configManager, Optional leaderConfig, Logger log, - boolean initializeAsync) { + boolean initializeAsync, + QosClientBuilder qosClientBuilder) { Optional compactionManager = CassandraJmxCompaction.createJmxCompactionManager(configManager); CassandraKeyValueServiceImpl keyValueService = - new CassandraKeyValueServiceImpl(log, configManager, compactionManager, leaderConfig, initializeAsync); + new CassandraKeyValueServiceImpl(log, configManager, compactionManager, leaderConfig, initializeAsync, + qosClientBuilder); keyValueService.wrapper.initialize(initializeAsync); return keyValueService.wrapper.isInitialized() ? keyValueService : keyValueService.wrapper; } @@ -251,12 +273,14 @@ protected CassandraKeyValueServiceImpl(Logger log, CassandraKeyValueServiceConfigManager configManager, Optional compactionManager, Optional leaderConfig, - boolean initializeAsync) { + boolean initializeAsync, + QosClientBuilder qosClientBuilder) { super(AbstractKeyValueService.createFixedThreadPool("Atlas Cassandra KVS", configManager.getConfig().poolSize() * configManager.getConfig().servers().size())); this.log = log; this.configManager = configManager; - this.clientPool = CassandraClientPoolImpl.create(configManager.getConfig(), initializeAsync); + QosClient qosClient = getQosClient(qosClientBuilder); + this.clientPool = CassandraClientPoolImpl.create(configManager.getConfig(), initializeAsync, qosClient); this.compactionManager = compactionManager; this.leaderConfig = leaderConfig; this.hiddenTables = new HiddenTables(); @@ -268,6 +292,27 @@ protected CassandraKeyValueServiceImpl(Logger log, this.cassandraTables = new CassandraTables(clientPool, configManager); } + private QosClient getQosClient(QosClientBuilder qosClientBuilder) { + if (qosClientBuilder.qosServiceConfiguration().isPresent()) { + return createAtlasDbQosClient(qosClientBuilder); + } else { + return new FakeQosClient(); + } + } + + private QosClient createAtlasDbQosClient(QosClientBuilder qosClientBuilder) { + Preconditions.checkState(qosClientBuilder.qosServiceConfiguration().isPresent(), + "Qos Service Config is required to create a AtlasDBQosClient."); + QosService qosService = JaxRsClient.create(QosService.class, + qosClientBuilder.qosUserAgent(), + ClientConfigurations.of(qosClientBuilder.qosServiceConfiguration().get())); + ScheduledExecutorService scheduler = new InstrumentedScheduledExecutorService( + Executors.newSingleThreadScheduledExecutor(), + AtlasDbMetrics.getMetricRegistry(), + "qos-client-executor"); + return new AtlasDbQosClient(qosService, scheduler, configManager.getConfig().getKeyspaceOrThrow()); + } + @Override public boolean isInitialized() { return wrapper.isInitialized(); diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/FakeQosClient.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/FakeQosClient.java new file mode 100644 index 00000000000..e38c75e9543 --- /dev/null +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/FakeQosClient.java @@ -0,0 +1,26 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.keyvalue.cassandra; + +import com.palantir.atlasdb.qos.QosClient; + +public class FakeQosClient implements QosClient { + @Override + public void checkLimit() { + // no op + } +} diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/qos/AtlasDbQosClient.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/qos/AtlasDbQosClient.java index 741844a3f5d..7ce05041a93 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/qos/AtlasDbQosClient.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/qos/AtlasDbQosClient.java @@ -20,33 +20,24 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -@SuppressWarnings("FinalClass") -public class AtlasDbQosClient { - QosService qosService; +public class AtlasDbQosClient implements QosClient { + private volatile long credits; - volatile long credits; - - private AtlasDbQosClient(QosService qosService) { - this.qosService = qosService; - } - - public static AtlasDbQosClient create(ScheduledExecutorService scheduler, QosService qosService, + public AtlasDbQosClient(QosService qosService, + ScheduledExecutorService limitRefresher, String clientName) { - AtlasDbQosClient client = new AtlasDbQosClient(qosService); - - scheduler.scheduleAtFixedRate(() -> { + limitRefresher.scheduleAtFixedRate(() -> { try { - client.credits = qosService.getLimit(clientName); + credits = qosService.getLimit(clientName); } catch (Exception e) { // do nothing } - }, 0L, 1L, TimeUnit.SECONDS); - - return client; + }, 0L, 60L, TimeUnit.SECONDS); } // The KVS layer should call this before every read/write operation // Currently all operations are treated equally; each uses up a unit of credits + @Override public void checkLimit() { // always return immediately - i.e. no backoff // TODO if soft-limited, pause diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/qos/QosClient.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/qos/QosClient.java new file mode 100644 index 00000000000..cfb82619a6d --- /dev/null +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/qos/QosClient.java @@ -0,0 +1,21 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.qos; + +public interface QosClient { + void checkLimit(); +} diff --git a/atlasdb-client/src/test/java/com/palantir/atlasdb/qos/AtlasDbQosClientTest.java b/atlasdb-client/src/test/java/com/palantir/atlasdb/qos/AtlasDbQosClientTest.java index f9eacfd3c96..b224c25bcfe 100644 --- a/atlasdb-client/src/test/java/com/palantir/atlasdb/qos/AtlasDbQosClientTest.java +++ b/atlasdb-client/src/test/java/com/palantir/atlasdb/qos/AtlasDbQosClientTest.java @@ -37,14 +37,14 @@ public void setUp() { @Test public void doesNotBackOff() { - AtlasDbQosClient qosClient = AtlasDbQosClient.create(scheduler, qosService, "test-client"); + AtlasDbQosClient qosClient = new AtlasDbQosClient(qosService, scheduler, "test-client"); scheduler.tick(1L, TimeUnit.MILLISECONDS); qosClient.checkLimit(); } @Test public void throwsAfterLimitExceeded() { - AtlasDbQosClient qosClient = AtlasDbQosClient.create(scheduler, qosService, "test-client"); + AtlasDbQosClient qosClient = new AtlasDbQosClient(qosService, scheduler, "test-client"); scheduler.tick(1L, TimeUnit.MILLISECONDS); qosClient.checkLimit(); @@ -53,15 +53,15 @@ public void throwsAfterLimitExceeded() { @Test public void canCheckAgainAfterRefreshPeriod() { - AtlasDbQosClient qosClient = AtlasDbQosClient.create(scheduler, qosService, "test-client"); + AtlasDbQosClient qosClient = new AtlasDbQosClient(qosService, scheduler, "test-client"); scheduler.tick(1L, TimeUnit.MILLISECONDS); qosClient.checkLimit(); - assertThatThrownBy(qosClient::checkLimit).isInstanceOf(RuntimeException.class); + assertThatThrownBy(qosClient::checkLimit) + .isInstanceOf(RuntimeException.class).hasMessage("Rate limit exceeded"); - scheduler.tick(1L, TimeUnit.SECONDS); + scheduler.tick(60L, TimeUnit.SECONDS); qosClient.checkLimit(); } - } diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/config/AtlasDbConfig.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/config/AtlasDbConfig.java index 6161854ad4c..93501b1be7a 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/config/AtlasDbConfig.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/config/AtlasDbConfig.java @@ -29,6 +29,7 @@ import com.palantir.atlasdb.memory.InMemoryAtlasDbConfig; import com.palantir.atlasdb.spi.KeyValueServiceConfig; import com.palantir.exception.NotInitializedException; +import com.palantir.remoting.api.config.service.ServiceConfiguration; @JsonDeserialize(as = ImmutableAtlasDbConfig.class) @JsonSerialize(as = ImmutableAtlasDbConfig.class) @@ -256,6 +257,8 @@ public int getDefaultLockTimeoutSeconds() { return AtlasDbConstants.DEFAULT_LOCK_TIMEOUT_SECONDS; } + public abstract Optional getQosServiceConfiguration(); + @Value.Check protected final void check() { checkLeaderAndTimelockBlocks(); diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/ServiceDiscoveringAtlasSupplier.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/ServiceDiscoveringAtlasSupplier.java index 861b877dd3a..4a973e8f09c 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/ServiceDiscoveringAtlasSupplier.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/ServiceDiscoveringAtlasSupplier.java @@ -35,7 +35,9 @@ import com.google.common.base.Suppliers; import com.palantir.atlasdb.AtlasDbConstants; import com.palantir.atlasdb.config.LeaderConfig; +import com.palantir.atlasdb.keyvalue.api.ImmutableQosClientBuilder; import com.palantir.atlasdb.keyvalue.api.KeyValueService; +import com.palantir.atlasdb.keyvalue.api.QosClientBuilder; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.spi.AtlasDbFactory; import com.palantir.atlasdb.spi.KeyValueServiceConfig; @@ -54,9 +56,11 @@ public class ServiceDiscoveringAtlasSupplier { private final Supplier keyValueService; private final Supplier timestampService; private final Supplier timestampStoreInvalidator; + private final QosClientBuilder qosClientBuilder; public ServiceDiscoveringAtlasSupplier(KeyValueServiceConfig config, Optional leaderConfig) { - this(config, leaderConfig, Optional.empty(), AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC); + this(config, leaderConfig, Optional.empty(), AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC, + ImmutableQosClientBuilder.builder().build()); } public ServiceDiscoveringAtlasSupplier( @@ -64,15 +68,17 @@ public ServiceDiscoveringAtlasSupplier( Optional leaderConfig, Optional namespace, Optional timestampTable) { - this(config, leaderConfig, namespace, timestampTable, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC); + this(config, leaderConfig, namespace, timestampTable, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC, + ImmutableQosClientBuilder.builder().build()); } public ServiceDiscoveringAtlasSupplier( KeyValueServiceConfig config, Optional leaderConfig, Optional namespace, - boolean initializeAsync) { - this(config, leaderConfig, namespace, Optional.empty(), initializeAsync); + boolean initializeAsync, + QosClientBuilder qosClientBuilder) { + this(config, leaderConfig, namespace, Optional.empty(), initializeAsync, qosClientBuilder); } public ServiceDiscoveringAtlasSupplier( @@ -80,9 +86,11 @@ public ServiceDiscoveringAtlasSupplier( Optional leaderConfig, Optional namespace, Optional timestampTable, - boolean initializeAsync) { + boolean initializeAsync, + QosClientBuilder qosClientBuilderPassed) { this.config = config; this.leaderConfig = leaderConfig; + this.qosClientBuilder = qosClientBuilderPassed; AtlasDbFactory atlasFactory = StreamSupport.stream(loader.spliterator(), false) .filter(producesCorrectType()) @@ -92,7 +100,12 @@ public ServiceDiscoveringAtlasSupplier( + " Have you annotated it with @AutoService(AtlasDbFactory.class)?" )); keyValueService = Suppliers.memoize( - () -> atlasFactory.createRawKeyValueService(config, leaderConfig, namespace, initializeAsync)); + () -> atlasFactory.createRawKeyValueService( + config, + leaderConfig, + namespace, + initializeAsync, + qosClientBuilder)); timestampService = () -> atlasFactory.createTimestampService(getKeyValueService(), timestampTable, initializeAsync); timestampStoreInvalidator = () -> atlasFactory.createTimestampStoreInvalidator(getKeyValueService()); diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java index c51db40aed2..f0019127dd6 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java @@ -57,6 +57,7 @@ import com.palantir.atlasdb.factory.timestamp.DecoratedTimelockServices; import com.palantir.atlasdb.http.AtlasDbFeignTargetFactory; import com.palantir.atlasdb.http.UserAgents; +import com.palantir.atlasdb.keyvalue.api.ImmutableQosClientBuilder; import com.palantir.atlasdb.keyvalue.api.KeyValueService; import com.palantir.atlasdb.keyvalue.impl.ProfilingKeyValueService; import com.palantir.atlasdb.keyvalue.impl.SweepStatsKeyValueService; @@ -304,9 +305,18 @@ SerializableTransactionManager serializable() { java.util.function.Supplier runtimeConfigSupplier = () -> runtimeConfigSupplier().get().orElse(defaultRuntime); + ImmutableQosClientBuilder qosServiceClientBuilder = ImmutableQosClientBuilder.builder() + .qosServiceConfiguration(config.getQosServiceConfiguration()) + .qosUserAgent(userAgent()) + .build(); + ServiceDiscoveringAtlasSupplier atlasFactory = - new ServiceDiscoveringAtlasSupplier(config.keyValueService(), config.leader(), config.namespace(), - config.initializeAsync()); + new ServiceDiscoveringAtlasSupplier( + config.keyValueService(), + config.leader(), + config.namespace(), + config.initializeAsync(), + qosServiceClientBuilder); KeyValueService rawKvs = atlasFactory.getKeyValueService(); LockRequest.setDefaultLockTimeout( diff --git a/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/AutoServiceAnnotatedAtlasDbFactory.java b/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/AutoServiceAnnotatedAtlasDbFactory.java index c7764bfa828..6d7dd9845ba 100644 --- a/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/AutoServiceAnnotatedAtlasDbFactory.java +++ b/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/AutoServiceAnnotatedAtlasDbFactory.java @@ -27,6 +27,7 @@ import com.google.common.collect.Lists; import com.palantir.atlasdb.config.LeaderConfig; import com.palantir.atlasdb.keyvalue.api.KeyValueService; +import com.palantir.atlasdb.keyvalue.api.QosClientBuilder; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.spi.AtlasDbFactory; import com.palantir.atlasdb.spi.KeyValueServiceConfig; @@ -51,7 +52,8 @@ public KeyValueService createRawKeyValueService( KeyValueServiceConfig config, Optional leaderConfig, Optional unused, - boolean initializeAsync) { + boolean initializeAsync, + QosClientBuilder unusedQosClientBuilder) { if (initializeAsync) { log.warn("Asynchronous initialization not implemented, will initialize synchronousy."); } diff --git a/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/ServicesConfig.java b/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/ServicesConfig.java index 1f0a3e3a770..055e665ba72 100644 --- a/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/ServicesConfig.java +++ b/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/ServicesConfig.java @@ -26,6 +26,7 @@ import com.palantir.atlasdb.config.AtlasDbConfig; import com.palantir.atlasdb.config.AtlasDbRuntimeConfig; import com.palantir.atlasdb.factory.ServiceDiscoveringAtlasSupplier; +import com.palantir.atlasdb.keyvalue.api.ImmutableQosClientBuilder; import com.palantir.atlasdb.table.description.Schema; @Value.Immutable @@ -41,7 +42,10 @@ public ServiceDiscoveringAtlasSupplier atlasDbSupplier() { atlasDbConfig().keyValueService(), atlasDbConfig().leader(), atlasDbConfig().namespace(), - atlasDbConfig().initializeAsync()); + atlasDbConfig().initializeAsync(), + ImmutableQosClientBuilder.builder() + .qosServiceConfiguration(atlasDbConfig().getQosServiceConfiguration()) + .build()); } @Value.Default diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/DbAtlasDbFactory.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/DbAtlasDbFactory.java index 50f030ff499..4387f413d77 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/DbAtlasDbFactory.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/DbAtlasDbFactory.java @@ -25,6 +25,7 @@ import com.palantir.atlasdb.AtlasDbConstants; import com.palantir.atlasdb.config.LeaderConfig; import com.palantir.atlasdb.keyvalue.api.KeyValueService; +import com.palantir.atlasdb.keyvalue.api.QosClientBuilder; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.keyvalue.dbkvs.impl.ConnectionManagerAwareDbKvs; import com.palantir.atlasdb.keyvalue.dbkvs.timestamp.InDbTimestampBoundStore; @@ -57,7 +58,8 @@ public KeyValueService createRawKeyValueService( KeyValueServiceConfig config, Optional leaderConfig, Optional namespace, - boolean initializeAsync) { + boolean initializeAsync, + QosClientBuilder unusedQosClientBuilder) { if (initializeAsync) { log.warn("Asynchronous initialization not implemented, will initialize synchronousy."); } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/memory/InMemoryAtlasDbFactory.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/memory/InMemoryAtlasDbFactory.java index ef197e1472d..6198ba3d285 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/memory/InMemoryAtlasDbFactory.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/memory/InMemoryAtlasDbFactory.java @@ -33,6 +33,7 @@ import com.palantir.atlasdb.cleaner.api.OnCleanupTask; import com.palantir.atlasdb.config.LeaderConfig; import com.palantir.atlasdb.keyvalue.api.KeyValueService; +import com.palantir.atlasdb.keyvalue.api.QosClientBuilder; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.keyvalue.impl.InMemoryKeyValueService; import com.palantir.atlasdb.schema.AtlasSchema; @@ -107,7 +108,8 @@ public InMemoryKeyValueService createRawKeyValueService( KeyValueServiceConfig config, Optional leaderConfig, Optional unused, - boolean initializeAsync) { + boolean initializeAsync, + QosClientBuilder unusedQosClientBuilder) { if (initializeAsync) { log.warn("Asynchronous initialization not implemented, will initialize synchronousy."); } diff --git a/atlasdb-jdbc/src/main/java/com/palantir/atlasdb/keyvalue/jdbc/JdbcAtlasDbFactory.java b/atlasdb-jdbc/src/main/java/com/palantir/atlasdb/keyvalue/jdbc/JdbcAtlasDbFactory.java index 3048625f745..54cf4a990fd 100644 --- a/atlasdb-jdbc/src/main/java/com/palantir/atlasdb/keyvalue/jdbc/JdbcAtlasDbFactory.java +++ b/atlasdb-jdbc/src/main/java/com/palantir/atlasdb/keyvalue/jdbc/JdbcAtlasDbFactory.java @@ -25,6 +25,7 @@ import com.palantir.atlasdb.AtlasDbConstants; import com.palantir.atlasdb.config.LeaderConfig; import com.palantir.atlasdb.keyvalue.api.KeyValueService; +import com.palantir.atlasdb.keyvalue.api.QosClientBuilder; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.spi.AtlasDbFactory; import com.palantir.atlasdb.spi.KeyValueServiceConfig; @@ -55,7 +56,8 @@ public KeyValueService createRawKeyValueService( KeyValueServiceConfig config, Optional leaderConfig, Optional unused, - boolean initializeAsync) { + boolean initializeAsync, + QosClientBuilder unusedQosClientBuilder) { if (initializeAsync) { log.warn("Asynchronous initialization not implemented, will initialize synchronousy."); } diff --git a/qos-service-api/build.gradle b/qos-service-api/build.gradle index ec406d446dd..0a994ae64ba 100644 --- a/qos-service-api/build.gradle +++ b/qos-service-api/build.gradle @@ -13,6 +13,7 @@ dependencies { compile group: 'javax.ws.rs', name: 'javax.ws.rs-api' compile group: 'com.palantir.safe-logging', name: 'safe-logging' compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind' + compile group: 'com.palantir.remoting3', name: 'jaxrs-clients' processor group: 'org.immutables', name: 'value' diff --git a/timelock-server/src/main/java/com/palantir/atlasdb/timelock/TimeLockServerLauncher.java b/timelock-server/src/main/java/com/palantir/atlasdb/timelock/TimeLockServerLauncher.java index 764d75fbc2b..84441080983 100644 --- a/timelock-server/src/main/java/com/palantir/atlasdb/timelock/TimeLockServerLauncher.java +++ b/timelock-server/src/main/java/com/palantir/atlasdb/timelock/TimeLockServerLauncher.java @@ -19,7 +19,6 @@ import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; -import com.palantir.atlasdb.qos.QosServiceResource; import com.palantir.atlasdb.timelock.config.CombinedTimeLockServerConfiguration; import com.palantir.atlasdb.timelock.config.TimeLockConfigMigrator; import com.palantir.atlasdb.timelock.config.TimeLockServerConfiguration;