Skip to content

Commit

Permalink
Fixes #2481.
Browse files Browse the repository at this point in the history
Adding storage-throttle module/etc. to contain logic for wrapping other storage implementations and limiting the number of requests that can go through to them at a given time.
Elasticsearch storage's maxRequests can be override by throttle properties if the throttle is enabled.
Inspired by work done on #2169.
  • Loading branch information
Logic-32 committed Apr 24, 2019
1 parent 689a27c commit ca2d3ca
Show file tree
Hide file tree
Showing 17 changed files with 1,002 additions and 9 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
<armeria.version>0.84.0</armeria.version>
<!-- This is from armeria, but be careful to avoid >= v20 apis -->
<guava.version>27.0.1-jre</guava.version>
<netflix.concurrency.limits.version>0.2.0</netflix.concurrency.limits.version>

<!-- only used for proto interop testing -->
<wire.version>3.0.0-alpha01</wire.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import static java.lang.String.format;
import static java.util.logging.Level.FINE;
import java.util.concurrent.RejectedExecutionException;

/**
* This component takes action on spans received from a transport. This includes deserializing,
Expand Down Expand Up @@ -227,7 +228,13 @@ RuntimeException doError(String message, Throwable e) {
message = format("%s due to %s(%s)", message, e.getClass().getSimpleName(), error);
warn(message, e);
}
return new RuntimeException(message, e);

if (e instanceof RejectedExecutionException) {
// This can indicate to a higher layer that we need to slow down ingestion
return (RejectedExecutionException) e;
} else {
return new RuntimeException(message, e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -101,7 +102,7 @@ public void acceptSpansCallback_onErrorWithNullMessage() {
RuntimeException exception = new RuntimeException();
callback.onError(exception);

verify(collector).warn("Cannot store spans [1] due to RuntimeException()", exception);
verify(collector).warn(contains("Cannot store spans"), eq(exception));
}

@Test
Expand All @@ -111,24 +112,25 @@ public void acceptSpansCallback_onErrorWithMessage() {
callback.onError(exception);

verify(collector)
.warn("Cannot store spans [1] due to IllegalArgumentException(no beer)", exception);
.warn(contains("due to IllegalArgumentException(no beer)"), eq(exception));
}

@Test
public void errorAcceptingSpans_onErrorWithNullMessage() {
String message =
collector.errorStoringSpans(asList(CLIENT_SPAN), new RuntimeException()).getMessage();

assertThat(message).isEqualTo("Cannot store spans [1] due to RuntimeException()");
assertThat(message).contains("Cannot store spans");
assertThat(message).contains("due to RuntimeException()");
}

@Test
public void errorAcceptingSpans_onErrorWithMessage() {
RuntimeException exception = new IllegalArgumentException("no beer");
String message = collector.errorStoringSpans(asList(CLIENT_SPAN), exception).getMessage();

assertThat(message)
.isEqualTo("Cannot store spans [1] due to IllegalArgumentException(no beer)");
assertThat(message).contains("Cannot store spans");
assertThat(message).contains("due to IllegalArgumentException(no beer)");
}

@Test
Expand Down
16 changes: 16 additions & 0 deletions zipkin-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,17 @@
<version>${micrometer.version}</version>
</dependency>

<dependency>
<groupId>com.netflix.concurrency-limits</groupId>
<artifactId>concurrency-limits-core</artifactId>
<version>${netflix.concurrency.limits.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>${micrometer.version}</version>
</dependency>

<!-- Trace api controller activity with Brave -->
<dependency>
<groupId>io.zipkin.brave</groupId>
Expand Down Expand Up @@ -299,6 +310,11 @@
<version>2.4.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zipkin2.server.internal;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.boot.autoconfigure.condition.ConditionOutcome;
import org.springframework.boot.autoconfigure.condition.SpringBootCondition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.core.type.AnnotatedTypeMetadata;

@Conditional(ConditionalOnThrottledStorage.ThrottledStorageCondition.class)
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ConditionalOnThrottledStorage {
class ThrottledStorageCondition extends SpringBootCondition {
@Override
public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata a) {
String throttleEnabled = context.getEnvironment()
.getProperty("zipkin.storage.throttle.enabled");

if (!Boolean.valueOf(throttleEnabled)) {
return ConditionOutcome.noMatch("zipkin.storage.throttle.enabled isn't true");
}

return ConditionOutcome.match();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,34 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.config.MeterFilter;
import java.util.List;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCustomizer;
import org.springframework.boot.actuate.health.HealthAggregator;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.annotation.Order;
import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.web.servlet.config.annotation.ViewControllerRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import zipkin2.server.internal.throttle.ZipkinStorageThrottleProperties;
import zipkin2.collector.CollectorMetrics;
import zipkin2.collector.CollectorSampler;
import zipkin2.server.internal.brave.TracingStorageComponent;
import zipkin2.storage.InMemoryStorage;
import zipkin2.storage.StorageComponent;
import zipkin2.server.internal.throttle.ThrottledStorageComponent;

@Configuration
@ImportAutoConfiguration(ArmeriaSpringActuatorAutoConfiguration.class)
Expand Down Expand Up @@ -154,10 +161,47 @@ public Object postProcessAfterInitialization(Object bean, String beanName) {
}
}

@Configuration
@EnableConfigurationProperties(ZipkinStorageThrottleProperties.class)
@ConditionalOnThrottledStorage
static class ThrottledStorageComponentEnhancer implements BeanPostProcessor, BeanFactoryAware {

/**
* Need this to resolve cyclic instantiation issue with spring. Mostly, this is for MeterRegistry as really
* bad things happen if you try to Autowire it (loss of JVM metrics) but also using it for properties just to make
* sure no cycles exist at all as a result of turning throttling on.
*
* <p>Ref: <a href="https://stackoverflow.com/a/19688634">Tracking down cause of Spring's "not eligible for auto-proxying"</a></p>
*/
private BeanFactory beanFactory;

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof StorageComponent) {
ZipkinStorageThrottleProperties throttleProperties = beanFactory.getBean(ZipkinStorageThrottleProperties.class);
return new ThrottledStorageComponent((StorageComponent) bean,
beanFactory.getBean(MeterRegistry.class),
throttleProperties.getMinConcurrency(),
throttleProperties.getMaxConcurrency(),
throttleProperties.getMaxQueueSize());
}
return bean;
}

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}
}

