From 53837abb3492b0b51f3207cdfe974f99a14fc35f Mon Sep 17 00:00:00 2001 From: Logic-32 <25107222+Logic-32@users.noreply.github.com> Date: Wed, 17 Apr 2019 17:08:14 -0600 Subject: [PATCH] Fixes #2481. Adding ThrottledStorageComponent/etc. to contain logic for wrapping other storage implementations and limiting the number of requests that can go through to them at a given time. Elasticsearch storage's maxRequests can be override by throttle properties if the throttle is enabled. Inspired by work done on #2169. --- README.md | 3 + pom.xml | 1 + .../java/zipkin2/collector/CollectorTest.java | 11 + zipkin-server/README.md | 8 + zipkin-server/pom.xml | 16 + .../ConditionalOnThrottledStorage.java | 46 +++ .../internal/ZipkinServerConfiguration.java | 44 +++ .../ZipkinElasticsearchStorageProperties.java | 14 +- .../throttle/ActuateThrottleMetrics.java | 60 ++++ .../internal/throttle/ThrottledCall.java | 240 +++++++++++++ .../throttle/ThrottledSpanConsumer.java | 47 +++ .../throttle/ThrottledStorageComponent.java | 179 ++++++++++ .../ZipkinStorageThrottleProperties.java | 69 ++++ .../main/resources/zipkin-server-shared.yml | 5 + .../BasicAuthInterceptorTest.java | 2 +- .../server/internal/throttle/FakeCall.java | 62 ++++ .../internal/throttle/ThrottledCallTest.java | 317 ++++++++++++++++++ .../ThrottledStorageComponentTest.java | 69 ++++ .../internal/client/HttpCall.java | 5 + 19 files changed, 1195 insertions(+), 3 deletions(-) create mode 100644 zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java create mode 100644 zipkin-server/src/main/java/zipkin2/server/internal/throttle/ActuateThrottleMetrics.java create mode 100644 zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java create mode 100644 zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledSpanConsumer.java create mode 100644 zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java create mode 100644 zipkin-server/src/main/java/zipkin2/server/internal/throttle/ZipkinStorageThrottleProperties.java create mode 100644 zipkin-server/src/test/java/zipkin2/server/internal/throttle/FakeCall.java create mode 100644 zipkin-server/src/test/java/zipkin2/server/internal/throttle/ThrottledCallTest.java create mode 100644 zipkin-server/src/test/java/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.java diff --git a/README.md b/README.md index e4ed5fab83a..d78d0d1910b 100644 --- a/README.md +++ b/README.md @@ -94,6 +94,9 @@ It stores spans as json and has been designed for larger scale. Note: This store requires a [spark job](https://github.com/openzipkin/zipkin-dependencies) to aggregate dependency links. +### Throttling +As part of a [Collector surge and error handling](https://cwiki.apache.org/confluence/display/ZIPKIN/Collector+surge+and+error+handling) discussion that took place a throttling mechanism was added to allow more fine-grained control over how Zipkin interacts with the various `StorageComponents`. In particular, for those installations which use a push-based Collector (such as the HTTP rest API), enabling the throttle can allow Zipkin to buffer some messages in order to avoid aggressively dropping them. See [zipkin-server](zipkin-server#throttled-storage) for configuration information. + ### Disabling search The following API endpoints provide search features, and are enabled by default. Search primarily allows the trace list screen of the UI operate. diff --git a/pom.xml b/pom.xml index bb45fe19df0..73092e20f7a 100755 --- a/pom.xml +++ b/pom.xml @@ -62,6 +62,7 @@ 0.84.0 27.0.1-jre + 0.2.0 3.0.0-alpha01 diff --git a/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java b/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java index aabcbcb40bc..56f988c2ea1 100644 --- a/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java +++ b/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java @@ -31,6 +31,7 @@ import zipkin2.storage.StorageComponent; import static java.util.Arrays.asList; +import java.util.concurrent.RejectedExecutionException; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -186,6 +187,16 @@ public void storeSpansCallback_onErrorWithMessage() { } @Test + public void errorAcceptingSpans_onErrorRejectedExecution() { + RuntimeException error = new RejectedExecutionException("slow down"); + collector.handleStorageError(TRACE, error, callback); + + verify(callback).onError(error); + assertThat(messages) + .containsOnly("Cannot store spans [1, 1, 2, ...] due to RejectedExecutionException(slow down)"); + verify(metrics).incrementSpansDropped(4); + } + public void handleStorageError_onErrorWithNullMessage() { RuntimeException error = new RuntimeException(); collector.handleStorageError(TRACE, error, callback); diff --git a/zipkin-server/README.md b/zipkin-server/README.md index 7e64055cdae..bd03490f12a 100644 --- a/zipkin-server/README.md +++ b/zipkin-server/README.md @@ -157,6 +157,14 @@ Defaults to true * `AUTOCOMPLETE_KEYS`: list of span tag keys which will be returned by the `/api/v2/autocompleteTags` endpoint * `AUTOCOMPLETE_TTL`: How long in milliseconds to suppress calls to write the same autocomplete key/value pair. Default 3600000 (1 hr) +### Throttled Storage +These settings can be used to help tune the rate at which Zipkin flushes data to another, underlying `StorageComponent` (such as Elasticsearch): + + * `STORAGE_THROTTLE_ENABLED`: Enables throttling + * `STORAGE_THROTTLE_MIN_CONCURRENCY`: Minimum number of Threads to use for writing to storage. + * `STORAGE_THROTTLE_MAX_CONCURRENCY`: Maximum number of Threads to use for writing to storage. In order to avoid configuration drift, this value may override other, storage-specific values such as Elasticsearch's `ES_MAX_REQUESTS`. + * `STORAGE_THROTTLE_MAX_QUEUE_SIZE`: How many messages to buffer while all Threads are writing data before abandoning a message (0 = no buffering). + ### Cassandra Storage Zipkin's [Cassandra storage component](../zipkin-storage/cassandra) supports version 3.11+ and applies when `STORAGE_TYPE` is set to `cassandra3`: diff --git a/zipkin-server/pom.xml b/zipkin-server/pom.xml index 2574765eda4..18c3dbdf985 100644 --- a/zipkin-server/pom.xml +++ b/zipkin-server/pom.xml @@ -250,6 +250,17 @@ ${micrometer.version} + + com.netflix.concurrency-limits + concurrency-limits-core + ${netflix.concurrency.limits.version} + + + io.micrometer + micrometer-core + ${micrometer.version} + + io.zipkin.brave @@ -299,6 +310,11 @@ 2.4.0 test + + org.mockito + mockito-core + test + diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java b/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java new file mode 100644 index 00000000000..0d7cb4e1845 --- /dev/null +++ b/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package zipkin2.server.internal; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import org.springframework.boot.autoconfigure.condition.ConditionOutcome; +import org.springframework.boot.autoconfigure.condition.SpringBootCondition; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.context.annotation.Conditional; +import org.springframework.core.type.AnnotatedTypeMetadata; + +@Conditional(ConditionalOnThrottledStorage.ThrottledStorageCondition.class) +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE, ElementType.METHOD}) +@interface ConditionalOnThrottledStorage { + class ThrottledStorageCondition extends SpringBootCondition { + @Override + public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata a) { + String throttleEnabled = context.getEnvironment() + .getProperty("zipkin.storage.throttle.enabled"); + + if (!Boolean.valueOf(throttleEnabled)) { + return ConditionOutcome.noMatch("zipkin.storage.throttle.enabled isn't true"); + } + + return ConditionOutcome.match(); + } + } +} diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java index cb381a7f338..b3d4b90ae3a 100644 --- a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java +++ b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java @@ -26,6 +26,9 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.config.MeterFilter; import java.util.List; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.BeanPostProcessor; @@ -33,20 +36,24 @@ import org.springframework.boot.actuate.health.HealthAggregator; import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Condition; import org.springframework.context.annotation.ConditionContext; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Lazy; import org.springframework.core.annotation.Order; import org.springframework.core.type.AnnotatedTypeMetadata; import org.springframework.web.servlet.config.annotation.ViewControllerRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; +import zipkin2.server.internal.throttle.ZipkinStorageThrottleProperties; import zipkin2.collector.CollectorMetrics; import zipkin2.collector.CollectorSampler; import zipkin2.server.internal.brave.TracingStorageComponent; import zipkin2.storage.InMemoryStorage; import zipkin2.storage.StorageComponent; +import zipkin2.server.internal.throttle.ThrottledStorageComponent; @Configuration @ImportAutoConfiguration(ArmeriaSpringActuatorAutoConfiguration.class) @@ -157,10 +164,47 @@ public Object postProcessAfterInitialization(Object bean, String beanName) { } } + @Configuration + @EnableConfigurationProperties(ZipkinStorageThrottleProperties.class) + @ConditionalOnThrottledStorage + static class ThrottledStorageComponentEnhancer implements BeanPostProcessor, BeanFactoryAware { + + /** + * Need this to resolve cyclic instantiation issue with spring. Mostly, this is for MeterRegistry as really + * bad things happen if you try to Autowire it (loss of JVM metrics) but also using it for properties just to make + * sure no cycles exist at all as a result of turning throttling on. + * + *

