Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Create one qosCLient for each service
Browse files Browse the repository at this point in the history
QosClientBuilder hooked up to KVS create

Create the QosClient in CassandraClientPoolImpl if the config is specified.

Create FakeQosClient if the config is not specified

Cleanup

get broken tests to pass
  • Loading branch information
hsaraogi committed Nov 8, 2017
1 parent ffa5e5a commit 297ffd1
Show file tree
Hide file tree
Showing 24 changed files with 232 additions and 82 deletions.
1 change: 1 addition & 0 deletions atlasdb-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies {
compile group: 'javax.validation', name: 'validation-api'

compile group: 'com.palantir.remoting-api', name: 'ssl-config'
compile group: 'com.palantir.remoting3', name: 'jaxrs-clients'

processor group: 'org.immutables', name: 'value'

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2017 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the BSD-3 License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.keyvalue.api;

import java.util.Optional;

import org.immutables.value.Value;

import com.palantir.remoting.api.config.service.ServiceConfiguration;

@Value.Immutable
public abstract class QosClientBuilder {
public abstract Optional<ServiceConfiguration> qosServiceConfiguration();

@Value.Default
public String qosUserAgent() {
return "unknown-qos-agent";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.slf4j.LoggerFactory;

import com.palantir.atlasdb.config.LeaderConfig;
import com.palantir.atlasdb.keyvalue.api.ImmutableQosClientBuilder;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.keyvalue.api.QosClientBuilder;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.timestamp.TimestampService;
import com.palantir.timestamp.TimestampStoreInvalidator;
Expand All @@ -34,7 +36,8 @@ public interface AtlasDbFactory {

default KeyValueService createRawKeyValueService(
KeyValueServiceConfig config, Optional<LeaderConfig> leaderConfig) {
return createRawKeyValueService(config, leaderConfig, Optional.empty(), DEFAULT_INITIALIZE_ASYNC);
return createRawKeyValueService(config, leaderConfig, Optional.empty(), DEFAULT_INITIALIZE_ASYNC,
ImmutableQosClientBuilder.builder().build());
}

/**
Expand All @@ -52,7 +55,8 @@ KeyValueService createRawKeyValueService(
KeyValueServiceConfig config,
Optional<LeaderConfig> leaderConfig,
Optional<String> namespace,
boolean initializeAsync);
boolean initializeAsync,
QosClientBuilder qosClientBuilder);

default TimestampService createTimestampService(KeyValueService rawKvs) {
return createTimestampService(rawKvs, Optional.empty(), DEFAULT_INITIALIZE_ASYNC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.palantir.atlasdb.AtlasDbConstants;
import com.palantir.atlasdb.config.LeaderConfig;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.keyvalue.api.QosClientBuilder;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.keyvalue.cassandra.CassandraKeyValueService;
import com.palantir.atlasdb.keyvalue.cassandra.CassandraKeyValueServiceImpl;
Expand All @@ -43,13 +44,15 @@ public KeyValueService createRawKeyValueService(
KeyValueServiceConfig config,
Optional<LeaderConfig> leaderConfig,
Optional<String> namespace,
boolean initializeAsync) {
boolean initializeAsync,
QosClientBuilder qosClientBuilder) {
AtlasDbVersion.ensureVersionReported();
CassandraKeyValueServiceConfig preprocessedConfig = preprocessKvsConfig(config, namespace);
return CassandraKeyValueServiceImpl.create(
CassandraKeyValueServiceConfigManager.createSimpleManager(preprocessedConfig),
leaderConfig,
initializeAsync);
initializeAsync,
qosClientBuilder);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.thrift.TException;

import com.palantir.atlasdb.qos.AtlasDbQosClient;
import com.palantir.atlasdb.qos.QosClient;
import com.palantir.processors.AutoDelegate;

/**
Expand All @@ -47,9 +47,9 @@
@SuppressWarnings({"checkstyle:all", "DuplicateThrows"}) // :'(
public class CassandraClient extends AutoDelegate_Client {
private final Cassandra.Client delegate;
private final AtlasDbQosClient qosClient;
private final QosClient qosClient;

public CassandraClient(Cassandra.Client delegate, AtlasDbQosClient qosClient) {
public CassandraClient(Cassandra.Client delegate, QosClient qosClient) {
super(delegate.getInputProtocol());
this.delegate = delegate;
this.qosClient = qosClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import com.google.common.collect.Maps;
import com.palantir.atlasdb.cassandra.CassandraCredentialsConfig;
import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig;
import com.palantir.atlasdb.qos.AtlasDbQosClient;
import com.palantir.atlasdb.qos.QosClient;
import com.palantir.common.exception.AtlasDbDependencyException;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
Expand All @@ -67,11 +67,11 @@ public SSLSocketFactory load(InetSocketAddress host) throws Exception {
}
});

private final AtlasDbQosClient qosClient;
private final QosClient qosClient;
private final InetSocketAddress addr;
private final CassandraKeyValueServiceConfig config;

public CassandraClientFactory(AtlasDbQosClient qosClient,
public CassandraClientFactory(QosClient qosClient,
InetSocketAddress addr,
CassandraKeyValueServiceConfig config) {
this.qosClient = qosClient;
Expand All @@ -92,7 +92,7 @@ public Client create() throws Exception {
}
}

private static Cassandra.Client getClient(AtlasDbQosClient qosClient,
private static Cassandra.Client getClient(QosClient qosClient,
InetSocketAddress addr,
CassandraKeyValueServiceConfig config) throws Exception {
Client ret = getWrappedClient(qosClient, addr, config);
Expand All @@ -111,7 +111,7 @@ private static Cassandra.Client getClient(AtlasDbQosClient qosClient,
}
}

private static Client getWrappedClient(AtlasDbQosClient qosClient,
private static Client getWrappedClient(QosClient qosClient,
InetSocketAddress addr,
CassandraKeyValueServiceConfig config)
throws TException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
Expand All @@ -51,7 +50,6 @@
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.MessageFormatter;

import com.codahale.metrics.InstrumentedScheduledExecutorService;
import com.codahale.metrics.Meter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand All @@ -74,11 +72,7 @@
import com.palantir.atlasdb.keyvalue.api.InsufficientConsistencyException;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.keyvalue.cassandra.CassandraClientFactory.ClientCreationFailedException;
import com.palantir.atlasdb.qos.AtlasDbQosClient;
import com.palantir.atlasdb.qos.ImmutableQosServiceRuntimeConfig;
import com.palantir.atlasdb.qos.QosService;
import com.palantir.atlasdb.qos.QosServiceResource;
import com.palantir.atlasdb.util.AtlasDbMetrics;
import com.palantir.atlasdb.qos.QosClient;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.common.base.FunctionCheckedException;
import com.palantir.common.base.Throwables;
Expand Down Expand Up @@ -157,7 +151,7 @@ public void shutdown() {
private final CassandraKeyValueServiceConfig config;
private final Map<InetSocketAddress, CassandraClientPoolingContainer> currentPools = Maps.newConcurrentMap();
private final StartupChecks startupChecks;
private final AtlasDbQosClient qosClient;
private final QosClient qosClient;
private final ScheduledExecutorService refreshDaemon;
private final MetricsManager metricsManager = new MetricsManager();
private final RequestMetrics aggregateMetrics = new RequestMetrics(null);
Expand All @@ -170,36 +164,29 @@ public void shutdown() {
@VisibleForTesting
static CassandraClientPoolImpl createImplForTest(CassandraKeyValueServiceConfig config,
StartupChecks startupChecks) {
return create(config, startupChecks, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC);
return create(config, startupChecks, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC, new FakeQosClient());
}

public static CassandraClientPool create(CassandraKeyValueServiceConfig config) {
return create(config, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC);
return create(config, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC, new FakeQosClient());
}

public static CassandraClientPool create(CassandraKeyValueServiceConfig config, boolean initializeAsync) {
CassandraClientPoolImpl cassandraClientPool = create(config,
StartupChecks.RUN, initializeAsync);
public static CassandraClientPool create(CassandraKeyValueServiceConfig config, boolean initializeAsync,
QosClient qosClient) {
CassandraClientPoolImpl cassandraClientPool = create(config, StartupChecks.RUN, initializeAsync, qosClient);
return cassandraClientPool.wrapper.isInitialized() ? cassandraClientPool : cassandraClientPool.wrapper;
}

private static CassandraClientPoolImpl create(CassandraKeyValueServiceConfig config,
StartupChecks startupChecks, boolean initializeAsync) {
// TODO eventually we'll want to pass this in from somewhere
QosService qosResource = new QosServiceResource(ImmutableQosServiceRuntimeConfig.builder().build());

ScheduledExecutorService scheduler = new InstrumentedScheduledExecutorService(
Executors.newSingleThreadScheduledExecutor(),
AtlasDbMetrics.getMetricRegistry(),
"qos-client-executor");
AtlasDbQosClient qosClient = AtlasDbQosClient.create(scheduler, qosResource, config.getKeyspaceOrThrow());
StartupChecks startupChecks, boolean initializeAsync, QosClient qosClient) {
CassandraClientPoolImpl cassandraClientPool = new CassandraClientPoolImpl(config, startupChecks, qosClient);
cassandraClientPool.wrapper.initialize(initializeAsync);
return cassandraClientPool;
}


private CassandraClientPoolImpl(CassandraKeyValueServiceConfig config, StartupChecks startupChecks,
AtlasDbQosClient qosClient) {
QosClient qosClient) {
this.config = config;
this.startupChecks = startupChecks;
this.qosClient = qosClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig;
import com.palantir.atlasdb.qos.AtlasDbQosClient;
import com.palantir.atlasdb.qos.QosClient;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.common.base.FunctionCheckedException;
import com.palantir.common.pooling.PoolingContainer;
Expand All @@ -50,15 +50,15 @@
public class CassandraClientPoolingContainer implements PoolingContainer<Client> {
private static final Logger log = LoggerFactory.getLogger(CassandraClientPoolingContainer.class);

private final AtlasDbQosClient qosClient;
private final QosClient qosClient;
private final InetSocketAddress host;
private final CassandraKeyValueServiceConfig config;
private final MetricsManager metricsManager = new MetricsManager();
private final AtomicLong count = new AtomicLong();
private final AtomicInteger openRequests = new AtomicInteger();
private final GenericObjectPool<Client> clientPool;

public CassandraClientPoolingContainer(AtlasDbQosClient qosClient,
public CassandraClientPoolingContainer(QosClient qosClient,
InetSocketAddress host,
CassandraKeyValueServiceConfig config,
int poolNumber) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.palantir.atlasdb.config.LeaderConfig;
import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.keyvalue.api.ExpiringKeyValueService;
import com.palantir.atlasdb.keyvalue.api.ImmutableQosClientBuilder;
import com.palantir.atlasdb.keyvalue.api.KeyAlreadyExistsException;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.keyvalue.api.Value;
Expand Down Expand Up @@ -68,7 +69,7 @@ private CassandraExpiringKeyValueService(
Optional<LeaderConfig> leaderConfig,
boolean initializeAsync) {
super(LoggerFactory.getLogger(CassandraKeyValueService.class), configManager, compactionManager, leaderConfig,
initializeAsync);
initializeAsync, ImmutableQosClientBuilder.builder().build());
}

@Override
Expand Down
Loading

0 comments on commit 297ffd1

Please sign in to comment.