From 297ffd1ae5fd094e56c6fa864a4eb5dfc215bc17 Mon Sep 17 00:00:00 2001 From: Himangi Saraogi Date: Wed, 8 Nov 2017 11:35:11 +0000 Subject: [PATCH] Create one qosCLient for each service QosClientBuilder hooked up to KVS create Create the QosClient in CassandraClientPoolImpl if the config is specified. Create FakeQosClient if the config is not specified Cleanup get broken tests to pass --- atlasdb-api/build.gradle | 1 + .../keyvalue/api/QosClientBuilder.java | 33 +++++++++++ .../palantir/atlasdb/spi/AtlasDbFactory.java | 8 ++- .../cassandra/CassandraAtlasDbFactory.java | 7 ++- .../keyvalue/cassandra/CassandraClient.java | 6 +- .../cassandra/CassandraClientFactory.java | 10 ++-- .../cassandra/CassandraClientPoolImpl.java | 33 ++++------- .../CassandraClientPoolingContainer.java | 6 +- .../CassandraExpiringKeyValueService.java | 3 +- .../CassandraKeyValueServiceImpl.java | 57 +++++++++++++++++-- .../keyvalue/cassandra/FakeQosClient.java | 26 +++++++++ .../atlasdb/qos/AtlasDbQosClient.java | 25 +++----- .../com/palantir/atlasdb/qos/QosClient.java | 21 +++++++ .../atlasdb/qos/AtlasDbQosClientTest.java | 12 ++-- .../atlasdb/config/AtlasDbConfig.java | 3 + .../ServiceDiscoveringAtlasSupplier.java | 25 ++++++-- .../atlasdb/factory/TransactionManagers.java | 14 ++++- .../AutoServiceAnnotatedAtlasDbFactory.java | 4 +- .../atlasdb/services/ServicesConfig.java | 6 +- .../keyvalue/dbkvs/DbAtlasDbFactory.java | 4 +- .../memory/InMemoryAtlasDbFactory.java | 4 +- .../keyvalue/jdbc/JdbcAtlasDbFactory.java | 4 +- qos-service-api/build.gradle | 1 + .../timelock/TimeLockServerLauncher.java | 1 - 24 files changed, 232 insertions(+), 82 deletions(-) create mode 100644 atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/QosClientBuilder.java create mode 100644 atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/FakeQosClient.java create mode 100644 atlasdb-client/src/main/java/com/palantir/atlasdb/qos/QosClient.java diff --git a/atlasdb-api/build.gradle b/atlasdb-api/build.gradle index b182b2d9386..51fef02ad23 100644 --- a/atlasdb-api/build.gradle +++ b/atlasdb-api/build.gradle @@ -13,6 +13,7 @@ dependencies { compile group: 'javax.validation', name: 'validation-api' compile group: 'com.palantir.remoting-api', name: 'ssl-config' + compile group: 'com.palantir.remoting3', name: 'jaxrs-clients' processor group: 'org.immutables', name: 'value' diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/QosClientBuilder.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/QosClientBuilder.java new file mode 100644 index 00000000000..4042d5e30a3 --- /dev/null +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/QosClientBuilder.java @@ -0,0 +1,33 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.keyvalue.api; + +import java.util.Optional; + +import org.immutables.value.Value; + +import com.palantir.remoting.api.config.service.ServiceConfiguration; + +@Value.Immutable +public abstract class QosClientBuilder { + public abstract Optional qosServiceConfiguration(); + + @Value.Default + public String qosUserAgent() { + return "unknown-qos-agent"; + } +} diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/spi/AtlasDbFactory.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/spi/AtlasDbFactory.java index e44549f3584..5901327dfd2 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/spi/AtlasDbFactory.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/spi/AtlasDbFactory.java @@ -21,7 +21,9 @@ import org.slf4j.LoggerFactory; import com.palantir.atlasdb.config.LeaderConfig; +import com.palantir.atlasdb.keyvalue.api.ImmutableQosClientBuilder; import com.palantir.atlasdb.keyvalue.api.KeyValueService; +import com.palantir.atlasdb.keyvalue.api.QosClientBuilder; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.timestamp.TimestampService; import com.palantir.timestamp.TimestampStoreInvalidator; @@ -34,7 +36,8 @@ public interface AtlasDbFactory { default KeyValueService createRawKeyValueService( KeyValueServiceConfig config, Optional leaderConfig) { - return createRawKeyValueService(config, leaderConfig, Optional.empty(), DEFAULT_INITIALIZE_ASYNC); + return createRawKeyValueService(config, leaderConfig, Optional.empty(), DEFAULT_INITIALIZE_ASYNC, + ImmutableQosClientBuilder.builder().build()); } /** @@ -52,7 +55,8 @@ KeyValueService createRawKeyValueService( KeyValueServiceConfig config, Optional leaderConfig, Optional namespace, - boolean initializeAsync); + boolean initializeAsync, + QosClientBuilder qosClientBuilder); default TimestampService createTimestampService(KeyValueService rawKvs) { return createTimestampService(rawKvs, Optional.empty(), DEFAULT_INITIALIZE_ASYNC); diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/CassandraAtlasDbFactory.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/CassandraAtlasDbFactory.java index 6fbb85950a6..da536fc3126 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/CassandraAtlasDbFactory.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/CassandraAtlasDbFactory.java @@ -23,6 +23,7 @@ import com.palantir.atlasdb.AtlasDbConstants; import com.palantir.atlasdb.config.LeaderConfig; import com.palantir.atlasdb.keyvalue.api.KeyValueService; +import com.palantir.atlasdb.keyvalue.api.QosClientBuilder; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.keyvalue.cassandra.CassandraKeyValueService; import com.palantir.atlasdb.keyvalue.cassandra.CassandraKeyValueServiceImpl; @@ -43,13 +44,15 @@ public KeyValueService createRawKeyValueService( KeyValueServiceConfig config, Optional leaderConfig, Optional namespace, - boolean initializeAsync) { + boolean initializeAsync, + QosClientBuilder qosClientBuilder) { AtlasDbVersion.ensureVersionReported(); CassandraKeyValueServiceConfig preprocessedConfig = preprocessKvsConfig(config, namespace); return CassandraKeyValueServiceImpl.create( CassandraKeyValueServiceConfigManager.createSimpleManager(preprocessedConfig), leaderConfig, - initializeAsync); + initializeAsync, + qosClientBuilder); } @VisibleForTesting diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClient.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClient.java index ba3b05045fa..2fa3578d295 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClient.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClient.java @@ -37,7 +37,7 @@ import org.apache.cassandra.thrift.UnavailableException; import org.apache.thrift.TException; -import com.palantir.atlasdb.qos.AtlasDbQosClient; +import com.palantir.atlasdb.qos.QosClient; import com.palantir.processors.AutoDelegate; /** @@ -47,9 +47,9 @@ @SuppressWarnings({"checkstyle:all", "DuplicateThrows"}) // :'( public class CassandraClient extends AutoDelegate_Client { private final Cassandra.Client delegate; - private final AtlasDbQosClient qosClient; + private final QosClient qosClient; - public CassandraClient(Cassandra.Client delegate, AtlasDbQosClient qosClient) { + public CassandraClient(Cassandra.Client delegate, QosClient qosClient) { super(delegate.getInputProtocol()); this.delegate = delegate; this.qosClient = qosClient; diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientFactory.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientFactory.java index c77f4e52e59..18cfe8fef20 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientFactory.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientFactory.java @@ -46,7 +46,7 @@ import com.google.common.collect.Maps; import com.palantir.atlasdb.cassandra.CassandraCredentialsConfig; import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; -import com.palantir.atlasdb.qos.AtlasDbQosClient; +import com.palantir.atlasdb.qos.QosClient; import com.palantir.common.exception.AtlasDbDependencyException; import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.UnsafeArg; @@ -67,11 +67,11 @@ public SSLSocketFactory load(InetSocketAddress host) throws Exception { } }); - private final AtlasDbQosClient qosClient; + private final QosClient qosClient; private final InetSocketAddress addr; private final CassandraKeyValueServiceConfig config; - public CassandraClientFactory(AtlasDbQosClient qosClient, + public CassandraClientFactory(QosClient qosClient, InetSocketAddress addr, CassandraKeyValueServiceConfig config) { this.qosClient = qosClient; @@ -92,7 +92,7 @@ public Client create() throws Exception { } } - private static Cassandra.Client getClient(AtlasDbQosClient qosClient, + private static Cassandra.Client getClient(QosClient qosClient, InetSocketAddress addr, CassandraKeyValueServiceConfig config) throws Exception { Client ret = getWrappedClient(qosClient, addr, config); @@ -111,7 +111,7 @@ private static Cassandra.Client getClient(AtlasDbQosClient qosClient, } } - private static Client getWrappedClient(AtlasDbQosClient qosClient, + private static Client getWrappedClient(QosClient qosClient, InetSocketAddress addr, CassandraKeyValueServiceConfig config) throws TException { diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolImpl.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolImpl.java index 20a87312c79..214b6b4c6d0 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolImpl.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolImpl.java @@ -31,7 +31,6 @@ import java.util.Optional; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadLocalRandom; @@ -51,7 +50,6 @@ import org.slf4j.LoggerFactory; import org.slf4j.helpers.MessageFormatter; -import com.codahale.metrics.InstrumentedScheduledExecutorService; import com.codahale.metrics.Meter; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -74,11 +72,7 @@ import com.palantir.atlasdb.keyvalue.api.InsufficientConsistencyException; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.keyvalue.cassandra.CassandraClientFactory.ClientCreationFailedException; -import com.palantir.atlasdb.qos.AtlasDbQosClient; -import com.palantir.atlasdb.qos.ImmutableQosServiceRuntimeConfig; -import com.palantir.atlasdb.qos.QosService; -import com.palantir.atlasdb.qos.QosServiceResource; -import com.palantir.atlasdb.util.AtlasDbMetrics; +import com.palantir.atlasdb.qos.QosClient; import com.palantir.atlasdb.util.MetricsManager; import com.palantir.common.base.FunctionCheckedException; import com.palantir.common.base.Throwables; @@ -157,7 +151,7 @@ public void shutdown() { private final CassandraKeyValueServiceConfig config; private final Map currentPools = Maps.newConcurrentMap(); private final StartupChecks startupChecks; - private final AtlasDbQosClient qosClient; + private final QosClient qosClient; private final ScheduledExecutorService refreshDaemon; private final MetricsManager metricsManager = new MetricsManager(); private final RequestMetrics aggregateMetrics = new RequestMetrics(null); @@ -170,36 +164,29 @@ public void shutdown() { @VisibleForTesting static CassandraClientPoolImpl createImplForTest(CassandraKeyValueServiceConfig config, StartupChecks startupChecks) { - return create(config, startupChecks, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC); + return create(config, startupChecks, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC, new FakeQosClient()); } public static CassandraClientPool create(CassandraKeyValueServiceConfig config) { - return create(config, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC); + return create(config, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC, new FakeQosClient()); } - public static CassandraClientPool create(CassandraKeyValueServiceConfig config, boolean initializeAsync) { - CassandraClientPoolImpl cassandraClientPool = create(config, - StartupChecks.RUN, initializeAsync); + public static CassandraClientPool create(CassandraKeyValueServiceConfig config, boolean initializeAsync, + QosClient qosClient) { + CassandraClientPoolImpl cassandraClientPool = create(config, StartupChecks.RUN, initializeAsync, qosClient); return cassandraClientPool.wrapper.isInitialized() ? cassandraClientPool : cassandraClientPool.wrapper; } private static CassandraClientPoolImpl create(CassandraKeyValueServiceConfig config, - StartupChecks startupChecks, boolean initializeAsync) { - // TODO eventually we'll want to pass this in from somewhere - QosService qosResource = new QosServiceResource(ImmutableQosServiceRuntimeConfig.builder().build()); - - ScheduledExecutorService scheduler = new InstrumentedScheduledExecutorService( - Executors.newSingleThreadScheduledExecutor(), - AtlasDbMetrics.getMetricRegistry(), - "qos-client-executor"); - AtlasDbQosClient qosClient = AtlasDbQosClient.create(scheduler, qosResource, config.getKeyspaceOrThrow()); + StartupChecks startupChecks, boolean initializeAsync, QosClient qosClient) { CassandraClientPoolImpl cassandraClientPool = new CassandraClientPoolImpl(config, startupChecks, qosClient); cassandraClientPool.wrapper.initialize(initializeAsync); return cassandraClientPool; } + private CassandraClientPoolImpl(CassandraKeyValueServiceConfig config, StartupChecks startupChecks, - AtlasDbQosClient qosClient) { + QosClient qosClient) { this.config = config; this.startupChecks = startupChecks; this.qosClient = qosClient; diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolingContainer.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolingContainer.java index dd48bebb3cd..e9e6b206a15 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolingContainer.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientPoolingContainer.java @@ -40,7 +40,7 @@ import com.google.common.base.Function; import com.google.common.base.MoreObjects; import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; -import com.palantir.atlasdb.qos.AtlasDbQosClient; +import com.palantir.atlasdb.qos.QosClient; import com.palantir.atlasdb.util.MetricsManager; import com.palantir.common.base.FunctionCheckedException; import com.palantir.common.pooling.PoolingContainer; @@ -50,7 +50,7 @@ public class CassandraClientPoolingContainer implements PoolingContainer { private static final Logger log = LoggerFactory.getLogger(CassandraClientPoolingContainer.class); - private final AtlasDbQosClient qosClient; + private final QosClient qosClient; private final InetSocketAddress host; private final CassandraKeyValueServiceConfig config; private final MetricsManager metricsManager = new MetricsManager(); @@ -58,7 +58,7 @@ public class CassandraClientPoolingContainer implements PoolingContainer private final AtomicInteger openRequests = new AtomicInteger(); private final GenericObjectPool clientPool; - public CassandraClientPoolingContainer(AtlasDbQosClient qosClient, + public CassandraClientPoolingContainer(QosClient qosClient, InetSocketAddress host, CassandraKeyValueServiceConfig config, int poolNumber) { diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraExpiringKeyValueService.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraExpiringKeyValueService.java index 49058f401cb..dd2cd4d73a9 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraExpiringKeyValueService.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraExpiringKeyValueService.java @@ -36,6 +36,7 @@ import com.palantir.atlasdb.config.LeaderConfig; import com.palantir.atlasdb.keyvalue.api.Cell; import com.palantir.atlasdb.keyvalue.api.ExpiringKeyValueService; +import com.palantir.atlasdb.keyvalue.api.ImmutableQosClientBuilder; import com.palantir.atlasdb.keyvalue.api.KeyAlreadyExistsException; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.keyvalue.api.Value; @@ -68,7 +69,7 @@ private CassandraExpiringKeyValueService( Optional leaderConfig, boolean initializeAsync) { super(LoggerFactory.getLogger(CassandraKeyValueService.class), configManager, compactionManager, leaderConfig, - initializeAsync); + initializeAsync, ImmutableQosClientBuilder.builder().build()); } @Override diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceImpl.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceImpl.java index cfdf0b7b871..964ec371d20 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceImpl.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceImpl.java @@ -31,7 +31,9 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -53,6 +55,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.codahale.metrics.InstrumentedScheduledExecutorService; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; @@ -94,8 +97,10 @@ import com.palantir.atlasdb.keyvalue.api.ClusterAvailabilityStatus; import com.palantir.atlasdb.keyvalue.api.ColumnSelection; import com.palantir.atlasdb.keyvalue.api.ImmutableCandidateCellForSweepingRequest; +import com.palantir.atlasdb.keyvalue.api.ImmutableQosClientBuilder; import com.palantir.atlasdb.keyvalue.api.InsufficientConsistencyException; import com.palantir.atlasdb.keyvalue.api.KeyAlreadyExistsException; +import com.palantir.atlasdb.keyvalue.api.QosClientBuilder; import com.palantir.atlasdb.keyvalue.api.RangeRequest; import com.palantir.atlasdb.keyvalue.api.RangeRequests; import com.palantir.atlasdb.keyvalue.api.RowColumnRangeIterator; @@ -121,8 +126,12 @@ import com.palantir.atlasdb.keyvalue.impl.KeyValueServices; import com.palantir.atlasdb.keyvalue.impl.LocalRowColumnRangeIterator; import com.palantir.atlasdb.logging.LoggingArgs; +import com.palantir.atlasdb.qos.AtlasDbQosClient; +import com.palantir.atlasdb.qos.QosClient; +import com.palantir.atlasdb.qos.QosService; import com.palantir.atlasdb.util.AnnotatedCallable; import com.palantir.atlasdb.util.AnnotationType; +import com.palantir.atlasdb.util.AtlasDbMetrics; import com.palantir.common.annotation.Idempotent; import com.palantir.common.base.ClosableIterator; import com.palantir.common.base.ClosableIterators; @@ -132,6 +141,8 @@ import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.UnsafeArg; import com.palantir.processors.AutoDelegate; +import com.palantir.remoting3.clients.ClientConfigurations; +import com.palantir.remoting3.jaxrs.JaxRsClient; import com.palantir.util.paging.AbstractPagingIterable; import com.palantir.util.paging.SimpleTokenBackedResultsPage; import com.palantir.util.paging.TokenBackedBasicResultsPage; @@ -222,8 +233,16 @@ public static CassandraKeyValueService create( CassandraKeyValueServiceConfigManager configManager, Optional leaderConfig, boolean initializeAsync) { + return create(configManager, leaderConfig, initializeAsync, ImmutableQosClientBuilder.builder().build()); + } + + public static CassandraKeyValueService create( + CassandraKeyValueServiceConfigManager configManager, + Optional leaderConfig, + boolean initializeAsync, + QosClientBuilder qosClientBuilder) { return create(configManager, leaderConfig, LoggerFactory.getLogger(CassandraKeyValueService.class), - initializeAsync); + initializeAsync, qosClientBuilder); } @VisibleForTesting @@ -231,18 +250,21 @@ static CassandraKeyValueService create( CassandraKeyValueServiceConfigManager configManager, Optional leaderConfig, Logger log) { - return create(configManager, leaderConfig, log, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC); + return create(configManager, leaderConfig, log, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC, + ImmutableQosClientBuilder.builder().build()); } private static CassandraKeyValueService create( CassandraKeyValueServiceConfigManager configManager, Optional leaderConfig, Logger log, - boolean initializeAsync) { + boolean initializeAsync, + QosClientBuilder qosClientBuilder) { Optional compactionManager = CassandraJmxCompaction.createJmxCompactionManager(configManager); CassandraKeyValueServiceImpl keyValueService = - new CassandraKeyValueServiceImpl(log, configManager, compactionManager, leaderConfig, initializeAsync); + new CassandraKeyValueServiceImpl(log, configManager, compactionManager, leaderConfig, initializeAsync, + qosClientBuilder); keyValueService.wrapper.initialize(initializeAsync); return keyValueService.wrapper.isInitialized() ? keyValueService : keyValueService.wrapper; } @@ -251,12 +273,14 @@ protected CassandraKeyValueServiceImpl(Logger log, CassandraKeyValueServiceConfigManager configManager, Optional compactionManager, Optional leaderConfig, - boolean initializeAsync) { + boolean initializeAsync, + QosClientBuilder qosClientBuilder) { super(AbstractKeyValueService.createFixedThreadPool("Atlas Cassandra KVS", configManager.getConfig().poolSize() * configManager.getConfig().servers().size())); this.log = log; this.configManager = configManager; - this.clientPool = CassandraClientPoolImpl.create(configManager.getConfig(), initializeAsync); + QosClient qosClient = getQosClient(qosClientBuilder); + this.clientPool = CassandraClientPoolImpl.create(configManager.getConfig(), initializeAsync, qosClient); this.compactionManager = compactionManager; this.leaderConfig = leaderConfig; this.hiddenTables = new HiddenTables(); @@ -268,6 +292,27 @@ protected CassandraKeyValueServiceImpl(Logger log, this.cassandraTables = new CassandraTables(clientPool, configManager); } + private QosClient getQosClient(QosClientBuilder qosClientBuilder) { + if (qosClientBuilder.qosServiceConfiguration().isPresent()) { + return createAtlasDbQosClient(qosClientBuilder); + } else { + return new FakeQosClient(); + } + } + + private QosClient createAtlasDbQosClient(QosClientBuilder qosClientBuilder) { + Preconditions.checkState(qosClientBuilder.qosServiceConfiguration().isPresent(), + "Qos Service Config is required to create a AtlasDBQosClient."); + QosService qosService = JaxRsClient.create(QosService.class, + qosClientBuilder.qosUserAgent(), + ClientConfigurations.of(qosClientBuilder.qosServiceConfiguration().get())); + ScheduledExecutorService scheduler = new InstrumentedScheduledExecutorService( + Executors.newSingleThreadScheduledExecutor(), + AtlasDbMetrics.getMetricRegistry(), + "qos-client-executor"); + return new AtlasDbQosClient(qosService, scheduler, configManager.getConfig().getKeyspaceOrThrow()); + } + @Override public boolean isInitialized() { return wrapper.isInitialized(); diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/FakeQosClient.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/FakeQosClient.java new file mode 100644 index 00000000000..e38c75e9543 --- /dev/null +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/FakeQosClient.java @@ -0,0 +1,26 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.keyvalue.cassandra; + +import com.palantir.atlasdb.qos.QosClient; + +public class FakeQosClient implements QosClient { + @Override + public void checkLimit() { + // no op + } +} diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/qos/AtlasDbQosClient.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/qos/AtlasDbQosClient.java index 741844a3f5d..7ce05041a93 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/qos/AtlasDbQosClient.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/qos/AtlasDbQosClient.java @@ -20,33 +20,24 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -@SuppressWarnings("FinalClass") -public class AtlasDbQosClient { - QosService qosService; +public class AtlasDbQosClient implements QosClient { + private volatile long credits; - volatile long credits; - - private AtlasDbQosClient(QosService qosService) { - this.qosService = qosService; - } - - public static AtlasDbQosClient create(ScheduledExecutorService scheduler, QosService qosService, + public AtlasDbQosClient(QosService qosService, + ScheduledExecutorService limitRefresher, String clientName) { - AtlasDbQosClient client = new AtlasDbQosClient(qosService); - - scheduler.scheduleAtFixedRate(() -> { + limitRefresher.scheduleAtFixedRate(() -> { try { - client.credits = qosService.getLimit(clientName); + credits = qosService.getLimit(clientName); } catch (Exception e) { // do nothing } - }, 0L, 1L, TimeUnit.SECONDS); - - return client; + }, 0L, 60L, TimeUnit.SECONDS); } // The KVS layer should call this before every read/write operation // Currently all operations are treated equally; each uses up a unit of credits + @Override public void checkLimit() { // always return immediately - i.e. no backoff // TODO if soft-limited, pause diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/qos/QosClient.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/qos/QosClient.java new file mode 100644 index 00000000000..cfb82619a6d --- /dev/null +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/qos/QosClient.java @@ -0,0 +1,21 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.qos; + +public interface QosClient { + void checkLimit(); +} diff --git a/atlasdb-client/src/test/java/com/palantir/atlasdb/qos/AtlasDbQosClientTest.java b/atlasdb-client/src/test/java/com/palantir/atlasdb/qos/AtlasDbQosClientTest.java index f9eacfd3c96..b224c25bcfe 100644 --- a/atlasdb-client/src/test/java/com/palantir/atlasdb/qos/AtlasDbQosClientTest.java +++ b/atlasdb-client/src/test/java/com/palantir/atlasdb/qos/AtlasDbQosClientTest.java @@ -37,14 +37,14 @@ public void setUp() { @Test public void doesNotBackOff() { - AtlasDbQosClient qosClient = AtlasDbQosClient.create(scheduler, qosService, "test-client"); + AtlasDbQosClient qosClient = new AtlasDbQosClient(qosService, scheduler, "test-client"); scheduler.tick(1L, TimeUnit.MILLISECONDS); qosClient.checkLimit(); } @Test public void throwsAfterLimitExceeded() { - AtlasDbQosClient qosClient = AtlasDbQosClient.create(scheduler, qosService, "test-client"); + AtlasDbQosClient qosClient = new AtlasDbQosClient(qosService, scheduler, "test-client"); scheduler.tick(1L, TimeUnit.MILLISECONDS); qosClient.checkLimit(); @@ -53,15 +53,15 @@ public void throwsAfterLimitExceeded() { @Test public void canCheckAgainAfterRefreshPeriod() { - AtlasDbQosClient qosClient = AtlasDbQosClient.create(scheduler, qosService, "test-client"); + AtlasDbQosClient qosClient = new AtlasDbQosClient(qosService, scheduler, "test-client"); scheduler.tick(1L, TimeUnit.MILLISECONDS); qosClient.checkLimit(); - assertThatThrownBy(qosClient::checkLimit).isInstanceOf(RuntimeException.class); + assertThatThrownBy(qosClient::checkLimit) + .isInstanceOf(RuntimeException.class).hasMessage("Rate limit exceeded"); - scheduler.tick(1L, TimeUnit.SECONDS); + scheduler.tick(60L, TimeUnit.SECONDS); qosClient.checkLimit(); } - } diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/config/AtlasDbConfig.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/config/AtlasDbConfig.java index 6161854ad4c..93501b1be7a 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/config/AtlasDbConfig.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/config/AtlasDbConfig.java @@ -29,6 +29,7 @@ import com.palantir.atlasdb.memory.InMemoryAtlasDbConfig; import com.palantir.atlasdb.spi.KeyValueServiceConfig; import com.palantir.exception.NotInitializedException; +import com.palantir.remoting.api.config.service.ServiceConfiguration; @JsonDeserialize(as = ImmutableAtlasDbConfig.class) @JsonSerialize(as = ImmutableAtlasDbConfig.class) @@ -256,6 +257,8 @@ public int getDefaultLockTimeoutSeconds() { return AtlasDbConstants.DEFAULT_LOCK_TIMEOUT_SECONDS; } + public abstract Optional getQosServiceConfiguration(); + @Value.Check protected final void check() { checkLeaderAndTimelockBlocks(); diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/ServiceDiscoveringAtlasSupplier.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/ServiceDiscoveringAtlasSupplier.java index 861b877dd3a..4a973e8f09c 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/ServiceDiscoveringAtlasSupplier.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/ServiceDiscoveringAtlasSupplier.java @@ -35,7 +35,9 @@ import com.google.common.base.Suppliers; import com.palantir.atlasdb.AtlasDbConstants; import com.palantir.atlasdb.config.LeaderConfig; +import com.palantir.atlasdb.keyvalue.api.ImmutableQosClientBuilder; import com.palantir.atlasdb.keyvalue.api.KeyValueService; +import com.palantir.atlasdb.keyvalue.api.QosClientBuilder; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.spi.AtlasDbFactory; import com.palantir.atlasdb.spi.KeyValueServiceConfig; @@ -54,9 +56,11 @@ public class ServiceDiscoveringAtlasSupplier { private final Supplier keyValueService; private final Supplier timestampService; private final Supplier timestampStoreInvalidator; + private final QosClientBuilder qosClientBuilder; public ServiceDiscoveringAtlasSupplier(KeyValueServiceConfig config, Optional leaderConfig) { - this(config, leaderConfig, Optional.empty(), AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC); + this(config, leaderConfig, Optional.empty(), AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC, + ImmutableQosClientBuilder.builder().build()); } public ServiceDiscoveringAtlasSupplier( @@ -64,15 +68,17 @@ public ServiceDiscoveringAtlasSupplier( Optional leaderConfig, Optional namespace, Optional timestampTable) { - this(config, leaderConfig, namespace, timestampTable, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC); + this(config, leaderConfig, namespace, timestampTable, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC, + ImmutableQosClientBuilder.builder().build()); } public ServiceDiscoveringAtlasSupplier( KeyValueServiceConfig config, Optional leaderConfig, Optional namespace, - boolean initializeAsync) { - this(config, leaderConfig, namespace, Optional.empty(), initializeAsync); + boolean initializeAsync, + QosClientBuilder qosClientBuilder) { + this(config, leaderConfig, namespace, Optional.empty(), initializeAsync, qosClientBuilder); } public ServiceDiscoveringAtlasSupplier( @@ -80,9 +86,11 @@ public ServiceDiscoveringAtlasSupplier( Optional leaderConfig, Optional namespace, Optional timestampTable, - boolean initializeAsync) { + boolean initializeAsync, + QosClientBuilder qosClientBuilderPassed) { this.config = config; this.leaderConfig = leaderConfig; + this.qosClientBuilder = qosClientBuilderPassed; AtlasDbFactory atlasFactory = StreamSupport.stream(loader.spliterator(), false) .filter(producesCorrectType()) @@ -92,7 +100,12 @@ public ServiceDiscoveringAtlasSupplier( + " Have you annotated it with @AutoService(AtlasDbFactory.class)?" )); keyValueService = Suppliers.memoize( - () -> atlasFactory.createRawKeyValueService(config, leaderConfig, namespace, initializeAsync)); + () -> atlasFactory.createRawKeyValueService( + config, + leaderConfig, + namespace, + initializeAsync, + qosClientBuilder)); timestampService = () -> atlasFactory.createTimestampService(getKeyValueService(), timestampTable, initializeAsync); timestampStoreInvalidator = () -> atlasFactory.createTimestampStoreInvalidator(getKeyValueService()); diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java index c51db40aed2..f0019127dd6 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java @@ -57,6 +57,7 @@ import com.palantir.atlasdb.factory.timestamp.DecoratedTimelockServices; import com.palantir.atlasdb.http.AtlasDbFeignTargetFactory; import com.palantir.atlasdb.http.UserAgents; +import com.palantir.atlasdb.keyvalue.api.ImmutableQosClientBuilder; import com.palantir.atlasdb.keyvalue.api.KeyValueService; import com.palantir.atlasdb.keyvalue.impl.ProfilingKeyValueService; import com.palantir.atlasdb.keyvalue.impl.SweepStatsKeyValueService; @@ -304,9 +305,18 @@ SerializableTransactionManager serializable() { java.util.function.Supplier runtimeConfigSupplier = () -> runtimeConfigSupplier().get().orElse(defaultRuntime); + ImmutableQosClientBuilder qosServiceClientBuilder = ImmutableQosClientBuilder.builder() + .qosServiceConfiguration(config.getQosServiceConfiguration()) + .qosUserAgent(userAgent()) + .build(); + ServiceDiscoveringAtlasSupplier atlasFactory = - new ServiceDiscoveringAtlasSupplier(config.keyValueService(), config.leader(), config.namespace(), - config.initializeAsync()); + new ServiceDiscoveringAtlasSupplier( + config.keyValueService(), + config.leader(), + config.namespace(), + config.initializeAsync(), + qosServiceClientBuilder); KeyValueService rawKvs = atlasFactory.getKeyValueService(); LockRequest.setDefaultLockTimeout( diff --git a/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/AutoServiceAnnotatedAtlasDbFactory.java b/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/AutoServiceAnnotatedAtlasDbFactory.java index c7764bfa828..6d7dd9845ba 100644 --- a/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/AutoServiceAnnotatedAtlasDbFactory.java +++ b/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/AutoServiceAnnotatedAtlasDbFactory.java @@ -27,6 +27,7 @@ import com.google.common.collect.Lists; import com.palantir.atlasdb.config.LeaderConfig; import com.palantir.atlasdb.keyvalue.api.KeyValueService; +import com.palantir.atlasdb.keyvalue.api.QosClientBuilder; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.spi.AtlasDbFactory; import com.palantir.atlasdb.spi.KeyValueServiceConfig; @@ -51,7 +52,8 @@ public KeyValueService createRawKeyValueService( KeyValueServiceConfig config, Optional leaderConfig, Optional unused, - boolean initializeAsync) { + boolean initializeAsync, + QosClientBuilder unusedQosClientBuilder) { if (initializeAsync) { log.warn("Asynchronous initialization not implemented, will initialize synchronousy."); } diff --git a/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/ServicesConfig.java b/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/ServicesConfig.java index 1f0a3e3a770..055e665ba72 100644 --- a/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/ServicesConfig.java +++ b/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/ServicesConfig.java @@ -26,6 +26,7 @@ import com.palantir.atlasdb.config.AtlasDbConfig; import com.palantir.atlasdb.config.AtlasDbRuntimeConfig; import com.palantir.atlasdb.factory.ServiceDiscoveringAtlasSupplier; +import com.palantir.atlasdb.keyvalue.api.ImmutableQosClientBuilder; import com.palantir.atlasdb.table.description.Schema; @Value.Immutable @@ -41,7 +42,10 @@ public ServiceDiscoveringAtlasSupplier atlasDbSupplier() { atlasDbConfig().keyValueService(), atlasDbConfig().leader(), atlasDbConfig().namespace(), - atlasDbConfig().initializeAsync()); + atlasDbConfig().initializeAsync(), + ImmutableQosClientBuilder.builder() + .qosServiceConfiguration(atlasDbConfig().getQosServiceConfiguration()) + .build()); } @Value.Default diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/DbAtlasDbFactory.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/DbAtlasDbFactory.java index 50f030ff499..4387f413d77 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/DbAtlasDbFactory.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/DbAtlasDbFactory.java @@ -25,6 +25,7 @@ import com.palantir.atlasdb.AtlasDbConstants; import com.palantir.atlasdb.config.LeaderConfig; import com.palantir.atlasdb.keyvalue.api.KeyValueService; +import com.palantir.atlasdb.keyvalue.api.QosClientBuilder; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.keyvalue.dbkvs.impl.ConnectionManagerAwareDbKvs; import com.palantir.atlasdb.keyvalue.dbkvs.timestamp.InDbTimestampBoundStore; @@ -57,7 +58,8 @@ public KeyValueService createRawKeyValueService( KeyValueServiceConfig config, Optional leaderConfig, Optional namespace, - boolean initializeAsync) { + boolean initializeAsync, + QosClientBuilder unusedQosClientBuilder) { if (initializeAsync) { log.warn("Asynchronous initialization not implemented, will initialize synchronousy."); } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/memory/InMemoryAtlasDbFactory.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/memory/InMemoryAtlasDbFactory.java index ef197e1472d..6198ba3d285 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/memory/InMemoryAtlasDbFactory.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/memory/InMemoryAtlasDbFactory.java @@ -33,6 +33,7 @@ import com.palantir.atlasdb.cleaner.api.OnCleanupTask; import com.palantir.atlasdb.config.LeaderConfig; import com.palantir.atlasdb.keyvalue.api.KeyValueService; +import com.palantir.atlasdb.keyvalue.api.QosClientBuilder; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.keyvalue.impl.InMemoryKeyValueService; import com.palantir.atlasdb.schema.AtlasSchema; @@ -107,7 +108,8 @@ public InMemoryKeyValueService createRawKeyValueService( KeyValueServiceConfig config, Optional leaderConfig, Optional unused, - boolean initializeAsync) { + boolean initializeAsync, + QosClientBuilder unusedQosClientBuilder) { if (initializeAsync) { log.warn("Asynchronous initialization not implemented, will initialize synchronousy."); } diff --git a/atlasdb-jdbc/src/main/java/com/palantir/atlasdb/keyvalue/jdbc/JdbcAtlasDbFactory.java b/atlasdb-jdbc/src/main/java/com/palantir/atlasdb/keyvalue/jdbc/JdbcAtlasDbFactory.java index 3048625f745..54cf4a990fd 100644 --- a/atlasdb-jdbc/src/main/java/com/palantir/atlasdb/keyvalue/jdbc/JdbcAtlasDbFactory.java +++ b/atlasdb-jdbc/src/main/java/com/palantir/atlasdb/keyvalue/jdbc/JdbcAtlasDbFactory.java @@ -25,6 +25,7 @@ import com.palantir.atlasdb.AtlasDbConstants; import com.palantir.atlasdb.config.LeaderConfig; import com.palantir.atlasdb.keyvalue.api.KeyValueService; +import com.palantir.atlasdb.keyvalue.api.QosClientBuilder; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.spi.AtlasDbFactory; import com.palantir.atlasdb.spi.KeyValueServiceConfig; @@ -55,7 +56,8 @@ public KeyValueService createRawKeyValueService( KeyValueServiceConfig config, Optional leaderConfig, Optional unused, - boolean initializeAsync) { + boolean initializeAsync, + QosClientBuilder unusedQosClientBuilder) { if (initializeAsync) { log.warn("Asynchronous initialization not implemented, will initialize synchronousy."); } diff --git a/qos-service-api/build.gradle b/qos-service-api/build.gradle index ec406d446dd..0a994ae64ba 100644 --- a/qos-service-api/build.gradle +++ b/qos-service-api/build.gradle @@ -13,6 +13,7 @@ dependencies { compile group: 'javax.ws.rs', name: 'javax.ws.rs-api' compile group: 'com.palantir.safe-logging', name: 'safe-logging' compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind' + compile group: 'com.palantir.remoting3', name: 'jaxrs-clients' processor group: 'org.immutables', name: 'value' diff --git a/timelock-server/src/main/java/com/palantir/atlasdb/timelock/TimeLockServerLauncher.java b/timelock-server/src/main/java/com/palantir/atlasdb/timelock/TimeLockServerLauncher.java index 764d75fbc2b..84441080983 100644 --- a/timelock-server/src/main/java/com/palantir/atlasdb/timelock/TimeLockServerLauncher.java +++ b/timelock-server/src/main/java/com/palantir/atlasdb/timelock/TimeLockServerLauncher.java @@ -19,7 +19,6 @@ import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; -import com.palantir.atlasdb.qos.QosServiceResource; import com.palantir.atlasdb.timelock.config.CombinedTimeLockServerConfiguration; import com.palantir.atlasdb.timelock.config.TimeLockConfigMigrator; import com.palantir.atlasdb.timelock.config.TimeLockServerConfiguration;