/**
* This is a special-case configuration if there's no StorageComponent of any kind. In-Mem can
* supply both read apis, so we add two beans here.
*
* <p>Note: this needs to be {@link Lazy} to avoid circular dependency issues when using with
* {@link ThrottledStorageComponentEnhancer}.
*/
@Lazy
@Configuration
@Conditional(StorageTypeMemAbsentOrEmpty.class)
@ConditionalOnMissingBean(StorageComponent.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import zipkin2.elasticsearch.ElasticsearchStorage;

Expand All @@ -40,8 +41,10 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar
private String index = "zipkin";
/** The date separator used to create the index name. Default to -. */
private String dateSeparator = "-";
/** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 */
/** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 (overriden by throttle settings) */
private int maxRequests = 64;
/** Overrdies maximum in-flight requests to match throttling settings if throttling is enabled. */
private Integer throttleMaxConcurrency;
/** Number of shards (horizontal scaling factor) per index. Defaults to 5. */
private int indexShards = 5;
/** Number of replicas (redundancy factor) per index. Defaults to 1.` */
Expand All @@ -61,6 +64,13 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar
*/
private int timeout = 10_000;

public ZipkinElasticsearchStorageProperties(@Value("${zipkin.storage.throttle.enabled:false}") boolean throttleEnabled,
@Value("${zipkin.storage.throttle.maxConcurrency:200}") int throttleMaxConcurrency) {
if (throttleEnabled) {
this.throttleMaxConcurrency = throttleMaxConcurrency;
}
}

public String getPipeline() {
return pipeline;
}
Expand Down Expand Up @@ -180,7 +190,7 @@ public ElasticsearchStorage.Builder toBuilder(OkHttpClient client) {
.index(index)
.dateSeparator(dateSeparator.isEmpty() ? 0 : dateSeparator.charAt(0))
.pipeline(pipeline)
.maxRequests(maxRequests)
.maxRequests(throttleMaxConcurrency == null ? maxRequests : throttleMaxConcurrency)
.indexShards(indexShards)
.indexReplicas(indexReplicas);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zipkin2.server.internal.throttle;

import com.netflix.concurrency.limits.Limiter;
import com.netflix.concurrency.limits.limiter.AbstractLimiter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

class ActuateThrottleMetrics {
private final MeterRegistry registryInstance;

public ActuateThrottleMetrics(MeterRegistry registryInstance) {
this.registryInstance = registryInstance;
}

public void bind(ExecutorService executor) {
if(!(executor instanceof ThreadPoolExecutor)){
return;
}

ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
Gauge.builder("zipkin_storage.throttle.concurrency", pool::getCorePoolSize)
.description("number of threads running storage requests")
.register(registryInstance);
Gauge.builder("zipkin_storage.throttle.queue_size", pool.getQueue()::size)
.description("number of items queued waiting for access to storage")
.register(registryInstance);
}

public void bind(Limiter<Void> limiter) {
if(!(limiter instanceof AbstractLimiter)){
return;
}

AbstractLimiter abstractLimiter = (AbstractLimiter) limiter;

// This value should parallel (zipkin_storage.throttle.queue_size + zipkin_storage.throttle.concurrency)
// It is tracked to make sure it doesn't perpetually increase. If it does then we're not resolving LimitListeners.
Gauge.builder("zipkin_storage.throttle.in_flight_requests", abstractLimiter::getInflight)
.description("number of requests the limiter thinks are active")
.register(registryInstance);
}
}
Loading

0 comments on commit ca2d3ca

Please sign in to comment.