Ref: Tracking down cause of Spring's "not eligible for auto-proxying"

+ */ + private BeanFactory beanFactory; + + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) { + if (bean instanceof StorageComponent) { + ZipkinStorageThrottleProperties throttleProperties = beanFactory.getBean(ZipkinStorageThrottleProperties.class); + return new ThrottledStorageComponent((StorageComponent) bean, + beanFactory.getBean(MeterRegistry.class), + throttleProperties.getMinConcurrency(), + throttleProperties.getMaxConcurrency(), + throttleProperties.getMaxQueueSize()); + } + return bean; + } + + @Override + public void setBeanFactory(BeanFactory beanFactory) throws BeansException { + this.beanFactory = beanFactory; + } + } + /** * This is a special-case configuration if there's no StorageComponent of any kind. In-Mem can * supply both read apis, so we add two beans here. + * + *

Note: this needs to be {@link Lazy} to avoid circular dependency issues when using with + * {@link ThrottledStorageComponentEnhancer}. */ + @Lazy @Configuration @Conditional(StorageTypeMemAbsentOrEmpty.class) @ConditionalOnMissingBean(StorageComponent.class) diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java b/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java index 8ba574f355b..bd034f17c92 100644 --- a/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java +++ b/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java @@ -23,6 +23,7 @@ import okhttp3.HttpUrl; import okhttp3.OkHttpClient; import okhttp3.logging.HttpLoggingInterceptor; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import zipkin2.elasticsearch.ElasticsearchStorage; @@ -40,8 +41,10 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar private String index = "zipkin"; /** The date separator used to create the index name. Default to -. */ private String dateSeparator = "-"; - /** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 */ + /** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 (overriden by throttle settings) */ private int maxRequests = 64; + /** Overrides maximum in-flight requests to match throttling settings if throttling is enabled. */ + private Integer throttleMaxConcurrency; /** Number of shards (horizontal scaling factor) per index. Defaults to 5. */ private int indexShards = 5; /** Number of replicas (redundancy factor) per index. Defaults to 1.` */ @@ -61,6 +64,13 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar */ private int timeout = 10_000; + public ZipkinElasticsearchStorageProperties(@Value("${zipkin.storage.throttle.enabled:false}") boolean throttleEnabled, + @Value("${zipkin.storage.throttle.maxConcurrency:200}") int throttleMaxConcurrency) { + if (throttleEnabled) { + this.throttleMaxConcurrency = throttleMaxConcurrency; + } + } + public String getPipeline() { return pipeline; } @@ -180,7 +190,7 @@ public ElasticsearchStorage.Builder toBuilder(OkHttpClient client) { .index(index) .dateSeparator(dateSeparator.isEmpty() ? 0 : dateSeparator.charAt(0)) .pipeline(pipeline) - .maxRequests(maxRequests) + .maxRequests(throttleMaxConcurrency == null ? maxRequests : throttleMaxConcurrency) .indexShards(indexShards) .indexReplicas(indexReplicas); } diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ActuateThrottleMetrics.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ActuateThrottleMetrics.java new file mode 100644 index 00000000000..acedc2dadaa --- /dev/null +++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ActuateThrottleMetrics.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package zipkin2.server.internal.throttle; + +import com.netflix.concurrency.limits.Limiter; +import com.netflix.concurrency.limits.limiter.AbstractLimiter; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + +final class ActuateThrottleMetrics { + private final MeterRegistry registryInstance; + + public ActuateThrottleMetrics(MeterRegistry registryInstance) { + this.registryInstance = registryInstance; + } + + public void bind(ExecutorService executor) { + if(!(executor instanceof ThreadPoolExecutor)){ + return; + } + + ThreadPoolExecutor pool = (ThreadPoolExecutor) executor; + Gauge.builder("zipkin_storage.throttle.concurrency", pool::getCorePoolSize) + .description("number of threads running storage requests") + .register(registryInstance); + Gauge.builder("zipkin_storage.throttle.queue_size", pool.getQueue()::size) + .description("number of items queued waiting for access to storage") + .register(registryInstance); + } + + public void bind(Limiter limiter) { + if(!(limiter instanceof AbstractLimiter)){ + return; + } + + AbstractLimiter abstractLimiter = (AbstractLimiter) limiter; + + // This value should parallel (zipkin_storage.throttle.queue_size + zipkin_storage.throttle.concurrency) + // It is tracked to make sure it doesn't perpetually increase. If it does then we're not resolving LimitListeners. + Gauge.builder("zipkin_storage.throttle.in_flight_requests", abstractLimiter::getInflight) + .description("number of requests the limiter thinks are active") + .register(registryInstance); + } +} diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java new file mode 100644 index 00000000000..14fcd5f43a7 --- /dev/null +++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package zipkin2.server.internal.throttle; + +import com.netflix.concurrency.limits.Limiter; +import com.netflix.concurrency.limits.Limiter.Listener; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.function.Supplier; +import zipkin2.Call; +import zipkin2.Callback; +import zipkin2.storage.InMemoryStorage; + +/** + * {@link Call} implementation that is backed by an {@link ExecutorService}. The ExecutorService serves two + * purposes: + *

    + *
  1. Limits the number of requests that can run in parallel.
  2. + *
  3. Depending on configuration, can queue up requests to make sure we don't aggressively drop requests that would + * otherwise succeed if given a moment. Bounded queues are safest for this as unbounded ones can lead to heap + * exhaustion and {@link OutOfMemoryError OOM errors}.
  4. + *
+ * + * @see ThrottledStorageComponent + */ +final class ThrottledCall extends Call { + final ExecutorService executor; + final Limiter limiter; + final Listener limitListener; + /** + * Delegate call needs to be supplied later to avoid having it take action when it is created (like + * {@link InMemoryStorage} and thus avoid being throttled. + */ + final Supplier> delegate; + Call call; + volatile boolean canceled; + + public ThrottledCall(ExecutorService executor, Limiter limiter, Supplier> delegate) { + this.executor = executor; + this.limiter = limiter; + this.limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new); + this.delegate = delegate; + } + + private ThrottledCall(ThrottledCall other) { + this(other.executor, other.limiter, other.call == null ? other.delegate : () -> other.call.clone()); + } + + @Override + public V execute() throws IOException { + try { + call = delegate.get(); + + // Make sure we throttle + Future future = executor.submit(() -> { + String oldName = setCurrentThreadName(call.toString()); + try { + return call.execute(); + } finally { + setCurrentThreadName(oldName); + } + }); + V result = future.get(); // Still block for the response + + limitListener.onSuccess(); + return result; + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof RejectedExecutionException) { + // Storage rejected us, throttle back + limitListener.onDropped(); + } else { + limitListener.onIgnore(); + } + + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else if (cause instanceof IOException) { + throw (IOException) cause; + } else { + throw new RuntimeException("Issue while executing on a throttled call", cause); + } + } catch (InterruptedException e) { + limitListener.onIgnore(); + throw new RuntimeException("Interrupted while blocking on a throttled call", e); + } catch (Exception e) { + // Ignoring in all cases here because storage itself isn't saying we need to throttle. Though, we may still be + // write bound, but a drop in concurrency won't necessarily help. + limitListener.onIgnore(); + throw e; + } + } + + @Override + public void enqueue(Callback callback) { + try { + executor.execute(new QueuedCall(callback)); + } catch (Exception e) { + // Ignoring in all cases here because storage itself isn't saying we need to throttle. Though, we may still be + // write bound, but a drop in concurrency won't necessarily help. + limitListener.onIgnore(); + throw e; + } + } + + @Override + public void cancel() { + canceled = true; + if (call != null) { + call.cancel(); + } + } + + @Override + public boolean isCanceled() { + return canceled || (call != null && call.isCanceled()); + } + + @Override + public Call clone() { + return new ThrottledCall<>(this); + } + + /** + * @param name New name for the current Thread + * @return Previous name of the current Thread + */ + static String setCurrentThreadName(String name) { + Thread thread = Thread.currentThread(); + String originalName = thread.getName(); + try { + thread.setName(name); + return originalName; + } catch (SecurityException e) { + return originalName; + } + } + + final class QueuedCall implements Runnable { + final Callback callback; + + public QueuedCall(Callback callback) { + this.callback = callback; + } + + @Override + public void run() { + try { + if (canceled) { + return; + } + + call = ThrottledCall.this.delegate.get(); + + String oldName = setCurrentThreadName(call.toString()); + try { + enqueueAndWait(); + } finally { + setCurrentThreadName(oldName); + } + } catch (Exception e) { + limitListener.onIgnore(); + callback.onError(e); + } + } + + void enqueueAndWait() { + ThrottledCallback throttleCallback = new ThrottledCallback<>(callback, limitListener); + call.enqueue(throttleCallback); + + // Need to wait here since the callback call will run asynchronously also. + // This ensures we don't exceed our throttle/queue limits. + throttleCallback.await(); + } + } + + static final class ThrottledCallback implements Callback { + final Callback delegate; + final Listener limitListener; + final CountDownLatch latch; + + public ThrottledCallback(Callback delegate, Listener limitListener) { + this.delegate = delegate; + this.limitListener = limitListener; + this.latch = new CountDownLatch(1); + } + + public void await() { + try { + latch.await(); + } catch (InterruptedException e) { + limitListener.onIgnore(); + throw new RuntimeException("Interrupted while blocking on a throttled call", e); + } + } + + @Override + public void onSuccess(V value) { + try { + limitListener.onSuccess(); + delegate.onSuccess(value); + } finally { + latch.countDown(); + } + } + + @Override + public void onError(Throwable t) { + try { + if (t instanceof RejectedExecutionException) { + limitListener.onDropped(); + } else { + limitListener.onIgnore(); + } + + delegate.onError(t); + } finally { + latch.countDown(); + } + } + } +} diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledSpanConsumer.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledSpanConsumer.java new file mode 100644 index 00000000000..ac6f6f3930c --- /dev/null +++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledSpanConsumer.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package zipkin2.server.internal.throttle; + +import com.netflix.concurrency.limits.Limiter; +import java.util.List; +import java.util.concurrent.ExecutorService; +import zipkin2.Call; +import zipkin2.Span; +import zipkin2.storage.SpanConsumer; + +/** + * Delegating implementation that wraps another {@link SpanConsumer} and ensures that only so many requests + * can get through to it at a given time. + * + * @see ThrottledCall + */ +final class ThrottledSpanConsumer implements SpanConsumer { + final SpanConsumer delegate; + final Limiter limiter; + final ExecutorService executor; + + ThrottledSpanConsumer(SpanConsumer delegate, Limiter limiter, ExecutorService executor) { + this.delegate = delegate; + this.limiter = limiter; + this.executor = executor; + } + + @Override + public Call accept(List spans) { + return new ThrottledCall<>(executor, limiter, () -> delegate.accept(spans)); + } +} diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java new file mode 100644 index 00000000000..e87156e903e --- /dev/null +++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package zipkin2.server.internal.throttle; + +import com.netflix.concurrency.limits.Limit; +import com.netflix.concurrency.limits.limit.Gradient2Limit; +import com.netflix.concurrency.limits.limiter.AbstractLimiter; +import io.micrometer.core.instrument.MeterRegistry; +import java.io.IOException; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import zipkin2.storage.SpanConsumer; +import zipkin2.storage.SpanStore; +import zipkin2.storage.StorageComponent; + +/** + * Delegating implementation that limits requests to the {@link #spanConsumer()} of another + * {@link StorageComponent}. The theory here is that this class can be used to: + *
    + *
  • Prevent spamming the storage engine with excessive, spike requests when they come in; thus preserving it's life.
  • + *
  • Optionally act as a buffer so that a fixed number requests can be queued for execution when the throttle allows + * for it. This optional queue must be bounded in order to avoid running out of memory from infinitely queueing.
  • + *
+ * + * @see ThrottledSpanConsumer + */ +public final class ThrottledStorageComponent extends StorageComponent { + final StorageComponent delegate; + final AbstractLimiter limiter; + final ThreadPoolExecutor executor; + + public ThrottledStorageComponent(StorageComponent delegate, + MeterRegistry registry, + int minConcurrency, + int maxConcurrency, + int maxQueueSize) { + this.delegate = Objects.requireNonNull(delegate); + + Limit limit = Gradient2Limit.newBuilder() + .minLimit(minConcurrency) + .initialLimit(minConcurrency) // Limiter will trend towards min until otherwise necessary so may as well start there + .maxConcurrency(maxConcurrency) + .queueSize(0) + .build(); + this.limiter = new Builder().limit(limit).build(); + + this.executor = new ThreadPoolExecutor(limit.getLimit(), + limit.getLimit(), + 0, + TimeUnit.DAYS, + createQueue(maxQueueSize), + new ThottledThreadFactory(), + new ThreadPoolExecutor.AbortPolicy()); + limit.notifyOnChange(new ThreadPoolExecutorResizer(executor)); + + if (registry != null) { + ActuateThrottleMetrics metrics = new ActuateThrottleMetrics(registry); + metrics.bind(executor); + metrics.bind(limiter); + } + } + + @Override + public SpanStore spanStore() { + return delegate.spanStore(); + } + + @Override + public SpanConsumer spanConsumer() { + return new ThrottledSpanConsumer(delegate.spanConsumer(), limiter, executor); + } + + @Override + public void close() throws IOException { + executor.shutdownNow(); + delegate.close(); + } + + private static BlockingQueue createQueue(int maxSize) { + if (maxSize < 0) { + throw new IllegalArgumentException("Invalid max queue size; must be >= 0 but was: " + maxSize); + } + + if (maxSize == 0) { + // 0 means we should be bounded but we can't create a queue with that size so use 1 instead. + maxSize = 1; + } + + return new LinkedBlockingQueue<>(maxSize); + } + + static final class ThottledThreadFactory implements ThreadFactory { + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setDaemon(true); + thread.setName("throttle-pool-" + thread.getId()); + return thread; + } + } + + static final class ThreadPoolExecutorResizer implements Consumer { + final ThreadPoolExecutor executor; + + public ThreadPoolExecutorResizer(ThreadPoolExecutor executor) { + this.executor = executor; + } + + /** + * This is {@code synchronized} to ensure that we don't let the core/max pool sizes get out of sync; even + * for an instant. The two need to be tightly coupled together to ensure that when our queue fills up we don't spin + * up extra Threads beyond our calculated limit. + * + *

There is also an unfortunate aspect where the {@code max} has to always be greater than {@code core} or an + * exception will be thrown. So they have to be adjust appropriately relative to the direction the size is going. + */ + @Override + public synchronized void accept(Integer newValue) { + int previousValue = executor.getCorePoolSize(); + + int newValueInt = newValue; + if (previousValue < newValueInt) { + executor.setMaximumPoolSize(newValueInt); + executor.setCorePoolSize(newValueInt); + } else if (previousValue > newValueInt) { + executor.setCorePoolSize(newValueInt); + executor.setMaximumPoolSize(newValueInt); + } + // Note: no case for equals. Why modify something that doesn't need modified? + } + } + + static final class Builder extends AbstractLimiter.Builder { + public NonLimitingLimiter build() { + return new NonLimitingLimiter(this); + } + + @Override + protected Builder self() { + return this; + } + } + + /** + * Unlike a normal Limiter, this will actually not prevent the creation of a {@link Listener} in + * {@link #acquire(java.lang.Void)}. The point of this is to ensure that we can always derive an appropriate + * {@link Limit#getLimit() Limit} while the {@link #executor} handles actually limiting running requests. + */ + static final class NonLimitingLimiter extends AbstractLimiter { + public NonLimitingLimiter(AbstractLimiter.Builder builder) { + super(builder); + } + + @Override + public Optional acquire(Void context) { + return Optional.of(createListener()); + } + } +} diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ZipkinStorageThrottleProperties.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ZipkinStorageThrottleProperties.java new file mode 100644 index 00000000000..d144c7391b9 --- /dev/null +++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ZipkinStorageThrottleProperties.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package zipkin2.server.internal.throttle; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties("zipkin.storage.throttle") +public final class ZipkinStorageThrottleProperties { + /** Should we throttle at all? */ + private boolean enabled; + /** Minimum number of storage requests to allow through at a given time. */ + private int minConcurrency; + /** + * Maximum number of storage requests to allow through at a given time. Should be tuned to (bulk_index_pool_size / num_servers_in_cluster). + * e.g. 200 (default pool size in Elasticsearch) / 2 (number of load balanced zipkin-server instances) = 100. + */ + private int maxConcurrency; + /** + * Maximum number of storage requests to buffer while waiting for open Thread. + * 0 = no buffering. + */ + private int maxQueueSize; + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public int getMinConcurrency() { + return minConcurrency; + } + + public void setMinConcurrency(int minConcurrency) { + this.minConcurrency = minConcurrency; + } + + public int getMaxConcurrency() { + return maxConcurrency; + } + + public void setMaxConcurrency(int maxConcurrency) { + this.maxConcurrency = maxConcurrency; + } + + public int getMaxQueueSize() { + return maxQueueSize; + } + + public void setMaxQueueSize(int maxQueueSize) { + this.maxQueueSize = maxQueueSize; + } +} diff --git a/zipkin-server/src/main/resources/zipkin-server-shared.yml b/zipkin-server/src/main/resources/zipkin-server-shared.yml index 72b0fe8122f..895360c69b8 100644 --- a/zipkin-server/src/main/resources/zipkin-server-shared.yml +++ b/zipkin-server/src/main/resources/zipkin-server-shared.yml @@ -53,6 +53,11 @@ zipkin: autocomplete-ttl: ${AUTOCOMPLETE_TTL:3600000} autocomplete-cardinality: 20000 type: ${STORAGE_TYPE:mem} + throttle: + enabled: ${STORAGE_THROTTLE_ENABLED:false} + minConcurrency: ${STORAGE_THROTTLE_MIN_CONCURRENCY:10} + maxConcurrency: ${STORAGE_THROTTLE_MAX_CONCURRENCY:200} + maxQueueSize: ${STORAGE_THROTTLE_MAX_QUEUE_SIZE:1000} mem: # Maximum number of spans to keep in memory. When exceeded, oldest traces (and their spans) will be purged. # A safe estimate is 1K of memory per span (each span with 2 annotations + 1 binary annotation), plus diff --git a/zipkin-server/src/test/java/zipkin2/server/internal/elasticsearch/BasicAuthInterceptorTest.java b/zipkin-server/src/test/java/zipkin2/server/internal/elasticsearch/BasicAuthInterceptorTest.java index 443f7ccebfa..3ce16bd6797 100644 --- a/zipkin-server/src/test/java/zipkin2/server/internal/elasticsearch/BasicAuthInterceptorTest.java +++ b/zipkin-server/src/test/java/zipkin2/server/internal/elasticsearch/BasicAuthInterceptorTest.java @@ -37,7 +37,7 @@ public class BasicAuthInterceptorTest { @Before public void beforeEach() { BasicAuthInterceptor interceptor = - new BasicAuthInterceptor(new ZipkinElasticsearchStorageProperties()); + new BasicAuthInterceptor(new ZipkinElasticsearchStorageProperties(false, 0)); client = new OkHttpClient.Builder().addNetworkInterceptor(interceptor).build(); mockWebServer = new MockWebServer(); } diff --git a/zipkin-server/src/test/java/zipkin2/server/internal/throttle/FakeCall.java b/zipkin-server/src/test/java/zipkin2/server/internal/throttle/FakeCall.java new file mode 100644 index 00000000000..6e6d720b5f5 --- /dev/null +++ b/zipkin-server/src/test/java/zipkin2/server/internal/throttle/FakeCall.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package zipkin2.server.internal.throttle; + +import java.io.IOException; +import java.util.concurrent.RejectedExecutionException; +import zipkin2.Call; +import zipkin2.Callback; + +class FakeCall extends Call { + boolean overCapacity = false; + + public void setOverCapacity(boolean isOverCapacity) { + this.overCapacity = isOverCapacity; + } + + @Override + public Void execute() throws IOException { + if (overCapacity) { + throw new RejectedExecutionException(); + } + + return null; + } + + @Override + public void enqueue(Callback callback) { + if (overCapacity) { + callback.onError(new RejectedExecutionException()); + } else { + callback.onSuccess(null); + } + } + + @Override + public void cancel() { + } + + @Override + public boolean isCanceled() { + return false; + } + + @Override + public Call clone() { + return null; + } +} diff --git a/zipkin-server/src/test/java/zipkin2/server/internal/throttle/ThrottledCallTest.java b/zipkin-server/src/test/java/zipkin2/server/internal/throttle/ThrottledCallTest.java new file mode 100644 index 00000000000..de8d29b5efe --- /dev/null +++ b/zipkin-server/src/test/java/zipkin2/server/internal/throttle/ThrottledCallTest.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package zipkin2.server.internal.throttle; + +import com.netflix.concurrency.limits.Limiter; +import com.netflix.concurrency.limits.Limiter.Listener; +import com.netflix.concurrency.limits.limit.SettableLimit; +import com.netflix.concurrency.limits.limiter.SimpleLimiter; +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import zipkin2.Call; +import zipkin2.Callback; + +public class ThrottledCallTest { + SettableLimit limit; + Limiter limiter; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Before + public void setup() { + this.limit = SettableLimit.startingAt(0); + this.limiter = SimpleLimiter.newBuilder().limit(limit).build(); + } + + @Test + public void callCreation_isDeferred() throws IOException { + boolean[] created = new boolean[] {false}; + Supplier> delegate = () -> { + created[0] = true; + return Call.create(null); + }; + + ThrottledCall throttle = createThrottle(delegate); + + assertFalse(created[0]); + throttle.execute(); + assertTrue(created[0]); + } + + @Test + public void execute_isThrottled() throws Throwable { + int numThreads = 1; + int queueSize = 1; + int totalTasks = numThreads + queueSize; + + Semaphore startLock = new Semaphore(numThreads); + Semaphore waitLock = new Semaphore(totalTasks); + Semaphore failLock = new Semaphore(1); + Supplier> delegate = () -> new LockedCall(startLock, waitLock); + ThrottledCall throttle = createThrottle(numThreads, queueSize, delegate); + + // Step 1: drain appropriate locks + startLock.drainPermits(); + waitLock.drainPermits(); + failLock.drainPermits(); + + // Step 2: saturate threads and fill queue + ExecutorService backgroundPool = Executors.newCachedThreadPool(); + for (int i = 0; i < totalTasks; i++) { + backgroundPool.submit(throttle::execute); + } + + try { + // Step 3: make sure the threads actually started + startLock.acquire(numThreads); + + // Step 4: submit something beyond our limits + Future future = backgroundPool.submit(() -> { + try { + return throttle.execute(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + // Step 6: signal that we tripped the limit + failLock.release(); + } + }); + + // Step 5: wait to make sure our limit actually tripped + failLock.acquire(); + + // Step 7: Expect great things + expectedException.expect(RejectedExecutionException.class); + future.get(); + } catch (ExecutionException e) { + throw e.getCause(); + } finally { + waitLock.release(totalTasks); + startLock.release(totalTasks); + backgroundPool.shutdownNow(); + } + } + + @Test + public void execute_trottlesBack_whenStorageRejects() throws IOException { + Listener listener = mock(Listener.class); + FakeCall call = new FakeCall(); + call.setOverCapacity(true); + + ThrottledCall throttle = new ThrottledCall<>(createPool(1, 1), mockLimiter(listener), () -> call); + try { + throttle.execute(); + fail("No Exception thrown"); + } catch (RejectedExecutionException e) { + verify(listener).onDropped(); + } + } + + @Test + public void execute_ignoresLimit_whenPoolFull() throws IOException { + Listener listener = mock(Listener.class); + + ThrottledCall throttle = new ThrottledCall<>(mockExhuastedPool(), mockLimiter(listener), FakeCall::new); + try { + throttle.execute(); + fail("No Exception thrown"); + } catch (RejectedExecutionException e) { + verify(listener).onIgnore(); + } + } + + @Test + public void enqueue_isThrottled() throws Throwable { + int numThreads = 1; + int queueSize = 1; + int totalTasks = numThreads + queueSize; + + Semaphore startLock = new Semaphore(numThreads); + Semaphore waitLock = new Semaphore(totalTasks); + Supplier> delegate = () -> new LockedCall(startLock, waitLock); + ThrottledCall throttle = createThrottle(numThreads, queueSize, delegate); + + // Step 1: drain appropriate locks + startLock.drainPermits(); + waitLock.drainPermits(); + + // Step 2: saturate threads and fill queue + Callback callback = new NoopCallback(); + for (int i = 0; i < totalTasks; i++) { + throttle.enqueue(callback); + } + + // Step 3: make sure the threads actually started + startLock.acquire(numThreads); + + try { + // Step 4: submit something beyond our limits and make sure it fails + expectedException.expect(RejectedExecutionException.class); + throttle.enqueue(callback); + } catch (Exception e) { + throw e; + } finally { + waitLock.release(totalTasks); + startLock.release(totalTasks); + } + } + + @Test + public void enqueue_trottlesBack_whenStorageRejects() throws IOException, InterruptedException { + Listener listener = mock(Listener.class); + FakeCall call = new FakeCall(); + call.setOverCapacity(true); + + ThrottledCall throttle = new ThrottledCall<>(createPool(1, 1), mockLimiter(listener), () -> call); + CountDownLatch latch = new CountDownLatch(1); + throttle.enqueue(new Callback() { + @Override + public void onSuccess(Void value) { + latch.countDown(); + } + + @Override + public void onError(Throwable t) { + latch.countDown(); + } + }); + + latch.await(1, TimeUnit.MINUTES); + verify(listener).onDropped(); + } + + @Test + public void enqueue_ignoresLimit_whenPoolFull() throws IOException { + Listener listener = mock(Listener.class); + + ThrottledCall throttle = new ThrottledCall<>(mockExhuastedPool(), mockLimiter(listener), FakeCall::new); + try { + throttle.enqueue(null); + fail("No Exception thrown"); + } catch (RejectedExecutionException e) { + verify(listener).onIgnore(); + } + } + + ThrottledCall createThrottle(Supplier> delegate) { + return createThrottle(1, 1, delegate); + } + + ThrottledCall createThrottle(int poolSize, int queueSize, Supplier> delegate) { + limit.setLimit(limit.getLimit() + 1); + return new ThrottledCall<>(createPool(poolSize, queueSize), limiter, delegate); + } + + static ExecutorService createPool(int poolSize, int queueSize) { + return new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.DAYS, new LinkedBlockingQueue<>(queueSize)); + } + + static ExecutorService mockExhuastedPool() { + ExecutorService mock = mock(ExecutorService.class); + doThrow(RejectedExecutionException.class).when(mock).execute(any()); + doThrow(RejectedExecutionException.class).when(mock).submit(any(Callable.class)); + return mock; + } + + static Limiter mockLimiter(Listener listener) { + Limiter mock = mock(Limiter.class); + when(mock.acquire(any())).thenReturn(Optional.of(listener)); + return mock; + } + + static final class LockedCall extends Call { + final Semaphore startLock; + final Semaphore waitLock; + + public LockedCall(Semaphore startLock, Semaphore waitLock) { + this.startLock = startLock; + this.waitLock = waitLock; + } + + @Override + public Void execute() throws IOException { + try { + startLock.release(); + waitLock.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + return null; + } + + @Override + public void enqueue(Callback callback) { + try { + startLock.release(); + waitLock.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void cancel() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public boolean isCanceled() { + return false; + } + + @Override + public Call clone() { + throw new UnsupportedOperationException("Not supported yet."); + } + } + + static final class NoopCallback implements Callback { + @Override + public void onSuccess(Void value) { + } + + @Override + public void onError(Throwable t) { + } + } +} diff --git a/zipkin-server/src/test/java/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.java b/zipkin-server/src/test/java/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.java new file mode 100644 index 00000000000..53e5613ece3 --- /dev/null +++ b/zipkin-server/src/test/java/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package zipkin2.server.internal.throttle; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.composite.CompositeMeterRegistry; +import static org.junit.Assert.assertSame; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import static org.mockito.Mockito.mock; +import zipkin2.storage.StorageComponent; + +public class ThrottledStorageComponentTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void spanConsumer_isProxied() { + StorageComponent delegate = mock(StorageComponent.class); + MeterRegistry registry = new CompositeMeterRegistry(); + + ThrottledStorageComponent throttle = new ThrottledStorageComponent(delegate, registry, 1, 2, 1); + + assertSame(ThrottledSpanConsumer.class, throttle.spanConsumer().getClass()); + } + + @Test + public void createComponent_withoutMeter() { + StorageComponent delegate = mock(StorageComponent.class); + + new ThrottledStorageComponent(delegate, null, 1, 2, 1); + // no exception == pass + } + + @Test + public void createComponent_withZeroSizedQueue() { + StorageComponent delegate = mock(StorageComponent.class); + MeterRegistry registry = new CompositeMeterRegistry(); + + int queueSize = 0; + new ThrottledStorageComponent(delegate, registry, 1, 2, queueSize); + // no exception == pass + } + + @Test + public void createComponent_withNegativeQueue() { + StorageComponent delegate = mock(StorageComponent.class); + MeterRegistry registry = new CompositeMeterRegistry(); + + expectedException.expect(IllegalArgumentException.class); + int queueSize = -1; + new ThrottledStorageComponent(delegate, registry, 1, 2, queueSize); + } +} diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java index 6c5e4b3aa88..507ad8e7a34 100644 --- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java +++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java @@ -105,6 +105,11 @@ public HttpCall newCall(Request request, BodyConverter bodyConverter) return new HttpCall(call.clone(), semaphore, bodyConverter); } + @Override + public String toString() { + return "HttpCall(" + call + ")"; + } + static class V2CallbackAdapter implements okhttp3.Callback { final Semaphore semaphore; final BodyConverter bodyConverter;