Skip to content

Commit

Permalink
Review feedback.
Browse files Browse the repository at this point in the history
Made all classes final and removed any `private` qualifiers.
Made Exception handling more consistent with the rest of the system.
Added test to validate some Exception handling.
Updated how Threads get renamed to be more straight-forward.
Added additional tests.
Improved documentation and added relevant sections to READMEs.
Renamed environment variables for consistency.
Updated imports to follow Square formatter.
  • Loading branch information
Logic-32 committed Apr 30, 2019
1 parent ca2d3ca commit 09e7ac4
Show file tree
Hide file tree
Showing 15 changed files with 314 additions and 106 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ It stores spans as json and has been designed for larger scale.

Note: This store requires a [spark job](https://github.com/openzipkin/zipkin-dependencies) to aggregate dependency links.

### Throttling
As part of a [Collector surge and error handling](https://cwiki.apache.org/confluence/display/ZIPKIN/Collector+surge+and+error+handling) discussion that took place a throttling mechanism was added to allow more fine-grained control over how Zipkin interacts with the various `StorageComponents`. In particular, for those installations which use a push-based Collector (such as the HTTP rest API), enabling the throttle can allow Zipkin to buffer some messages in order to avoid aggressively dropping them. See [zipkin-server](zipkin-server#throttled-storage) for configuration information.

### Disabling search
The following API endpoints provide search features, and are enabled by
default. Search primarily allows the trace list screen of the UI operate.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ RuntimeException doError(String message, Throwable e) {
}

if (e instanceof RejectedExecutionException) {
// This can indicate to a higher layer that we need to slow down ingestion
// No need to wrap in this instance. Wrapping could also be detrimental to higher layers
// where we expect certain Exceptions.
return (RejectedExecutionException) e;
} else {
return new RuntimeException(message, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import zipkin2.storage.StorageComponent;

import static java.util.Arrays.asList;
import java.util.concurrent.RejectedExecutionException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -133,6 +135,14 @@ public void errorAcceptingSpans_onErrorWithMessage() {
assertThat(message).contains("due to IllegalArgumentException(no beer)");
}

@Test
public void errorAcceptingSpans_onErrorRejectedExecution() {
RuntimeException expected = new RejectedExecutionException("slow down");
RuntimeException result = collector.errorStoringSpans(asList(CLIENT_SPAN), expected);

assertThat(result).isInstanceOf(RejectedExecutionException.class);
}

@Test
public void errorDecoding_onErrorWithNullMessage() {
String message = collector.errorReading(new RuntimeException()).getMessage();
Expand Down
8 changes: 8 additions & 0 deletions zipkin-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,14 @@ Defaults to true
* `AUTOCOMPLETE_KEYS`: list of span tag keys which will be returned by the `/api/v2/autocompleteTags` endpoint
* `AUTOCOMPLETE_TTL`: How long in milliseconds to suppress calls to write the same autocomplete key/value pair. Default 3600000 (1 hr)

### Throttled Storage
These settings can be used to help tune the rate at which Zipkin flushes data to another, underlying `StorageComponent` (such as Elasticsearch):

* `STORAGE_THROTTLE_ENABLED`: Enables throttling
* `STORAGE_THROTTLE_MIN_CONCURRENCY`: Minimum number of Threads to use for writing to storage.
* `STORAGE_THROTTLE_MAX_CONCURRENCY`: Maximum number of Threads to use for writing to storage. In order to avoid configuration drift, this value may override other, storage-specific values such as Elasticsearch's `ES_MAX_REQUESTS`.
* `STORAGE_THROTTLE_MAX_QUEUE_SIZE`: How many messages to buffer while all Threads are writing data before abandoning a message (0 = no buffering).

### Cassandra Storage
Zipkin's [Cassandra storage component](../zipkin-storage/cassandra)
supports version 3.11+ and applies when `STORAGE_TYPE` is set to `cassandra3`:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
@Conditional(ConditionalOnThrottledStorage.ThrottledStorageCondition.class)
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ConditionalOnThrottledStorage {
@interface ConditionalOnThrottledStorage {
class ThrottledStorageCondition extends SpringBootCondition {
@Override
public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata a) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar
private String dateSeparator = "-";
/** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 (overriden by throttle settings) */
private int maxRequests = 64;
/** Overrdies maximum in-flight requests to match throttling settings if throttling is enabled. */
/** Overrides maximum in-flight requests to match throttling settings if throttling is enabled. */
private Integer throttleMaxConcurrency;
/** Number of shards (horizontal scaling factor) per index. Defaults to 5. */
private int indexShards = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

class ActuateThrottleMetrics {
final class ActuateThrottleMetrics {
private final MeterRegistry registryInstance;

public ActuateThrottleMetrics(MeterRegistry registryInstance) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,17 @@
*
* @see ThrottledStorageComponent
*/
class ThrottledCall<V> extends Call<V> {
private final ExecutorService executor;
private final Limiter<Void> limiter;
private final Listener limitListener;
final class ThrottledCall<V> extends Call<V> {
final ExecutorService executor;
final Limiter<Void> limiter;
final Listener limitListener;
/**
* Delegate call needs to be supplied later to avoid having it take action when it is created (like
* {@link InMemoryStorage} and thus avoid being throttled.
*/
private final Supplier<Call<V>> delegate;
private Call<V> call;
private boolean canceled;
final Supplier<Call<V>> delegate;
Call<V> call;
volatile boolean canceled;

public ThrottledCall(ExecutorService executor, Limiter<Void> limiter, Supplier<Call<V>> delegate) {
this.executor = executor;
Expand All @@ -71,8 +71,11 @@ public V execute() throws IOException {

// Make sure we throttle
Future<V> future = executor.submit(() -> {
try (AutoCloseable nameReverter = updateThreadName(call.toString())) {
String oldName = setCurrentThreadName(call.toString());
try {
return call.execute();
} finally {
setCurrentThreadName(oldName);
}
});
V result = future.get(); // Still block for the response
Expand All @@ -88,48 +91,33 @@ public V execute() throws IOException {
limitListener.onIgnore();
}

throw new ThrottleException(cause);
} catch (RuntimeException e) {
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else if (cause instanceof IOException) {
throw (IOException) cause;
} else {
throw new RuntimeException("Issue while executing on a throttled call", cause);
}
} catch (InterruptedException e) {
limitListener.onIgnore();
throw e; // E.g. RejectedExecutionException
throw new RuntimeException("Interrupted while blocking on a throttled call", e);
} catch (Exception e) {
// Ignoring in all cases here because storage itself isn't saying we need to throttle. Though, we may still be
// write bound, but a drop in concurrency won't necessarily help.
limitListener.onIgnore();
throw new ThrottleException(e);
throw e;
}
}

@Override
public void enqueue(Callback<V> callback) {
try {
executor.execute(() -> {
if (canceled) {
return;
}

call = delegate.get();

// Not using try-with-resources to avoid catching exceptions that occur anywhere outside of close()
AutoCloseable nameReverter = updateThreadName(call.toString());
try {
ThrottledCallback<V> throttleCallback = new ThrottledCallback<>(callback, limitListener);
call.enqueue(throttleCallback);

// Need to wait here since the delegate call will run asynchronously also.
// This ensures we don't exceed our throttle/queue limits.
throttleCallback.await();
} finally {
try {
nameReverter.close();
} catch (Exception e) {
// swallow
}
}
});
executor.execute(new QueuedCall(callback));
} catch (Exception e) {
// Ignoring in all cases here because storage itself isn't saying we need to throttle. Though, we may still be
// write bound, but a drop in concurrency won't neccessarily help.
// write bound, but a drop in concurrency won't necessarily help.
limitListener.onIgnore();
throw e; // E.g. RejectedExecutionException
throw e;
}
}

Expand All @@ -151,21 +139,63 @@ public Call<V> clone() {
return new ThrottledCall<>(this);
}

private static AutoCloseable updateThreadName(String name) {
/**
* @param name New name for the current Thread
* @return Previous name of the current Thread
*/
static String setCurrentThreadName(String name) {
Thread thread = Thread.currentThread();
String originalName = thread.getName();
try {
Thread thread = Thread.currentThread();
String originalName = thread.getName();
thread.setName(name);
return () -> thread.setName(originalName);
return originalName;
} catch (SecurityException e) {
return () -> {};
return originalName;
}
}

final class QueuedCall implements Runnable {
final Callback<V> callback;

public QueuedCall(Callback<V> callback) {
this.callback = callback;
}

@Override
public void run() {
try {
if (canceled) {
return;
}

call = ThrottledCall.this.delegate.get();

String oldName = setCurrentThreadName(call.toString());
try {
enqueueAndWait();
} finally {
setCurrentThreadName(oldName);
}
} catch (Exception e) {
limitListener.onIgnore();
callback.onError(e);
}
}

void enqueueAndWait() {
ThrottledCallback<V> throttleCallback = new ThrottledCallback<>(callback, limitListener);
call.enqueue(throttleCallback);

// Need to wait here since the callback call will run asynchronously also.
// This ensures we don't exceed our throttle/queue limits.
throttleCallback.await();
}
}

private static class ThrottledCallback<V> implements Callback<V> {
private final Callback<V> delegate;
private final Listener limitListener;
private final CountDownLatch latch;
static final class ThrottledCallback<V> implements Callback<V> {
final Callback<V> delegate;
final Listener limitListener;
final CountDownLatch latch;

public ThrottledCallback(Callback<V> delegate, Listener limitListener) {
this.delegate = delegate;
Expand All @@ -176,9 +206,9 @@ public ThrottledCallback(Callback<V> delegate, Listener limitListener) {
public void await() {
try {
latch.await();
} catch (InterruptedException ex) {
} catch (InterruptedException e) {
limitListener.onIgnore();
throw new ThrottleException(ex);
throw new RuntimeException("Interrupted while blocking on a throttled call", e);
}
}

Expand All @@ -195,17 +225,16 @@ public void onSuccess(V value) {
@Override
public void onError(Throwable t) {
try {
limitListener.onDropped();
if (t instanceof RejectedExecutionException) {
limitListener.onDropped();
} else {
limitListener.onIgnore();
}

delegate.onError(t);
} finally {
latch.countDown();
}
}
}

public static class ThrottleException extends RuntimeException {
public ThrottleException(Throwable cause) {
super(cause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
*
* @see ThrottledCall
*/
class ThrottledSpanConsumer implements SpanConsumer {
private final SpanConsumer delegate;
private final Limiter<Void> limiter;
private final ExecutorService executor;
final class ThrottledSpanConsumer implements SpanConsumer {
final SpanConsumer delegate;
final Limiter<Void> limiter;
final ExecutorService executor;

ThrottledSpanConsumer(SpanConsumer delegate, Limiter<Void> limiter, ExecutorService executor) {
this.delegate = delegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@
*
* @see ThrottledSpanConsumer
*/
public class ThrottledStorageComponent extends StorageComponent {
private final StorageComponent delegate;
private final AbstractLimiter<Void> limiter;
private final ThreadPoolExecutor executor;
public final class ThrottledStorageComponent extends StorageComponent {
final StorageComponent delegate;
final AbstractLimiter<Void> limiter;
final ThreadPoolExecutor executor;

public ThrottledStorageComponent(StorageComponent delegate,
MeterRegistry registry,
Expand All @@ -57,10 +57,11 @@ public ThrottledStorageComponent(StorageComponent delegate,
this.delegate = Objects.requireNonNull(delegate);

Limit limit = Gradient2Limit.newBuilder()
.minLimit(minConcurrency)
.initialLimit(minConcurrency) // Limiter will trend towards min until otherwise necessary so may as well start there
.maxConcurrency(maxConcurrency)
.build();
.minLimit(minConcurrency)
.initialLimit(minConcurrency) // Limiter will trend towards min until otherwise necessary so may as well start there
.maxConcurrency(maxConcurrency)
.queueSize(0)
.build();
this.limiter = new Builder().limit(limit).build();

this.executor = new ThreadPoolExecutor(limit.getLimit(),
Expand All @@ -70,7 +71,7 @@ public ThrottledStorageComponent(StorageComponent delegate,
createQueue(maxQueueSize),
new ThottledThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
limit.notifyOnChange(new PoolSizeConsumer(executor));
limit.notifyOnChange(new ThreadPoolExecutorResizer(executor));

if (registry != null) {
ActuateThrottleMetrics metrics = new ActuateThrottleMetrics(registry);
Expand Down Expand Up @@ -108,7 +109,7 @@ private static BlockingQueue<Runnable> createQueue(int maxSize) {
return new LinkedBlockingQueue<>(maxSize);
}

private static class ThottledThreadFactory implements ThreadFactory {
static final class ThottledThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
Expand All @@ -118,10 +119,10 @@ public Thread newThread(Runnable r) {
}
}

private static class PoolSizeConsumer implements Consumer<Integer> {
private final ThreadPoolExecutor executor;
static final class ThreadPoolExecutorResizer implements Consumer<Integer> {
final ThreadPoolExecutor executor;

public PoolSizeConsumer(ThreadPoolExecutor executor) {
public ThreadPoolExecutorResizer(ThreadPoolExecutor executor) {
this.executor = executor;
}

Expand Down Expand Up @@ -149,7 +150,7 @@ public synchronized void accept(Integer newValue) {
}
}

private static class Builder extends AbstractLimiter.Builder<Builder> {
static final class Builder extends AbstractLimiter.Builder<Builder> {
public NonLimitingLimiter build() {
return new NonLimitingLimiter(this);
}
Expand All @@ -165,7 +166,7 @@ protected Builder self() {
* {@link #acquire(java.lang.Void)}. The point of this is to ensure that we can always derive an appropriate
* {@link Limit#getLimit() Limit} while the {@link #executor} handles actually limiting running requests.
*/
private static class NonLimitingLimiter extends AbstractLimiter<Void> {
static final class NonLimitingLimiter extends AbstractLimiter<Void> {
public NonLimitingLimiter(AbstractLimiter.Builder<?> builder) {
super(builder);
}
Expand Down
Loading

0 comments on commit 09e7ac4

Please sign in to comment.