diff --git a/README.md b/README.md index 5737cb8d2c3..b008ea13b97 100644 --- a/README.md +++ b/README.md @@ -92,6 +92,9 @@ It stores spans as json and has been designed for larger scale. Note: This store requires a [spark job](https://github.com/openzipkin/zipkin-dependencies) to aggregate dependency links. +### Throttling +As part of a [Collector surge and error handling](https://cwiki.apache.org/confluence/display/ZIPKIN/Collector+surge+and+error+handling) discussion that took place a throttling mechanism was added to allow more fine-grained control over how Zipkin interacts with the various `StorageComponents`. In particular, for those installations which use a push-based Collector (such as the HTTP rest API), enabling the throttle can allow Zipkin to buffer some messages in order to avoid aggressively dropping them. See [zipkin-server](zipkin-server#throttled-storage) for configuration information. + ### Disabling search The following API endpoints provide search features, and are enabled by default. Search primarily allows the trace list screen of the UI operate. diff --git a/zipkin-collector/core/src/main/java/zipkin2/collector/Collector.java b/zipkin-collector/core/src/main/java/zipkin2/collector/Collector.java index 1f468869650..ae5728da616 100644 --- a/zipkin-collector/core/src/main/java/zipkin2/collector/Collector.java +++ b/zipkin-collector/core/src/main/java/zipkin2/collector/Collector.java @@ -230,7 +230,8 @@ RuntimeException doError(String message, Throwable e) { } if (e instanceof RejectedExecutionException) { - // This can indicate to a higher layer that we need to slow down ingestion + // No need to wrap in this instance. Wrapping could also be detrimental to higher layers + // where we expect certain Exceptions. return (RejectedExecutionException) e; } else { return new RuntimeException(message, e); diff --git a/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java b/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java index 3f82c865c2a..ae4eb5c1ac8 100644 --- a/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java +++ b/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java @@ -25,8 +25,10 @@ import zipkin2.storage.StorageComponent; import static java.util.Arrays.asList; +import java.util.concurrent.RejectedExecutionException; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.*; +import static org.mockito.ArgumentMatchers.contains; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -133,6 +135,14 @@ public void errorAcceptingSpans_onErrorWithMessage() { assertThat(message).contains("due to IllegalArgumentException(no beer)"); } + @Test + public void errorAcceptingSpans_onErrorRejectedExecution() { + RuntimeException expected = new RejectedExecutionException("slow down"); + RuntimeException result = collector.errorStoringSpans(asList(CLIENT_SPAN), expected); + + assertThat(result).isInstanceOf(RejectedExecutionException.class); + } + @Test public void errorDecoding_onErrorWithNullMessage() { String message = collector.errorReading(new RuntimeException()).getMessage(); diff --git a/zipkin-server/README.md b/zipkin-server/README.md index 5b18374162f..c2dcac9650b 100644 --- a/zipkin-server/README.md +++ b/zipkin-server/README.md @@ -157,6 +157,14 @@ Defaults to true * `AUTOCOMPLETE_KEYS`: list of span tag keys which will be returned by the `/api/v2/autocompleteTags` endpoint * `AUTOCOMPLETE_TTL`: How long in milliseconds to suppress calls to write the same autocomplete key/value pair. Default 3600000 (1 hr) +### Throttled Storage +These settings can be used to help tune the rate at which Zipkin flushes data to another, underlying `StorageComponent` (such as Elasticsearch): + + * `STORAGE_THROTTLE_ENABLED`: Enables throttling + * `STORAGE_THROTTLE_MIN_CONCURRENCY`: Minimum number of Threads to use for writing to storage. + * `STORAGE_THROTTLE_MAX_CONCURRENCY`: Maximum number of Threads to use for writing to storage. In order to avoid configuration drift, this value may override other, storage-specific values such as Elasticsearch's `ES_MAX_REQUESTS`. + * `STORAGE_THROTTLE_MAX_QUEUE_SIZE`: How many messages to buffer while all Threads are writing data before abandoning a message (0 = no buffering). + ### Cassandra Storage Zipkin's [Cassandra storage component](../zipkin-storage/cassandra) supports version 3.11+ and applies when `STORAGE_TYPE` is set to `cassandra3`: diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java b/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java index b09e1ca43b5..0d7cb4e1845 100644 --- a/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java +++ b/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java @@ -29,7 +29,7 @@ @Conditional(ConditionalOnThrottledStorage.ThrottledStorageCondition.class) @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE, ElementType.METHOD}) -public @interface ConditionalOnThrottledStorage { +@interface ConditionalOnThrottledStorage { class ThrottledStorageCondition extends SpringBootCondition { @Override public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata a) { diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java b/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java index a3b0f111e97..bd034f17c92 100644 --- a/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java +++ b/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java @@ -43,7 +43,7 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar private String dateSeparator = "-"; /** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 (overriden by throttle settings) */ private int maxRequests = 64; - /** Overrdies maximum in-flight requests to match throttling settings if throttling is enabled. */ + /** Overrides maximum in-flight requests to match throttling settings if throttling is enabled. */ private Integer throttleMaxConcurrency; /** Number of shards (horizontal scaling factor) per index. Defaults to 5. */ private int indexShards = 5; diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ActuateThrottleMetrics.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ActuateThrottleMetrics.java index b3569a80049..acedc2dadaa 100644 --- a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ActuateThrottleMetrics.java +++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ActuateThrottleMetrics.java @@ -23,7 +23,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; -class ActuateThrottleMetrics { +final class ActuateThrottleMetrics { private final MeterRegistry registryInstance; public ActuateThrottleMetrics(MeterRegistry registryInstance) { diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java index 05b868911de..14fcd5f43a7 100644 --- a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java +++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java @@ -41,17 +41,17 @@ * * @see ThrottledStorageComponent */ -class ThrottledCall extends Call { - private final ExecutorService executor; - private final Limiter limiter; - private final Listener limitListener; +final class ThrottledCall extends Call { + final ExecutorService executor; + final Limiter limiter; + final Listener limitListener; /** * Delegate call needs to be supplied later to avoid having it take action when it is created (like * {@link InMemoryStorage} and thus avoid being throttled. */ - private final Supplier> delegate; - private Call call; - private boolean canceled; + final Supplier> delegate; + Call call; + volatile boolean canceled; public ThrottledCall(ExecutorService executor, Limiter limiter, Supplier> delegate) { this.executor = executor; @@ -71,8 +71,11 @@ public V execute() throws IOException { // Make sure we throttle Future future = executor.submit(() -> { - try (AutoCloseable nameReverter = updateThreadName(call.toString())) { + String oldName = setCurrentThreadName(call.toString()); + try { return call.execute(); + } finally { + setCurrentThreadName(oldName); } }); V result = future.get(); // Still block for the response @@ -88,48 +91,33 @@ public V execute() throws IOException { limitListener.onIgnore(); } - throw new ThrottleException(cause); - } catch (RuntimeException e) { + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else if (cause instanceof IOException) { + throw (IOException) cause; + } else { + throw new RuntimeException("Issue while executing on a throttled call", cause); + } + } catch (InterruptedException e) { limitListener.onIgnore(); - throw e; // E.g. RejectedExecutionException + throw new RuntimeException("Interrupted while blocking on a throttled call", e); } catch (Exception e) { + // Ignoring in all cases here because storage itself isn't saying we need to throttle. Though, we may still be + // write bound, but a drop in concurrency won't necessarily help. limitListener.onIgnore(); - throw new ThrottleException(e); + throw e; } } @Override public void enqueue(Callback callback) { try { - executor.execute(() -> { - if (canceled) { - return; - } - - call = delegate.get(); - - // Not using try-with-resources to avoid catching exceptions that occur anywhere outside of close() - AutoCloseable nameReverter = updateThreadName(call.toString()); - try { - ThrottledCallback throttleCallback = new ThrottledCallback<>(callback, limitListener); - call.enqueue(throttleCallback); - - // Need to wait here since the delegate call will run asynchronously also. - // This ensures we don't exceed our throttle/queue limits. - throttleCallback.await(); - } finally { - try { - nameReverter.close(); - } catch (Exception e) { - // swallow - } - } - }); + executor.execute(new QueuedCall(callback)); } catch (Exception e) { // Ignoring in all cases here because storage itself isn't saying we need to throttle. Though, we may still be - // write bound, but a drop in concurrency won't neccessarily help. + // write bound, but a drop in concurrency won't necessarily help. limitListener.onIgnore(); - throw e; // E.g. RejectedExecutionException + throw e; } } @@ -151,21 +139,63 @@ public Call clone() { return new ThrottledCall<>(this); } - private static AutoCloseable updateThreadName(String name) { + /** + * @param name New name for the current Thread + * @return Previous name of the current Thread + */ + static String setCurrentThreadName(String name) { + Thread thread = Thread.currentThread(); + String originalName = thread.getName(); try { - Thread thread = Thread.currentThread(); - String originalName = thread.getName(); thread.setName(name); - return () -> thread.setName(originalName); + return originalName; } catch (SecurityException e) { - return () -> {}; + return originalName; + } + } + + final class QueuedCall implements Runnable { + final Callback callback; + + public QueuedCall(Callback callback) { + this.callback = callback; + } + + @Override + public void run() { + try { + if (canceled) { + return; + } + + call = ThrottledCall.this.delegate.get(); + + String oldName = setCurrentThreadName(call.toString()); + try { + enqueueAndWait(); + } finally { + setCurrentThreadName(oldName); + } + } catch (Exception e) { + limitListener.onIgnore(); + callback.onError(e); + } + } + + void enqueueAndWait() { + ThrottledCallback throttleCallback = new ThrottledCallback<>(callback, limitListener); + call.enqueue(throttleCallback); + + // Need to wait here since the callback call will run asynchronously also. + // This ensures we don't exceed our throttle/queue limits. + throttleCallback.await(); } } - private static class ThrottledCallback implements Callback { - private final Callback delegate; - private final Listener limitListener; - private final CountDownLatch latch; + static final class ThrottledCallback implements Callback { + final Callback delegate; + final Listener limitListener; + final CountDownLatch latch; public ThrottledCallback(Callback delegate, Listener limitListener) { this.delegate = delegate; @@ -176,9 +206,9 @@ public ThrottledCallback(Callback delegate, Listener limitListener) { public void await() { try { latch.await(); - } catch (InterruptedException ex) { + } catch (InterruptedException e) { limitListener.onIgnore(); - throw new ThrottleException(ex); + throw new RuntimeException("Interrupted while blocking on a throttled call", e); } } @@ -195,17 +225,16 @@ public void onSuccess(V value) { @Override public void onError(Throwable t) { try { - limitListener.onDropped(); + if (t instanceof RejectedExecutionException) { + limitListener.onDropped(); + } else { + limitListener.onIgnore(); + } + delegate.onError(t); } finally { latch.countDown(); } } } - - public static class ThrottleException extends RuntimeException { - public ThrottleException(Throwable cause) { - super(cause); - } - } } diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledSpanConsumer.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledSpanConsumer.java index 84ac83526ec..ac6f6f3930c 100644 --- a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledSpanConsumer.java +++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledSpanConsumer.java @@ -29,10 +29,10 @@ * * @see ThrottledCall */ -class ThrottledSpanConsumer implements SpanConsumer { - private final SpanConsumer delegate; - private final Limiter limiter; - private final ExecutorService executor; +final class ThrottledSpanConsumer implements SpanConsumer { + final SpanConsumer delegate; + final Limiter limiter; + final ExecutorService executor; ThrottledSpanConsumer(SpanConsumer delegate, Limiter limiter, ExecutorService executor) { this.delegate = delegate; diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java index 7f753c2cbb6..e87156e903e 100644 --- a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java +++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java @@ -44,10 +44,10 @@ * * @see ThrottledSpanConsumer */ -public class ThrottledStorageComponent extends StorageComponent { - private final StorageComponent delegate; - private final AbstractLimiter limiter; - private final ThreadPoolExecutor executor; +public final class ThrottledStorageComponent extends StorageComponent { + final StorageComponent delegate; + final AbstractLimiter limiter; + final ThreadPoolExecutor executor; public ThrottledStorageComponent(StorageComponent delegate, MeterRegistry registry, @@ -57,10 +57,11 @@ public ThrottledStorageComponent(StorageComponent delegate, this.delegate = Objects.requireNonNull(delegate); Limit limit = Gradient2Limit.newBuilder() - .minLimit(minConcurrency) - .initialLimit(minConcurrency) // Limiter will trend towards min until otherwise necessary so may as well start there - .maxConcurrency(maxConcurrency) - .build(); + .minLimit(minConcurrency) + .initialLimit(minConcurrency) // Limiter will trend towards min until otherwise necessary so may as well start there + .maxConcurrency(maxConcurrency) + .queueSize(0) + .build(); this.limiter = new Builder().limit(limit).build(); this.executor = new ThreadPoolExecutor(limit.getLimit(), @@ -70,7 +71,7 @@ public ThrottledStorageComponent(StorageComponent delegate, createQueue(maxQueueSize), new ThottledThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); - limit.notifyOnChange(new PoolSizeConsumer(executor)); + limit.notifyOnChange(new ThreadPoolExecutorResizer(executor)); if (registry != null) { ActuateThrottleMetrics metrics = new ActuateThrottleMetrics(registry); @@ -108,7 +109,7 @@ private static BlockingQueue createQueue(int maxSize) { return new LinkedBlockingQueue<>(maxSize); } - private static class ThottledThreadFactory implements ThreadFactory { + static final class ThottledThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); @@ -118,10 +119,10 @@ public Thread newThread(Runnable r) { } } - private static class PoolSizeConsumer implements Consumer { - private final ThreadPoolExecutor executor; + static final class ThreadPoolExecutorResizer implements Consumer { + final ThreadPoolExecutor executor; - public PoolSizeConsumer(ThreadPoolExecutor executor) { + public ThreadPoolExecutorResizer(ThreadPoolExecutor executor) { this.executor = executor; } @@ -149,7 +150,7 @@ public synchronized void accept(Integer newValue) { } } - private static class Builder extends AbstractLimiter.Builder { + static final class Builder extends AbstractLimiter.Builder { public NonLimitingLimiter build() { return new NonLimitingLimiter(this); } @@ -165,7 +166,7 @@ protected Builder self() { * {@link #acquire(java.lang.Void)}. The point of this is to ensure that we can always derive an appropriate * {@link Limit#getLimit() Limit} while the {@link #executor} handles actually limiting running requests. */ - private static class NonLimitingLimiter extends AbstractLimiter { + static final class NonLimitingLimiter extends AbstractLimiter { public NonLimitingLimiter(AbstractLimiter.Builder builder) { super(builder); } diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ZipkinStorageThrottleProperties.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ZipkinStorageThrottleProperties.java index 37fb562a2d8..d144c7391b9 100644 --- a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ZipkinStorageThrottleProperties.java +++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ZipkinStorageThrottleProperties.java @@ -19,16 +19,20 @@ import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties("zipkin.storage.throttle") -public class ZipkinStorageThrottleProperties { +public final class ZipkinStorageThrottleProperties { /** Should we throttle at all? */ private boolean enabled; /** Minimum number of storage requests to allow through at a given time. */ private int minConcurrency; - /** Maximum number of storage requests to allow through at a given time. */ + /** + * Maximum number of storage requests to allow through at a given time. Should be tuned to (bulk_index_pool_size / num_servers_in_cluster). + * e.g. 200 (default pool size in Elasticsearch) / 2 (number of load balanced zipkin-server instances) = 100. + */ private int maxConcurrency; /** * Maximum number of storage requests to buffer while waiting for open Thread. - * 0 = no buffering. */ + * 0 = no buffering. + */ private int maxQueueSize; public boolean isEnabled() { diff --git a/zipkin-server/src/main/resources/zipkin-server-shared.yml b/zipkin-server/src/main/resources/zipkin-server-shared.yml index 620120f441b..d0a93296d2c 100644 --- a/zipkin-server/src/main/resources/zipkin-server-shared.yml +++ b/zipkin-server/src/main/resources/zipkin-server-shared.yml @@ -51,10 +51,10 @@ zipkin: autocomplete-cardinality: 20000 type: ${STORAGE_TYPE:mem} throttle: - enabled: ${THROTTLE_ENABLED:false} - minConcurrency: ${THROTTLE_MIN_CONCURRENCY:10} - maxConcurrency: ${THROTTLE_MAX_CONCURRENCY:200} - maxQueueSize: ${THROTTLE_MAX_QUEUE_SIZE:1000} + enabled: ${STORAGE_THROTTLE_ENABLED:false} + minConcurrency: ${STORAGE_THROTTLE_MIN_CONCURRENCY:10} + maxConcurrency: ${STORAGE_THROTTLE_MAX_CONCURRENCY:200} + maxQueueSize: ${STORAGE_THROTTLE_MAX_QUEUE_SIZE:1000} mem: # Maximum number of spans to keep in memory. When exceeded, oldest traces (and their spans) will be purged. # A safe estimate is 1K of memory per span (each span with 2 annotations + 1 binary annotation), plus diff --git a/zipkin-server/src/test/java/zipkin2/server/internal/throttle/FakeCall.java b/zipkin-server/src/test/java/zipkin2/server/internal/throttle/FakeCall.java new file mode 100644 index 00000000000..6e6d720b5f5 --- /dev/null +++ b/zipkin-server/src/test/java/zipkin2/server/internal/throttle/FakeCall.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package zipkin2.server.internal.throttle; + +import java.io.IOException; +import java.util.concurrent.RejectedExecutionException; +import zipkin2.Call; +import zipkin2.Callback; + +class FakeCall extends Call { + boolean overCapacity = false; + + public void setOverCapacity(boolean isOverCapacity) { + this.overCapacity = isOverCapacity; + } + + @Override + public Void execute() throws IOException { + if (overCapacity) { + throw new RejectedExecutionException(); + } + + return null; + } + + @Override + public void enqueue(Callback callback) { + if (overCapacity) { + callback.onError(new RejectedExecutionException()); + } else { + callback.onSuccess(null); + } + } + + @Override + public void cancel() { + } + + @Override + public boolean isCanceled() { + return false; + } + + @Override + public Call clone() { + return null; + } +} diff --git a/zipkin-server/src/test/java/zipkin2/server/internal/throttle/ThrottledCallTest.java b/zipkin-server/src/test/java/zipkin2/server/internal/throttle/ThrottledCallTest.java index 5655e79f08e..de8d29b5efe 100644 --- a/zipkin-server/src/test/java/zipkin2/server/internal/throttle/ThrottledCallTest.java +++ b/zipkin-server/src/test/java/zipkin2/server/internal/throttle/ThrottledCallTest.java @@ -17,9 +17,13 @@ package zipkin2.server.internal.throttle; import com.netflix.concurrency.limits.Limiter; +import com.netflix.concurrency.limits.Limiter.Listener; import com.netflix.concurrency.limits.limit.SettableLimit; import com.netflix.concurrency.limits.limiter.SimpleLimiter; import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -30,17 +34,24 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -import static org.junit.Assert.*; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import zipkin2.Call; import zipkin2.Callback; public class ThrottledCallTest { - private SettableLimit limit; - private Limiter limiter; + SettableLimit limit; + Limiter limiter; @Rule public ExpectedException expectedException = ExpectedException.none(); @@ -59,7 +70,7 @@ public void callCreation_isDeferred() throws IOException { return Call.create(null); }; - ThrottledCall throttle = createThrottle(delegate); + ThrottledCall throttle = createThrottle(delegate); assertFalse(created[0]); throttle.execute(); @@ -76,7 +87,7 @@ public void execute_isThrottled() throws Throwable { Semaphore waitLock = new Semaphore(totalTasks); Semaphore failLock = new Semaphore(1); Supplier> delegate = () -> new LockedCall(startLock, waitLock); - ThrottledCall throttle = createThrottle(numThreads, queueSize, delegate); + ThrottledCall throttle = createThrottle(numThreads, queueSize, delegate); // Step 1: drain appropriate locks startLock.drainPermits(); @@ -120,6 +131,34 @@ public void execute_isThrottled() throws Throwable { } } + @Test + public void execute_trottlesBack_whenStorageRejects() throws IOException { + Listener listener = mock(Listener.class); + FakeCall call = new FakeCall(); + call.setOverCapacity(true); + + ThrottledCall throttle = new ThrottledCall<>(createPool(1, 1), mockLimiter(listener), () -> call); + try { + throttle.execute(); + fail("No Exception thrown"); + } catch (RejectedExecutionException e) { + verify(listener).onDropped(); + } + } + + @Test + public void execute_ignoresLimit_whenPoolFull() throws IOException { + Listener listener = mock(Listener.class); + + ThrottledCall throttle = new ThrottledCall<>(mockExhuastedPool(), mockLimiter(listener), FakeCall::new); + try { + throttle.execute(); + fail("No Exception thrown"); + } catch (RejectedExecutionException e) { + verify(listener).onIgnore(); + } + } + @Test public void enqueue_isThrottled() throws Throwable { int numThreads = 1; @@ -129,7 +168,7 @@ public void enqueue_isThrottled() throws Throwable { Semaphore startLock = new Semaphore(numThreads); Semaphore waitLock = new Semaphore(totalTasks); Supplier> delegate = () -> new LockedCall(startLock, waitLock); - ThrottledCall throttle = createThrottle(numThreads, queueSize, delegate); + ThrottledCall throttle = createThrottle(numThreads, queueSize, delegate); // Step 1: drain appropriate locks startLock.drainPermits(); @@ -156,22 +195,72 @@ public void enqueue_isThrottled() throws Throwable { } } - private ThrottledCall createThrottle(Supplier> delegate) { + @Test + public void enqueue_trottlesBack_whenStorageRejects() throws IOException, InterruptedException { + Listener listener = mock(Listener.class); + FakeCall call = new FakeCall(); + call.setOverCapacity(true); + + ThrottledCall throttle = new ThrottledCall<>(createPool(1, 1), mockLimiter(listener), () -> call); + CountDownLatch latch = new CountDownLatch(1); + throttle.enqueue(new Callback() { + @Override + public void onSuccess(Void value) { + latch.countDown(); + } + + @Override + public void onError(Throwable t) { + latch.countDown(); + } + }); + + latch.await(1, TimeUnit.MINUTES); + verify(listener).onDropped(); + } + + @Test + public void enqueue_ignoresLimit_whenPoolFull() throws IOException { + Listener listener = mock(Listener.class); + + ThrottledCall throttle = new ThrottledCall<>(mockExhuastedPool(), mockLimiter(listener), FakeCall::new); + try { + throttle.enqueue(null); + fail("No Exception thrown"); + } catch (RejectedExecutionException e) { + verify(listener).onIgnore(); + } + } + + ThrottledCall createThrottle(Supplier> delegate) { return createThrottle(1, 1, delegate); } - private ThrottledCall createThrottle(int poolSize, int queueSize, Supplier> delegate) { + ThrottledCall createThrottle(int poolSize, int queueSize, Supplier> delegate) { limit.setLimit(limit.getLimit() + 1); - return new ThrottledCall(createPool(poolSize, queueSize), limiter, delegate); + return new ThrottledCall<>(createPool(poolSize, queueSize), limiter, delegate); } - private static ExecutorService createPool(int poolSize, int queueSize) { + static ExecutorService createPool(int poolSize, int queueSize) { return new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.DAYS, new LinkedBlockingQueue<>(queueSize)); } - private static class LockedCall extends Call { - private final Semaphore startLock; - private final Semaphore waitLock; + static ExecutorService mockExhuastedPool() { + ExecutorService mock = mock(ExecutorService.class); + doThrow(RejectedExecutionException.class).when(mock).execute(any()); + doThrow(RejectedExecutionException.class).when(mock).submit(any(Callable.class)); + return mock; + } + + static Limiter mockLimiter(Listener listener) { + Limiter mock = mock(Limiter.class); + when(mock.acquire(any())).thenReturn(Optional.of(listener)); + return mock; + } + + static final class LockedCall extends Call { + final Semaphore startLock; + final Semaphore waitLock; public LockedCall(Semaphore startLock, Semaphore waitLock) { this.startLock = startLock; @@ -216,7 +305,7 @@ public Call clone() { } } - private static class NoopCallback implements Callback { + static final class NoopCallback implements Callback { @Override public void onSuccess(Void value) { } diff --git a/zipkin-server/src/test/java/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.java b/zipkin-server/src/test/java/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.java index c596272fc9d..53e5613ece3 100644 --- a/zipkin-server/src/test/java/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.java +++ b/zipkin-server/src/test/java/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.java @@ -16,12 +16,13 @@ */ package zipkin2.server.internal.throttle; +import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.composite.CompositeMeterRegistry; -import static org.junit.Assert.*; +import static org.junit.Assert.assertSame; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; import zipkin2.storage.StorageComponent; public class ThrottledStorageComponentTest { @@ -31,7 +32,7 @@ public class ThrottledStorageComponentTest { @Test public void spanConsumer_isProxied() { StorageComponent delegate = mock(StorageComponent.class); - CompositeMeterRegistry registry = new CompositeMeterRegistry(); + MeterRegistry registry = new CompositeMeterRegistry(); ThrottledStorageComponent throttle = new ThrottledStorageComponent(delegate, registry, 1, 2, 1); @@ -49,7 +50,7 @@ public void createComponent_withoutMeter() { @Test public void createComponent_withZeroSizedQueue() { StorageComponent delegate = mock(StorageComponent.class); - CompositeMeterRegistry registry = new CompositeMeterRegistry(); + MeterRegistry registry = new CompositeMeterRegistry(); int queueSize = 0; new ThrottledStorageComponent(delegate, registry, 1, 2, queueSize); @@ -59,7 +60,7 @@ public void createComponent_withZeroSizedQueue() { @Test public void createComponent_withNegativeQueue() { StorageComponent delegate = mock(StorageComponent.class); - CompositeMeterRegistry registry = new CompositeMeterRegistry(); + MeterRegistry registry = new CompositeMeterRegistry(); expectedException.expect(IllegalArgumentException.class); int queueSize = -1;