Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding storage-throttle module to address "over capacity" issues #2502

Merged
merged 2 commits into from
May 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import zipkin2.storage.StorageComponent;

import static java.util.Arrays.asList;
import java.util.concurrent.RejectedExecutionException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -186,6 +187,16 @@ public void storeSpansCallback_onErrorWithMessage() {
}

@Test
public void errorAcceptingSpans_onErrorRejectedExecution() {
RuntimeException error = new RejectedExecutionException("slow down");
collector.handleStorageError(TRACE, error, callback);

verify(callback).onError(error);
assertThat(messages)
.containsOnly("Cannot store spans [1, 1, 2, ...] due to RejectedExecutionException(slow down)");
verify(metrics).incrementSpansDropped(4);
}

public void handleStorageError_onErrorWithNullMessage() {
RuntimeException error = new RuntimeException();
collector.handleStorageError(TRACE, error, callback);
Expand Down
10 changes: 10 additions & 0 deletions zipkin-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,16 @@ Defaults to true
* `AUTOCOMPLETE_KEYS`: list of span tag keys which will be returned by the `/api/v2/autocompleteTags` endpoint
* `AUTOCOMPLETE_TTL`: How long in milliseconds to suppress calls to write the same autocomplete key/value pair. Default 3600000 (1 hr)

### Throttled Storage (Experimental)
These settings can be used to help tune the rate at which Zipkin flushes data to another, underlying `StorageComponent` (such as Elasticsearch):

* `STORAGE_THROTTLE_ENABLED`: Enables throttling
* `STORAGE_THROTTLE_MIN_CONCURRENCY`: Minimum number of Threads to use for writing to storage.
* `STORAGE_THROTTLE_MAX_CONCURRENCY`: Maximum number of Threads to use for writing to storage. In order to avoid configuration drift, this value may override other, storage-specific values such as Elasticsearch's `ES_MAX_REQUESTS`.
* `STORAGE_THROTTLE_MAX_QUEUE_SIZE`: How many messages to buffer while all Threads are writing data before abandoning a message (0 = no buffering).

As this feature is experimental, it is not recommended to run this in production environments.

### Cassandra Storage
Zipkin's [Cassandra storage component](../zipkin-storage/cassandra)
supports version 3.11+ and applies when `STORAGE_TYPE` is set to `cassandra3`:
Expand Down
17 changes: 16 additions & 1 deletion zipkin-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,17 @@
<version>${micrometer.version}</version>
</dependency>

<dependency>
<groupId>com.netflix.concurrency-limits</groupId>
<artifactId>concurrency-limits-core</artifactId>
<version>0.2.2</version>
</dependency>
<dependency>
codefromthecrypt marked this conversation as resolved.
Show resolved Hide resolved
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>${micrometer.version}</version>
</dependency>

<!-- Trace api controller activity with Brave -->
<dependency>
<groupId>io.zipkin.brave</groupId>
Expand Down Expand Up @@ -299,6 +310,11 @@
<version>2.4.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand Down Expand Up @@ -372,7 +388,6 @@
<version>${kotlin.version}</version>
<configuration>
<jvmTarget>${main.java.version}</jvmTarget>
<experimentalCoroutines>enable</experimentalCoroutines>
</configuration>
<executions>
<execution>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zipkin2.server.internal;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.boot.autoconfigure.condition.ConditionOutcome;
import org.springframework.boot.autoconfigure.condition.SpringBootCondition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.core.type.AnnotatedTypeMetadata;

@Conditional(ConditionalOnThrottledStorage.ThrottledStorageCondition.class)
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface ConditionalOnThrottledStorage {
class ThrottledStorageCondition extends SpringBootCondition {
@Override
public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata a) {
String throttleEnabled = context.getEnvironment()
.getProperty("zipkin.storage.throttle.enabled");

if (!Boolean.valueOf(throttleEnabled)) {
return ConditionOutcome.noMatch("zipkin.storage.throttle.enabled isn't true");
}

return ConditionOutcome.match();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,34 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.config.MeterFilter;
import java.util.List;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCustomizer;
import org.springframework.boot.actuate.health.HealthAggregator;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.annotation.Order;
import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.web.servlet.config.annotation.ViewControllerRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import zipkin2.server.internal.throttle.ZipkinStorageThrottleProperties;
import zipkin2.collector.CollectorMetrics;
import zipkin2.collector.CollectorSampler;
import zipkin2.server.internal.brave.TracingStorageComponent;
import zipkin2.storage.InMemoryStorage;
import zipkin2.storage.StorageComponent;
import zipkin2.server.internal.throttle.ThrottledStorageComponent;

@Configuration
@ImportAutoConfiguration(ArmeriaSpringActuatorAutoConfiguration.class)
Expand Down Expand Up @@ -157,10 +164,47 @@ public Object postProcessAfterInitialization(Object bean, String beanName) {
}
}

@Configuration
@EnableConfigurationProperties(ZipkinStorageThrottleProperties.class)
@ConditionalOnThrottledStorage
static class ThrottledStorageComponentEnhancer implements BeanPostProcessor, BeanFactoryAware {

/**
* Need this to resolve cyclic instantiation issue with spring. Mostly, this is for MeterRegistry as really
* bad things happen if you try to Autowire it (loss of JVM metrics) but also using it for properties just to make
* sure no cycles exist at all as a result of turning throttling on.
*
* <p>Ref: <a href="https://stackoverflow.com/a/19688634">Tracking down cause of Spring's "not eligible for auto-proxying"</a></p>
*/
private BeanFactory beanFactory;

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof StorageComponent) {
ZipkinStorageThrottleProperties throttleProperties = beanFactory.getBean(ZipkinStorageThrottleProperties.class);
return new ThrottledStorageComponent((StorageComponent) bean,
beanFactory.getBean(MeterRegistry.class),
throttleProperties.getMinConcurrency(),
throttleProperties.getMaxConcurrency(),
throttleProperties.getMaxQueueSize());
}
return bean;
}

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}
}

