diff --git a/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java b/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java index aabcbcb40bc..56f988c2ea1 100644 --- a/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java +++ b/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java @@ -31,6 +31,7 @@ import zipkin2.storage.StorageComponent; import static java.util.Arrays.asList; +import java.util.concurrent.RejectedExecutionException; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -186,6 +187,16 @@ public void storeSpansCallback_onErrorWithMessage() { } @Test + public void errorAcceptingSpans_onErrorRejectedExecution() { + RuntimeException error = new RejectedExecutionException("slow down"); + collector.handleStorageError(TRACE, error, callback); + + verify(callback).onError(error); + assertThat(messages) + .containsOnly("Cannot store spans [1, 1, 2, ...] due to RejectedExecutionException(slow down)"); + verify(metrics).incrementSpansDropped(4); + } + public void handleStorageError_onErrorWithNullMessage() { RuntimeException error = new RuntimeException(); collector.handleStorageError(TRACE, error, callback); diff --git a/zipkin-server/README.md b/zipkin-server/README.md index 7e64055cdae..5863d7b0851 100644 --- a/zipkin-server/README.md +++ b/zipkin-server/README.md @@ -157,6 +157,16 @@ Defaults to true * `AUTOCOMPLETE_KEYS`: list of span tag keys which will be returned by the `/api/v2/autocompleteTags` endpoint * `AUTOCOMPLETE_TTL`: How long in milliseconds to suppress calls to write the same autocomplete key/value pair. Default 3600000 (1 hr) +### Throttled Storage (Experimental) +These settings can be used to help tune the rate at which Zipkin flushes data to another, underlying `StorageComponent` (such as Elasticsearch): + + * `STORAGE_THROTTLE_ENABLED`: Enables throttling + * `STORAGE_THROTTLE_MIN_CONCURRENCY`: Minimum number of Threads to use for writing to storage. + * `STORAGE_THROTTLE_MAX_CONCURRENCY`: Maximum number of Threads to use for writing to storage. In order to avoid configuration drift, this value may override other, storage-specific values such as Elasticsearch's `ES_MAX_REQUESTS`. + * `STORAGE_THROTTLE_MAX_QUEUE_SIZE`: How many messages to buffer while all Threads are writing data before abandoning a message (0 = no buffering). + +As this feature is experimental, it is not recommended to run this in production environments. + ### Cassandra Storage Zipkin's [Cassandra storage component](../zipkin-storage/cassandra) supports version 3.11+ and applies when `STORAGE_TYPE` is set to `cassandra3`: diff --git a/zipkin-server/pom.xml b/zipkin-server/pom.xml index f17dd8954ab..a456ddda0e2 100644 --- a/zipkin-server/pom.xml +++ b/zipkin-server/pom.xml @@ -250,6 +250,17 @@ ${micrometer.version} + + com.netflix.concurrency-limits + concurrency-limits-core + 0.2.2 + + + io.micrometer + micrometer-core + ${micrometer.version} + + io.zipkin.brave @@ -299,6 +310,11 @@ 2.4.0 test + + org.mockito + mockito-core + test + @@ -372,7 +388,6 @@ ${kotlin.version} ${main.java.version} - enable diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java b/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java new file mode 100644 index 00000000000..0d7cb4e1845 --- /dev/null +++ b/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package zipkin2.server.internal; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import org.springframework.boot.autoconfigure.condition.ConditionOutcome; +import org.springframework.boot.autoconfigure.condition.SpringBootCondition; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.context.annotation.Conditional; +import org.springframework.core.type.AnnotatedTypeMetadata; + +@Conditional(ConditionalOnThrottledStorage.ThrottledStorageCondition.class) +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE, ElementType.METHOD}) +@interface ConditionalOnThrottledStorage { + class ThrottledStorageCondition extends SpringBootCondition { + @Override + public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata a) { + String throttleEnabled = context.getEnvironment() + .getProperty("zipkin.storage.throttle.enabled"); + + if (!Boolean.valueOf(throttleEnabled)) { + return ConditionOutcome.noMatch("zipkin.storage.throttle.enabled isn't true"); + } + + return ConditionOutcome.match(); + } + } +} diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java index cb381a7f338..b3d4b90ae3a 100644 --- a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java +++ b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java @@ -26,6 +26,9 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.config.MeterFilter; import java.util.List; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.BeanPostProcessor; @@ -33,20 +36,24 @@ import org.springframework.boot.actuate.health.HealthAggregator; import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Condition; import org.springframework.context.annotation.ConditionContext; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Lazy; import org.springframework.core.annotation.Order; import org.springframework.core.type.AnnotatedTypeMetadata; import org.springframework.web.servlet.config.annotation.ViewControllerRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; +import zipkin2.server.internal.throttle.ZipkinStorageThrottleProperties; import zipkin2.collector.CollectorMetrics; import zipkin2.collector.CollectorSampler; import zipkin2.server.internal.brave.TracingStorageComponent; import zipkin2.storage.InMemoryStorage; import zipkin2.storage.StorageComponent; +import zipkin2.server.internal.throttle.ThrottledStorageComponent; @Configuration @ImportAutoConfiguration(ArmeriaSpringActuatorAutoConfiguration.class) @@ -157,10 +164,47 @@ public Object postProcessAfterInitialization(Object bean, String beanName) { } } + @Configuration + @EnableConfigurationProperties(ZipkinStorageThrottleProperties.class) + @ConditionalOnThrottledStorage + static class ThrottledStorageComponentEnhancer implements BeanPostProcessor, BeanFactoryAware { + + /** + * Need this to resolve cyclic instantiation issue with spring. Mostly, this is for MeterRegistry as really + * bad things happen if you try to Autowire it (loss of JVM metrics) but also using it for properties just to make + * sure no cycles exist at all as a result of turning throttling on. + * + *

Ref: Tracking down cause of Spring's "not eligible for auto-proxying"

+ */ + private BeanFactory beanFactory; + + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) { + if (bean instanceof StorageComponent) { + ZipkinStorageThrottleProperties throttleProperties = beanFactory.getBean(ZipkinStorageThrottleProperties.class); + return new ThrottledStorageComponent((StorageComponent) bean, + beanFactory.getBean(MeterRegistry.class), + throttleProperties.getMinConcurrency(), + throttleProperties.getMaxConcurrency(), + throttleProperties.getMaxQueueSize()); + } + return bean; + } + + @Override + public void setBeanFactory(BeanFactory beanFactory) throws BeansException { + this.beanFactory = beanFactory; + } + } + /** * This is a special-case configuration if there's no StorageComponent of any kind. In-Mem can * supply both read apis, so we add two beans here. + * + *

Note: this needs to be {@link Lazy} to avoid circular dependency issues when using with + * {@link ThrottledStorageComponentEnhancer}. */ + @Lazy @Configuration @Conditional(StorageTypeMemAbsentOrEmpty.class) @ConditionalOnMissingBean(StorageComponent.class) diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java b/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java index 8ba574f355b..a2fb4505ef8 100644 --- a/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java +++ b/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java @@ -23,6 +23,7 @@ import okhttp3.HttpUrl; import okhttp3.OkHttpClient; import okhttp3.logging.HttpLoggingInterceptor; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import zipkin2.elasticsearch.ElasticsearchStorage; @@ -40,8 +41,10 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar private String index = "zipkin"; /** The date separator used to create the index name. Default to -. */ private String dateSeparator = "-"; - /** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 */ + /** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 (overriden by throttle settings) */ private int maxRequests = 64; + /** Overrides maximum in-flight requests to match throttling settings if throttling is enabled. */ + private Integer throttleMaxConcurrency; /** Number of shards (horizontal scaling factor) per index. Defaults to 5. */ private int indexShards = 5; /** Number of replicas (redundancy factor) per index. Defaults to 1.` */ @@ -61,6 +64,14 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar */ private int timeout = 10_000; + ZipkinElasticsearchStorageProperties( + @Value("${zipkin.storage.throttle.enabled:false}") boolean throttleEnabled, + @Value("${zipkin.storage.throttle.maxConcurrency:200}") int throttleMaxConcurrency) { + if (throttleEnabled) { + this.throttleMaxConcurrency = throttleMaxConcurrency; + } + } + public String getPipeline() { return pipeline; } @@ -180,7 +191,7 @@ public ElasticsearchStorage.Builder toBuilder(OkHttpClient client) { .index(index) .dateSeparator(dateSeparator.isEmpty() ? 0 : dateSeparator.charAt(0)) .pipeline(pipeline) - .maxRequests(maxRequests) + .maxRequests(throttleMaxConcurrency == null ? maxRequests : throttleMaxConcurrency) .indexShards(indexShards) .indexReplicas(indexReplicas); } diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ActuateThrottleMetrics.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ActuateThrottleMetrics.java new file mode 100644 index 00000000000..55e7c3656d3 --- /dev/null +++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ActuateThrottleMetrics.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package zipkin2.server.internal.throttle; + +import com.netflix.concurrency.limits.limiter.AbstractLimiter; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import java.util.concurrent.ThreadPoolExecutor; +import zipkin2.server.internal.ActuateCollectorMetrics; + +/** Follows the same naming convention as {@link ActuateCollectorMetrics} */ +final class ActuateThrottleMetrics { + final MeterRegistry registryInstance; + + ActuateThrottleMetrics(MeterRegistry registryInstance) { + this.registryInstance = registryInstance; + } + + void bind(ThreadPoolExecutor pool) { + Gauge.builder("zipkin_storage.throttle.concurrency", pool::getCorePoolSize) + .description("number of threads running storage requests") + .register(registryInstance); + Gauge.builder("zipkin_storage.throttle.queue_size", pool.getQueue()::size) + .description("number of items queued waiting for access to storage") + .register(registryInstance); + } + + void bind(AbstractLimiter limiter) { + // This value should parallel (zipkin_storage.throttle.queue_size + zipkin_storage.throttle.concurrency) + // It is tracked to make sure it doesn't perpetually increase. If it does then we're not resolving LimitListeners. + Gauge.builder("zipkin_storage.throttle.in_flight_requests", limiter::getInflight) + .description("number of requests the limiter thinks are active") + .register(registryInstance); + } +} diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java new file mode 100644 index 00000000000..f43d61ea719 --- /dev/null +++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package zipkin2.server.internal.throttle; + +import com.netflix.concurrency.limits.Limiter; +import com.netflix.concurrency.limits.Limiter.Listener; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.function.Supplier; +import zipkin2.Call; +import zipkin2.Callback; +import zipkin2.storage.InMemoryStorage; + +/** + * {@link Call} implementation that is backed by an {@link ExecutorService}. The ExecutorService + * serves two purposes: + *

    + *
  1. Limits the number of requests that can run in parallel.
  2. + *
  3. Depending on configuration, can queue up requests to make sure we don't aggressively drop + * requests that would otherwise succeed if given a moment. Bounded queues are safest for this as + * unbounded ones can lead to heap exhaustion and {@link OutOfMemoryError OOM errors}.
  4. + *
+ * + * @see ThrottledStorageComponent + */ +final class ThrottledCall extends Call { + final ExecutorService executor; + final Limiter limiter; + final Listener limitListener; + /** + * supplier call needs to be supplied later to avoid having it take action when it is created + * (like {@link InMemoryStorage} and thus avoid being throttled. + */ + final Supplier> supplier; + volatile Call delegate; + volatile boolean canceled; + + public ThrottledCall(ExecutorService executor, Limiter limiter, + Supplier> supplier) { + this.executor = executor; + this.limiter = limiter; + this.limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new); + this.supplier = supplier; + } + + // TODO: refactor this when in-memory no longer executes storage ops during assembly time + ThrottledCall(ThrottledCall other) { + this(other.executor, other.limiter, + other.delegate == null ? other.supplier : () -> other.delegate.clone()); + } + + // TODO: we cannot currently extend Call.Base as tests execute the call multiple times, + // which is invalid as calls are one-shot. It isn't worth refactoring until we refactor out + // the need for assembly time throttling (fix to in-memory storage) + @Override public V execute() throws IOException { + try { + delegate = supplier.get(); + + // Make sure we throttle + Future future = executor.submit(() -> { + String oldName = setCurrentThreadName(delegate.toString()); + try { + return delegate.execute(); + } finally { + setCurrentThreadName(oldName); + } + }); + V result = future.get(); // Still block for the response + + limitListener.onSuccess(); + return result; + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof RejectedExecutionException) { + // Storage rejected us, throttle back + limitListener.onDropped(); + } else { + limitListener.onIgnore(); + } + + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else if (cause instanceof IOException) { + throw (IOException) cause; + } else { + throw new RuntimeException("Issue while executing on a throttled call", cause); + } + } catch (InterruptedException e) { + limitListener.onIgnore(); + throw new RuntimeException("Interrupted while blocking on a throttled call", e); + } catch (RuntimeException | Error e) { + propagateIfFatal(e); + // Ignoring in all cases here because storage itself isn't saying we need to throttle. Though, we may still be + // write bound, but a drop in concurrency won't necessarily help. + limitListener.onIgnore(); + throw e; + } + } + + @Override public void enqueue(Callback callback) { + try { + executor.execute(new QueuedCall(callback)); + } catch (RuntimeException | Error e) { + propagateIfFatal(e); + // Ignoring in all cases here because storage itself isn't saying we need to throttle. Though, we may still be + // write bound, but a drop in concurrency won't necessarily help. + limitListener.onIgnore(); + throw e; + } + } + + @Override public void cancel() { + canceled = true; + if (delegate != null) delegate.cancel(); + } + + @Override public boolean isCanceled() { + return canceled || (delegate != null && delegate.isCanceled()); + } + + @Override public Call clone() { + return new ThrottledCall<>(this); + } + + @Override public String toString() { + return "Throttled" + supplier; + } + + static String setCurrentThreadName(String name) { + Thread thread = Thread.currentThread(); + String originalName = thread.getName(); + thread.setName(name); + return originalName; + } + + final class QueuedCall implements Runnable { + final Callback callback; + + QueuedCall(Callback callback) { + this.callback = callback; + } + + @Override public void run() { + try { + if (isCanceled()) return; + + delegate = ThrottledCall.this.supplier.get(); + + String oldName = setCurrentThreadName(delegate.toString()); + try { + enqueueAndWait(); + } finally { + setCurrentThreadName(oldName); + } + } catch (RuntimeException | Error e) { + propagateIfFatal(e); + limitListener.onIgnore(); + callback.onError(e); + } + } + + void enqueueAndWait() { + ThrottledCallback throttleCallback = new ThrottledCallback<>(callback, limitListener); + delegate.enqueue(throttleCallback); + + // Need to wait here since the callback call will run asynchronously also. + // This ensures we don't exceed our throttle/queue limits. + throttleCallback.await(); + } + } + + static final class ThrottledCallback implements Callback { + final Callback supplier; + final Listener limitListener; + final CountDownLatch latch = new CountDownLatch(1); + + ThrottledCallback(Callback supplier, Listener limitListener) { + this.supplier = supplier; + this.limitListener = limitListener; + } + + void await() { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + limitListener.onIgnore(); + throw new RuntimeException("Interrupted while blocking on a throttled call", e); + } + } + + @Override public void onSuccess(V value) { + try { + limitListener.onSuccess(); + supplier.onSuccess(value); + } finally { + latch.countDown(); + } + } + + @Override public void onError(Throwable t) { + try { + if (t instanceof RejectedExecutionException) { + limitListener.onDropped(); + } else { + limitListener.onIgnore(); + } + + supplier.onError(t); + } finally { + latch.countDown(); + } + } + } +} diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java new file mode 100644 index 00000000000..91e7b78ea4b --- /dev/null +++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package zipkin2.server.internal.throttle; + +import com.netflix.concurrency.limits.Limit; +import com.netflix.concurrency.limits.Limiter; +import com.netflix.concurrency.limits.limit.Gradient2Limit; +import com.netflix.concurrency.limits.limiter.AbstractLimiter; +import io.micrometer.core.instrument.MeterRegistry; +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import zipkin2.Call; +import zipkin2.Span; +import zipkin2.storage.SpanConsumer; +import zipkin2.storage.SpanStore; +import zipkin2.storage.StorageComponent; + +/** + * Delegating implementation that limits requests to the {@link #spanConsumer()} of another {@link + * StorageComponent}. The theory here is that this class can be used to: + *
    + *
  • Prevent spamming the storage engine with excessive, spike requests when they come in; thus + * preserving it's life.
  • + *
  • Optionally act as a buffer so that a fixed number requests can be queued for execution when + * the throttle allows for it. This optional queue must be bounded in order to avoid running out of + * memory from infinitely queueing.
  • + *
+ * + * @see ThrottledSpanConsumer + */ +public final class ThrottledStorageComponent extends StorageComponent { + final StorageComponent delegate; + final AbstractLimiter limiter; + final ThreadPoolExecutor executor; + + public ThrottledStorageComponent(StorageComponent delegate, MeterRegistry registry, + int minConcurrency, + int maxConcurrency, + int maxQueueSize) { + this.delegate = Objects.requireNonNull(delegate); + + Limit limit = Gradient2Limit.newBuilder() + .minLimit(minConcurrency) + .initialLimit( + minConcurrency) // Limiter will trend towards min until otherwise necessary so may as well start there + .maxConcurrency(maxConcurrency) + .queueSize(0) + .build(); + this.limiter = new Builder().limit(limit).build(); + + // TODO: explain these parameters + this.executor = new ThreadPoolExecutor(limit.getLimit(), + limit.getLimit(), + 0, + TimeUnit.DAYS, + createQueue(maxQueueSize), + new ThottledThreadFactory(), + new ThreadPoolExecutor.AbortPolicy()); + + limit.notifyOnChange(new ThreadPoolExecutorResizer(executor)); + + ActuateThrottleMetrics metrics = new ActuateThrottleMetrics(registry); + metrics.bind(executor); + metrics.bind(limiter); + } + + @Override public SpanStore spanStore() { + return delegate.spanStore(); + } + + @Override public SpanConsumer spanConsumer() { + return new ThrottledSpanConsumer(delegate.spanConsumer(), limiter, executor); + } + + @Override public void close() throws IOException { + executor.shutdownNow(); + delegate.close(); + } + + @Override public String toString() { + return "Throttled" + delegate; + } + + final class ThrottledSpanConsumer implements SpanConsumer { + final SpanConsumer delegate; + final Limiter limiter; + final ExecutorService executor; + + ThrottledSpanConsumer(SpanConsumer delegate, Limiter limiter, ExecutorService executor) { + this.delegate = delegate; + this.limiter = limiter; + this.executor = executor; + } + + @Override public Call accept(List spans) { + return new ThrottledCall<>(executor, limiter, () -> delegate.accept(spans)); + } + + @Override public String toString() { + return "Throttled" + delegate; + } + } + + static BlockingQueue createQueue(int maxSize) { + if (maxSize < 0) throw new IllegalArgumentException("maxSize < 0"); + + if (maxSize == 0) { + // 0 means we should be bounded but we can't create a queue with that size so use 1 instead. + maxSize = 1; + } + + return new LinkedBlockingQueue<>(maxSize); + } + + static final class ThottledThreadFactory implements ThreadFactory { + @Override public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setDaemon(true); + thread.setName("zipkin-throttle-pool-" + thread.getId()); + return thread; + } + } + + static final class ThreadPoolExecutorResizer implements Consumer { + final ThreadPoolExecutor executor; + + ThreadPoolExecutorResizer(ThreadPoolExecutor executor) { + this.executor = executor; + } + + /** + * This is {@code synchronized} to ensure that we don't let the core/max pool sizes get out of + * sync; even for an instant. The two need to be tightly coupled together to ensure that when + * our queue fills up we don't spin up extra Threads beyond our calculated limit. + * + *

There is also an unfortunate aspect where the {@code max} has to always be greater than + * {@code core} or an exception will be thrown. So they have to be adjust appropriately + * relative to the direction the size is going. + */ + @Override public synchronized void accept(Integer newValue) { + int previousValue = executor.getCorePoolSize(); + + int newValueInt = newValue; + if (previousValue < newValueInt) { + executor.setMaximumPoolSize(newValueInt); + executor.setCorePoolSize(newValueInt); + } else if (previousValue > newValueInt) { + executor.setCorePoolSize(newValueInt); + executor.setMaximumPoolSize(newValueInt); + } + // Note: no case for equals. Why modify something that doesn't need modified? + } + } + + static final class Builder extends AbstractLimiter.Builder { + NonLimitingLimiter build() { + return new NonLimitingLimiter(this); + } + + @Override protected Builder self() { + return this; + } + } + + /** + * Unlike a normal Limiter, this will actually not prevent the creation of a {@link Listener} in + * {@link #acquire(java.lang.Void)}. The point of this is to ensure that we can always derive an + * appropriate {@link Limit#getLimit() Limit} while the {@link #executor} handles actually + * limiting running requests. + */ + static final class NonLimitingLimiter extends AbstractLimiter { + NonLimitingLimiter(AbstractLimiter.Builder builder) { + super(builder); + } + + @Override public Optional acquire(Void context) { + return Optional.of(createListener()); + } + } +} diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ZipkinStorageThrottleProperties.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ZipkinStorageThrottleProperties.java new file mode 100644 index 00000000000..fd344db2c25 --- /dev/null +++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ZipkinStorageThrottleProperties.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package zipkin2.server.internal.throttle; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties("zipkin.storage.throttle") +public final class ZipkinStorageThrottleProperties { + /** Should we throttle at all? */ + private boolean enabled; + /** Minimum number of storage requests to allow through at a given time. */ + private int minConcurrency; + /** + * Maximum number of storage requests to allow through at a given time. Should be tuned to + * (bulk_index_pool_size / num_servers_in_cluster). e.g. 200 (default pool size in Elasticsearch) + * / 2 (number of load balanced zipkin-server instances) = 100. + */ + private int maxConcurrency; + /** + * Maximum number of storage requests to buffer while waiting for open Thread. 0 = no buffering. + */ + private int maxQueueSize; + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public int getMinConcurrency() { + return minConcurrency; + } + + public void setMinConcurrency(int minConcurrency) { + this.minConcurrency = minConcurrency; + } + + public int getMaxConcurrency() { + return maxConcurrency; + } + + public void setMaxConcurrency(int maxConcurrency) { + this.maxConcurrency = maxConcurrency; + } + + public int getMaxQueueSize() { + return maxQueueSize; + } + + public void setMaxQueueSize(int maxQueueSize) { + this.maxQueueSize = maxQueueSize; + } +} diff --git a/zipkin-server/src/main/resources/zipkin-server-shared.yml b/zipkin-server/src/main/resources/zipkin-server-shared.yml index 72b0fe8122f..009e3a0504c 100644 --- a/zipkin-server/src/main/resources/zipkin-server-shared.yml +++ b/zipkin-server/src/main/resources/zipkin-server-shared.yml @@ -53,6 +53,11 @@ zipkin: autocomplete-ttl: ${AUTOCOMPLETE_TTL:3600000} autocomplete-cardinality: 20000 type: ${STORAGE_TYPE:mem} + throttle: + enabled: ${STORAGE_THROTTLE_ENABLED:false} + min-concurrency: ${STORAGE_THROTTLE_MIN_CONCURRENCY:10} + max-concurrency: ${STORAGE_THROTTLE_MAX_CONCURRENCY:200} + max-queue-size: ${STORAGE_THROTTLE_MAX_QUEUE_SIZE:1000} mem: # Maximum number of spans to keep in memory. When exceeded, oldest traces (and their spans) will be purged. # A safe estimate is 1K of memory per span (each span with 2 annotations + 1 binary annotation), plus diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/elasticsearch/BasicAuthInterceptorTest.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/elasticsearch/BasicAuthInterceptorTest.kt index 40ec38c8a55..6a5f5f3a8f2 100644 --- a/zipkin-server/src/test/kotlin/zipkin2/server/internal/elasticsearch/BasicAuthInterceptorTest.kt +++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/elasticsearch/BasicAuthInterceptorTest.kt @@ -29,7 +29,7 @@ class BasicAuthInterceptorTest { @Rule @JvmField val thrown: ExpectedException = ExpectedException.none() var client: OkHttpClient = OkHttpClient.Builder() - .addNetworkInterceptor(BasicAuthInterceptor(ZipkinElasticsearchStorageProperties())) + .addNetworkInterceptor(BasicAuthInterceptor(ZipkinElasticsearchStorageProperties(false, 0))) .build() @Test fun intercept_whenESReturns403AndJsonBody_throwsWithResponseBodyMessage() { diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt new file mode 100644 index 00000000000..00eb02f6330 --- /dev/null +++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package zipkin2.server.internal.throttle + +import com.netflix.concurrency.limits.Limiter +import com.netflix.concurrency.limits.Limiter.Listener +import com.netflix.concurrency.limits.limit.SettableLimit +import com.netflix.concurrency.limits.limiter.SimpleLimiter +import org.assertj.core.api.Assertions.assertThat +import org.junit.Test +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito +import org.mockito.Mockito.`when` +import org.mockito.Mockito.doThrow +import org.mockito.Mockito.verify +import zipkin2.Call +import zipkin2.Callback +import java.io.IOException +import java.util.Optional +import java.util.concurrent.Callable +import java.util.concurrent.CountDownLatch +import java.util.concurrent.ExecutionException +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.Semaphore +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit +import java.util.function.Supplier + +// TODO: this class re-uses Call objects which is bad as they are one-shot. This needs to be +// refactored in order to be realistic (calls throw if re-invoked, as clone() is the correct way) +class ThrottledCallTest { + var limit = SettableLimit.startingAt(0) + var limiter = SimpleLimiter.newBuilder().limit(limit).build() + + inline fun mock() = Mockito.mock(T::class.java) + + @Test fun callCreation_isDeferred() { + val created = booleanArrayOf(false) + + val throttle = createThrottle(Supplier { + created[0] = true + Call.create(null) + }) + + assertThat(created).contains(false) + throttle.execute() + assertThat(created).contains(true) + } + + @Test fun execute_isThrottled() { + val numThreads = 1 + val queueSize = 1 + val totalTasks = numThreads + queueSize + + val startLock = Semaphore(numThreads) + val waitLock = Semaphore(totalTasks) + val failLock = Semaphore(1) + val throttle = + createThrottle(numThreads, queueSize, Supplier { LockedCall(startLock, waitLock) }) + + // Step 1: drain appropriate locks + startLock.drainPermits() + waitLock.drainPermits() + failLock.drainPermits() + + // Step 2: saturate threads and fill queue + val backgroundPool = Executors.newCachedThreadPool() + for (i in 0 until totalTasks) { + backgroundPool.submit(Callable { throttle.execute() }) + } + + try { + // Step 3: make sure the threads actually started + startLock.acquire(numThreads) + + // Step 4: submit something beyond our limits + val future = backgroundPool.submit(Callable { + try { + throttle.execute() + } catch (e: IOException) { + throw RuntimeException(e) + } finally { + // Step 6: signal that we tripped the limit + failLock.release() + } + }) + + // Step 5: wait to make sure our limit actually tripped + failLock.acquire() + + future.get() + + // Step 7: Expect great things + assertThat(true).isFalse() // should raise a RejectedExecutionException + } catch (t: Throwable) { + assertThat(t) + .isInstanceOf(ExecutionException::class.java) // from future.get + .hasCauseInstanceOf(RejectedExecutionException::class.java) + } finally { + waitLock.release(totalTasks) + startLock.release(totalTasks) + backgroundPool.shutdownNow() + } + } + + @Test fun execute_trottlesBack_whenStorageRejects() { + val listener: Listener = mock() + val call = FakeCall() + call.overCapacity = true + + val throttle = ThrottledCall(createPool(1, 1), mockLimiter(listener), Supplier { call }) + try { + throttle.execute() + assertThat(true).isFalse() // should raise a RejectedExecutionException + } catch (e: RejectedExecutionException) { + verify(listener).onDropped() + } + } + + @Test fun execute_ignoresLimit_whenPoolFull() { + val listener: Listener = mock() + + val throttle = + ThrottledCall(mockExhaustedPool(), mockLimiter(listener), Supplier { FakeCall() }) + try { + throttle.execute() + assertThat(true).isFalse() // should raise a RejectedExecutionException + } catch (e: RejectedExecutionException) { + verify(listener).onIgnore() + } + } + + @Test fun enqueue_isThrottled() { + val numThreads = 1 + val queueSize = 1 + val totalTasks = numThreads + queueSize + + val startLock = Semaphore(numThreads) + val waitLock = Semaphore(totalTasks) + val throttle = + createThrottle(numThreads, queueSize, Supplier { LockedCall(startLock, waitLock) }) + + // Step 1: drain appropriate locks + startLock.drainPermits() + waitLock.drainPermits() + + // Step 2: saturate threads and fill queue + val callback: Callback = mock() + for (i in 0 until totalTasks) { + throttle.enqueue(callback) + } + + // Step 3: make sure the threads actually started + startLock.acquire(numThreads) + + try { + // Step 4: submit something beyond our limits and make sure it fails + throttle.enqueue(callback) + + assertThat(true).isFalse() // should raise a RejectedExecutionException + } catch (e: RejectedExecutionException) { + } finally { + waitLock.release(totalTasks) + startLock.release(totalTasks) + } + } + + @Test fun enqueue_throttlesBack_whenStorageRejects() { + val listener: Listener = mock() + val call = FakeCall() + call.overCapacity = true + + val throttle = ThrottledCall(createPool(1, 1), mockLimiter(listener), Supplier { call }) + val latch = CountDownLatch(1) + throttle.enqueue(object : Callback { + override fun onSuccess(value: Void) { + latch.countDown() + } + + override fun onError(t: Throwable) { + latch.countDown() + } + }) + + latch.await(1, TimeUnit.MINUTES) + verify(listener).onDropped() + } + + @Test fun enqueue_ignoresLimit_whenPoolFull() { + val listener: Listener = mock() + + val throttle = + ThrottledCall(mockExhaustedPool(), mockLimiter(listener), Supplier { FakeCall() }) + try { + throttle.enqueue(null) + assertThat(true).isFalse() // should raise a RejectedExecutionException + } catch (e: RejectedExecutionException) { + verify(listener).onIgnore() + } + } + + private fun createThrottle(delegate: Supplier>): ThrottledCall { + return createThrottle(1, 1, delegate) + } + + private fun createThrottle( + poolSize: Int, + queueSize: Int, + delegate: Supplier> + ): ThrottledCall { + limit.setLimit(limit.getLimit() + 1) + return ThrottledCall(createPool(poolSize, queueSize), limiter, delegate) + } + + private class LockedCall(val startLock: Semaphore, val waitLock: Semaphore) : Call.Base() { + override fun doExecute(): Void? { + try { + startLock.release() + waitLock.acquire() + return null; + } catch (e: InterruptedException) { + Thread.currentThread().interrupt() + throw AssertionError(e) + } + } + + override fun doEnqueue(callback: Callback) { + try { + callback.onSuccess(doExecute()) + } catch (t: Throwable) { + callback.onError(t) + } + } + + override fun clone() = LockedCall(startLock, waitLock); + } + + private fun createPool(poolSize: Int, queueSize: Int): ExecutorService { + return ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.DAYS, + LinkedBlockingQueue(queueSize)) + } + + private fun mockExhaustedPool(): ExecutorService { + val mock: ExecutorService = mock() + doThrow(RejectedExecutionException::class.java).`when`(mock).execute(any()) + doThrow(RejectedExecutionException::class.java).`when`(mock).submit(any(Callable::class.java)) + return mock + } + + private fun mockLimiter(listener: Listener): Limiter { + val mock: Limiter = mock() + `when`(mock.acquire(any())).thenReturn(Optional.of(listener)) + return mock + } + + private class FakeCall(var overCapacity: Boolean = false) : Call.Base() { + override fun doExecute(): Void? { + if (overCapacity) throw RejectedExecutionException() + return null + } + + override fun doEnqueue(callback: Callback) { + if (overCapacity) { + callback.onError(RejectedExecutionException()) + } else { + callback.onSuccess(null) + } + } + + override fun clone() = FakeCall() + } +} diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.kt new file mode 100644 index 00000000000..705967a6f77 --- /dev/null +++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.kt @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package zipkin2.server.internal.throttle + +import com.linecorp.armeria.common.metric.NoopMeterRegistry +import org.assertj.core.api.Assertions.assertThat +import org.junit.Test +import zipkin2.storage.InMemoryStorage + +class ThrottledStorageComponentTest { + val delegate = InMemoryStorage.newBuilder().build() + val registry = NoopMeterRegistry.get() + + @Test fun spanConsumer_isProxied() { + val throttle = ThrottledStorageComponent(delegate, registry, 1, 2, 1) + + assertThat(throttle.spanConsumer().accept(listOf())) + .isInstanceOf(ThrottledCall::class.java) + } + + @Test fun createComponent_withZeroSizedQueue() { + val queueSize = 0 + ThrottledStorageComponent(delegate, registry, 1, 2, queueSize) + // no exception == pass + } + + @Test(expected = IllegalArgumentException::class) + fun createComponent_withNegativeQueue() { + val queueSize = -1 + ThrottledStorageComponent(delegate, registry, 1, 2, queueSize) + } + + @Test fun niceToString() { + assertThat(ThrottledStorageComponent(delegate, registry, 1, 2, 1)) + .hasToString("ThrottledInMemoryStorage{traceCount=0}"); + } +} diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java index 6c5e4b3aa88..507ad8e7a34 100644 --- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java +++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java @@ -105,6 +105,11 @@ public HttpCall newCall(Request request, BodyConverter bodyConverter) return new HttpCall(call.clone(), semaphore, bodyConverter); } + @Override + public String toString() { + return "HttpCall(" + call + ")"; + } + static class V2CallbackAdapter implements okhttp3.Callback { final Semaphore semaphore; final BodyConverter bodyConverter; diff --git a/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java b/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java index 3057940ea5a..ce722387db5 100644 --- a/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java +++ b/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java @@ -574,4 +574,8 @@ public int hashCode() { return h$; } } + + @Override public String toString() { + return "InMemoryStorage{traceCount=" + traceIdToTraceIdTimeStamps.size() + "}"; + } }