Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
qos config
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Ziebart committed Nov 15, 2017
1 parent dd55403 commit 2318f69
Show file tree
Hide file tree
Showing 14 changed files with 250 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.palantir.atlasdb.AtlasDbConstants;
import com.palantir.remoting.api.config.service.ServiceConfiguration;
import com.palantir.atlasdb.qos.config.QosClientConfig;

@JsonDeserialize(as = ImmutableAtlasDbRuntimeConfig.class)
@JsonSerialize(as = ImmutableAtlasDbRuntimeConfig.class)
Expand Down Expand Up @@ -61,7 +61,10 @@ public long getTimestampCacheSize() {
return AtlasDbConstants.DEFAULT_TIMESTAMP_CACHE_SIZE;
}

public abstract Optional<ServiceConfiguration> getQosServiceConfiguration();
@Value.Default
public QosClientConfig qos() {
return QosClientConfig.DEFAULT;
}

/**
* Runtime live-reloadable parameters for communicating with TimeLock.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
Expand All @@ -30,7 +28,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.InstrumentedScheduledExecutorService;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -72,11 +69,10 @@
import com.palantir.atlasdb.persistentlock.KvsBackedPersistentLockService;
import com.palantir.atlasdb.persistentlock.NoOpPersistentLockService;
import com.palantir.atlasdb.persistentlock.PersistentLockService;
import com.palantir.atlasdb.qos.FakeQosClient;
import com.palantir.atlasdb.qos.QosClient;
import com.palantir.atlasdb.qos.QosService;
import com.palantir.atlasdb.qos.client.AtlasDbQosClient;
import com.palantir.atlasdb.qos.ratelimit.QosRateLimiter;
import com.palantir.atlasdb.qos.config.QosClientConfig;
import com.palantir.atlasdb.qos.ratelimit.QosRateLimiters;
import com.palantir.atlasdb.schema.generated.SweepTableFactory;
import com.palantir.atlasdb.spi.AtlasDbFactory;
import com.palantir.atlasdb.spi.KeyValueServiceConfig;
Expand Down Expand Up @@ -117,9 +113,6 @@
import com.palantir.lock.impl.LockServiceImpl;
import com.palantir.lock.v2.TimelockService;
import com.palantir.logsafe.UnsafeArg;
import com.palantir.remoting.api.config.service.ServiceConfiguration;
import com.palantir.remoting3.clients.ClientConfigurations;
import com.palantir.remoting3.jaxrs.JaxRsClient;
import com.palantir.timestamp.TimestampService;
import com.palantir.timestamp.TimestampStoreInvalidator;
import com.palantir.util.OptionalResolver;
Expand Down Expand Up @@ -316,8 +309,7 @@ SerializableTransactionManager serializable() {
java.util.function.Supplier<AtlasDbRuntimeConfig> runtimeConfigSupplier =
() -> runtimeConfigSupplier().get().orElse(defaultRuntime);


QosClient qosClient = getQosClient(runtimeConfigSupplier.get().getQosServiceConfiguration());
QosClient qosClient = getQosClient(runtimeConfigSupplier.get().qos());

ServiceDiscoveringAtlasSupplier atlasFactory =
new ServiceDiscoveringAtlasSupplier(
Expand Down Expand Up @@ -411,20 +403,10 @@ SerializableTransactionManager serializable() {
return transactionManager;
}

private QosClient getQosClient(Optional<ServiceConfiguration> serviceConfiguration) {
return serviceConfiguration.map(this::createAtlasDbQosClient).orElse(FakeQosClient.INSTANCE);
}

private QosClient createAtlasDbQosClient(ServiceConfiguration serviceConfiguration) {
QosService qosService = JaxRsClient.create(QosService.class,
userAgent(),
ClientConfigurations.of(serviceConfiguration));
private QosClient getQosClient(QosClientConfig config) {
// TODO(nziebart): create a RefreshingRateLimiter
ScheduledExecutorService scheduler = new InstrumentedScheduledExecutorService(
Executors.newSingleThreadScheduledExecutor(),
AtlasDbMetrics.getMetricRegistry(),
"qos-client-executor");
return AtlasDbQosClient.create(QosRateLimiter.create());
QosRateLimiters rateLimiters = QosRateLimiters.create(config.limits());
return AtlasDbQosClient.create(rateLimiters);
}

private static boolean areTransactionManagerInitializationPrerequisitesSatisfied(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import com.palantir.atlasdb.config.TimeLockClientConfig;
import com.palantir.atlasdb.factory.startup.TimeLockMigrator;
import com.palantir.atlasdb.memory.InMemoryAtlasDbConfig;
import com.palantir.atlasdb.qos.config.QosClientConfig;
import com.palantir.atlasdb.table.description.GenericTestSchema;
import com.palantir.atlasdb.transaction.impl.SerializableTransactionManager;
import com.palantir.atlasdb.util.MetricsRule;
Expand Down Expand Up @@ -193,7 +194,7 @@ public void setup() throws JsonProcessingException {

runtimeConfig = mock(AtlasDbRuntimeConfig.class);
when(runtimeConfig.timestampClient()).thenReturn(ImmutableTimestampClientConfig.of(false));
when(runtimeConfig.getQosServiceConfiguration()).thenReturn(Optional.empty());
when(runtimeConfig.qos()).thenReturn(QosClientConfig.DEFAULT);
when(runtimeConfig.timelockRuntime()).thenReturn(Optional.empty());

environment = mock(Consumer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public class QosServiceIntegrationTest {

@Test
public void returnsConfiguredLimits() {
assertThat(service.getLimit("test")).isEqualTo(10L);
assertThat(service.getLimit("test2")).isEqualTo(20L);
assertThat(service.getLimit("test")).isEqualTo(10);
assertThat(service.getLimit("test2")).isEqualTo(20);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,24 @@
import com.google.common.base.Ticker;
import com.palantir.atlasdb.qos.QosClient;
import com.palantir.atlasdb.qos.QosMetrics;
import com.palantir.atlasdb.qos.ratelimit.QosRateLimiter;
import com.palantir.atlasdb.qos.ratelimit.QosRateLimiters;

public class AtlasDbQosClient implements QosClient {

private static final Logger log = LoggerFactory.getLogger(AtlasDbQosClient.class);

private final QosRateLimiter rateLimiter;
private final QosRateLimiters rateLimiters;
private final QosMetrics metrics;
private final Ticker ticker;

public static AtlasDbQosClient create(QosRateLimiter rateLimiter) {
return new AtlasDbQosClient(rateLimiter, new QosMetrics(), Ticker.systemTicker());
public static AtlasDbQosClient create(QosRateLimiters rateLimiters) {
return new AtlasDbQosClient(rateLimiters, new QosMetrics(), Ticker.systemTicker());
}

@VisibleForTesting
AtlasDbQosClient(QosRateLimiter rateLimiter, QosMetrics metrics, Ticker ticker) {
AtlasDbQosClient(QosRateLimiters rateLimiters, QosMetrics metrics, Ticker ticker) {
this.metrics = metrics;
this.rateLimiter = rateLimiter;
this.rateLimiters = rateLimiters;
this.ticker = ticker;
}

Expand All @@ -53,7 +53,7 @@ public <T, E extends Exception> T executeRead(
ReadQuery<T, E> query,
Function<T, Integer> weigher) throws E {
int estimatedWeight = getWeight(estimatedWeigher, 1);
rateLimiter.consumeWithBackoff(estimatedWeight);
rateLimiters.read().consumeWithBackoff(estimatedWeight);

// TODO(nziebart): decide what to do if we encounter a timeout exception
long startTimeNanos = ticker.read();
Expand All @@ -64,15 +64,15 @@ public <T, E extends Exception> T executeRead(
metrics.updateReadCount();
metrics.updateBytesRead(actualWeight);
metrics.updateReadTimeMicros(TimeUnit.NANOSECONDS.toMicros(totalTimeNanos));
rateLimiter.recordAdjustment(actualWeight - estimatedWeight);
rateLimiters.read().recordAdjustment(actualWeight - estimatedWeight);

return result;
}

@Override
public <T, E extends Exception> void executeWrite(Supplier<Integer> weigher, WriteQuery<E> query) throws E {
int weight = getWeight(weigher, 1);
rateLimiter.consumeWithBackoff(weight);
rateLimiters.write().consumeWithBackoff(weight);

// TODO(nziebart): decide what to do if we encounter a timeout exception
long startTimeNanos = ticker.read();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2017 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the BSD-3 License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.qos.config;

import java.util.Optional;

import org.immutables.value.Value;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.palantir.remoting.api.config.service.HumanReadableDuration;
import com.palantir.remoting.api.config.service.ServiceConfiguration;

@Value.Immutable
@JsonDeserialize(as = ImmutableQosClientConfig.class)
@JsonSerialize(as = ImmutableQosClientConfig.class)
public abstract class QosClientConfig {

public static final QosClientConfig DEFAULT = ImmutableQosClientConfig.builder().build();

public abstract Optional<ServiceConfiguration> qosService();

@Value.Default
public HumanReadableDuration maxBackoffSleepTime() {
return HumanReadableDuration.seconds(10);
}

@Value.Default
public QosLimitsConfig limits() {
return QosLimitsConfig.DEFAULT_NO_LIMITS;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2017 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the BSD-3 License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.qos.config;

import org.immutables.value.Value;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;

@Value.Immutable
@JsonDeserialize(as = ImmutableQosLimitsConfig.class)
@JsonSerialize(as = ImmutableQosLimitsConfig.class)
public abstract class QosLimitsConfig {

public static final QosLimitsConfig DEFAULT_NO_LIMITS = ImmutableQosLimitsConfig.builder().build();

@Value.Default
public int readBytesPerSecond() {
return Integer.MAX_VALUE;
}

@Value.Default
public int writeBytesPerSecond() {
return Integer.MAX_VALUE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public Duration consumeWithBackoff(int estimatedNumUnits) {
/**
* Records an adjustment to the original estimate of units consumed passed to {@link #consumeWithBackoff(int)}. This
* should be called after a query returns, when the exact number of units consumed is known. This value may be
* positive (if the original estimate was too small) or negative (if the original estimate was too large.
* positive (if the original estimate was too small) or negative (if the original estimate was too large).
*/
public void recordAdjustment(int adjustmentUnits) {
if (adjustmentUnits > 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2017 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the BSD-3 License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.qos.ratelimit;

import org.immutables.value.Value;

import com.palantir.atlasdb.qos.config.QosLimitsConfig;

@Value.Immutable
public interface QosRateLimiters {

static QosRateLimiters create(QosLimitsConfig config) {
QosRateLimiter readLimiter = QosRateLimiter.create();
readLimiter.updateRate(config.readBytesPerSecond());

QosRateLimiter writeLimiter = QosRateLimiter.create();
writeLimiter.updateRate(config.writeBytesPerSecond());

return ImmutableQosRateLimiters.builder()
.read(readLimiter)
.write(writeLimiter)
.build();
}

QosRateLimiter read();

QosRateLimiter write();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2017 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the BSD-3 License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.qos;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;

import org.junit.Test;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.datatype.guava.GuavaModule;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.palantir.atlasdb.qos.config.ImmutableQosClientConfig;
import com.palantir.atlasdb.qos.config.ImmutableQosLimitsConfig;
import com.palantir.atlasdb.qos.config.QosClientConfig;
import com.palantir.atlasdb.qos.config.QosServiceRuntimeConfig;
import com.palantir.remoting.api.config.service.HumanReadableDuration;
import com.palantir.remoting.api.config.service.ServiceConfiguration;
import com.palantir.remoting.api.config.ssl.SslConfiguration;
import com.palantir.remoting3.ext.jackson.ShimJdk7Module;

public class QosClientConfigDeserializationTest {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new YAMLFactory())
.registerModule(new GuavaModule())
.registerModule(new ShimJdk7Module())
.registerModule(new Jdk8Module());

@Test
public void canDeserializeFromYaml() throws IOException {
QosClientConfig expected = ImmutableQosClientConfig.builder()
.qosService(
ServiceConfiguration.builder()
.addUris("http://localhost:8080")
.security(SslConfiguration.of(Paths.get("trustStore.jks")))
.build())
.maxBackoffSleepTime(HumanReadableDuration.seconds(20))
.limits(ImmutableQosLimitsConfig.builder()
.readBytesPerSecond(123)
.writeBytesPerSecond(456)
.build())
.build();

File configFile = new File(QosServiceRuntimeConfig.class.getResource("/qos-client.yml").getPath());
QosClientConfig config = OBJECT_MAPPER.readValue(configFile, QosClientConfig.class);

assertThat(config).isEqualTo(expected);
}

}
Loading

0 comments on commit 2318f69

Please sign in to comment.