/**
* This is a special-case configuration if there's no StorageComponent of any kind. In-Mem can
* supply both read apis, so we add two beans here.
*
* <p>Note: this needs to be {@link Lazy} to avoid circular dependency issues when using with
* {@link ThrottledStorageComponentEnhancer}.
*/
@Lazy
@Configuration
@Conditional(StorageTypeMemAbsentOrEmpty.class)
@ConditionalOnMissingBean(StorageComponent.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import zipkin2.elasticsearch.ElasticsearchStorage;

Expand All @@ -40,8 +41,10 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar
private String index = "zipkin";
/** The date separator used to create the index name. Default to -. */
private String dateSeparator = "-";
/** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 */
/** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 (overriden by throttle settings) */
private int maxRequests = 64;
/** Overrides maximum in-flight requests to match throttling settings if throttling is enabled. */
private Integer throttleMaxConcurrency;
/** Number of shards (horizontal scaling factor) per index. Defaults to 5. */
private int indexShards = 5;
/** Number of replicas (redundancy factor) per index. Defaults to 1.` */
Expand All @@ -61,6 +64,14 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar
*/
private int timeout = 10_000;

ZipkinElasticsearchStorageProperties(
@Value("${zipkin.storage.throttle.enabled:false}") boolean throttleEnabled,
@Value("${zipkin.storage.throttle.maxConcurrency:200}") int throttleMaxConcurrency) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does maxConcurrency need updated to max-concurrency here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't be required, but it is nice to do this. good catch

(ps I'm knackered so will deal with the other PR and any comments in the morning)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kk. Thank you for finishing this up and sorry again for the lag on my end! Work got in the way :(

if (throttleEnabled) {
this.throttleMaxConcurrency = throttleMaxConcurrency;
}
}

public String getPipeline() {
return pipeline;
}
Expand Down Expand Up @@ -180,7 +191,7 @@ public ElasticsearchStorage.Builder toBuilder(OkHttpClient client) {
.index(index)
.dateSeparator(dateSeparator.isEmpty() ? 0 : dateSeparator.charAt(0))
.pipeline(pipeline)
.maxRequests(maxRequests)
.maxRequests(throttleMaxConcurrency == null ? maxRequests : throttleMaxConcurrency)
.indexShards(indexShards)
.indexReplicas(indexReplicas);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zipkin2.server.internal.throttle;

import com.netflix.concurrency.limits.limiter.AbstractLimiter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.concurrent.ThreadPoolExecutor;
import zipkin2.server.internal.ActuateCollectorMetrics;

/** Follows the same naming convention as {@link ActuateCollectorMetrics} */
final class ActuateThrottleMetrics {
codefromthecrypt marked this conversation as resolved.
Show resolved Hide resolved
final MeterRegistry registryInstance;

ActuateThrottleMetrics(MeterRegistry registryInstance) {
this.registryInstance = registryInstance;
}

void bind(ThreadPoolExecutor pool) {
Gauge.builder("zipkin_storage.throttle.concurrency", pool::getCorePoolSize)
codefromthecrypt marked this conversation as resolved.
Show resolved Hide resolved
.description("number of threads running storage requests")
.register(registryInstance);
Gauge.builder("zipkin_storage.throttle.queue_size", pool.getQueue()::size)
.description("number of items queued waiting for access to storage")
.register(registryInstance);
}

void bind(AbstractLimiter limiter) {
// This value should parallel (zipkin_storage.throttle.queue_size + zipkin_storage.throttle.concurrency)
// It is tracked to make sure it doesn't perpetually increase. If it does then we're not resolving LimitListeners.
Gauge.builder("zipkin_storage.throttle.in_flight_requests", limiter::getInflight)
.description("number of requests the limiter thinks are active")
.register(registryInstance);
}
}
Loading