Skip to content

Commit

Permalink
Ports tests and addresses PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrian Cole committed May 10, 2019
1 parent 13d9997 commit cfdccf0
Show file tree
Hide file tree
Showing 18 changed files with 507 additions and 654 deletions.
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,6 @@ It stores spans as json and has been designed for larger scale.

Note: This store requires a [spark job](https://github.com/openzipkin/zipkin-dependencies) to aggregate dependency links.

### Throttling
As part of a [Collector surge and error handling](https://cwiki.apache.org/confluence/display/ZIPKIN/Collector+surge+and+error+handling) discussion that took place, a throttling mechanism was added to allow more fine-grained control over how Zipkin interacts with the various `StorageComponents`. In particular, for those installations which use a push-based Collector (such as the HTTP REST API), enabling the throttle can allow Zipkin to buffer some messages in order to avoid aggressively dropping them. See [zipkin-server](zipkin-server#throttled-storage) for configuration information.

### Disabling search
The following API endpoints provide search features, and are enabled by
default. Search primarily allows the trace list screen of the UI to operate.
Expand Down
1 change: 0 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
<armeria.version>0.84.0</armeria.version>
<!-- This is from armeria, but be careful to avoid >= v20 apis -->
<guava.version>27.0.1-jre</guava.version>
<netflix.concurrency.limits.version>0.2.0</netflix.concurrency.limits.version>

<!-- only used for proto interop testing -->
<wire.version>3.0.0-alpha01</wire.version>
Expand Down
4 changes: 3 additions & 1 deletion zipkin-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,16 @@ Defaults to true
* `AUTOCOMPLETE_KEYS`: list of span tag keys which will be returned by the `/api/v2/autocompleteTags` endpoint
* `AUTOCOMPLETE_TTL`: How long in milliseconds to suppress calls to write the same autocomplete key/value pair. Default 3600000 (1 hr)

### Throttled Storage
### Throttled Storage (Experimental)
These settings can be used to help tune the rate at which Zipkin flushes data to another, underlying `StorageComponent` (such as Elasticsearch):

* `STORAGE_THROTTLE_ENABLED`: Enables throttling
* `STORAGE_THROTTLE_MIN_CONCURRENCY`: Minimum number of Threads to use for writing to storage.
* `STORAGE_THROTTLE_MAX_CONCURRENCY`: Maximum number of Threads to use for writing to storage. In order to avoid configuration drift, this value may override other, storage-specific values such as Elasticsearch's `ES_MAX_REQUESTS`.
* `STORAGE_THROTTLE_MAX_QUEUE_SIZE`: How many messages to buffer while all Threads are writing data before abandoning a message (0 = no buffering).

As this feature is experimental, it is not recommended to run this in production environments.

### Cassandra Storage
Zipkin's [Cassandra storage component](../zipkin-storage/cassandra)
supports version 3.11+ and applies when `STORAGE_TYPE` is set to `cassandra3`:
Expand Down
3 changes: 1 addition & 2 deletions zipkin-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@
<dependency>
<groupId>com.netflix.concurrency-limits</groupId>
<artifactId>concurrency-limits-core</artifactId>
<version>${netflix.concurrency.limits.version}</version>
<version>0.2.2</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
Expand Down Expand Up @@ -388,7 +388,6 @@
<version>${kotlin.version}</version>
<configuration>
<jvmTarget>${main.java.version}</jvmTarget>
<experimentalCoroutines>enable</experimentalCoroutines>
</configuration>
<executions>
<execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar
*/
private int timeout = 10_000;

public ZipkinElasticsearchStorageProperties(@Value("${zipkin.storage.throttle.enabled:false}") boolean throttleEnabled,
@Value("${zipkin.storage.throttle.maxConcurrency:200}") int throttleMaxConcurrency) {
ZipkinElasticsearchStorageProperties(
@Value("${zipkin.storage.throttle.enabled:false}") boolean throttleEnabled,
@Value("${zipkin.storage.throttle.maxConcurrency:200}") int throttleMaxConcurrency) {
if (throttleEnabled) {
this.throttleMaxConcurrency = throttleMaxConcurrency;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,34 @@
*/
package zipkin2.server.internal.throttle;

import com.netflix.concurrency.limits.Limiter;
import com.netflix.concurrency.limits.limiter.AbstractLimiter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import zipkin2.server.internal.ActuateCollectorMetrics;

/** Follows the same naming convention as {@link ActuateCollectorMetrics} */
final class ActuateThrottleMetrics {
private final MeterRegistry registryInstance;
final MeterRegistry registryInstance;

public ActuateThrottleMetrics(MeterRegistry registryInstance) {
ActuateThrottleMetrics(MeterRegistry registryInstance) {
this.registryInstance = registryInstance;
}

public void bind(ExecutorService executor) {
if(!(executor instanceof ThreadPoolExecutor)){
return;
}

ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
void bind(ThreadPoolExecutor pool) {
Gauge.builder("zipkin_storage.throttle.concurrency", pool::getCorePoolSize)
.description("number of threads running storage requests")
.register(registryInstance);
.description("number of threads running storage requests")
.register(registryInstance);
Gauge.builder("zipkin_storage.throttle.queue_size", pool.getQueue()::size)
.description("number of items queued waiting for access to storage")
.register(registryInstance);
.description("number of items queued waiting for access to storage")
.register(registryInstance);
}

public void bind(Limiter<Void> limiter) {
if(!(limiter instanceof AbstractLimiter)){
return;
}

AbstractLimiter abstractLimiter = (AbstractLimiter) limiter;

void bind(AbstractLimiter limiter) {
// This value should parallel (zipkin_storage.throttle.queue_size + zipkin_storage.throttle.concurrency)
// It is tracked to make sure it doesn't perpetually increase. If it does then we're not resolving LimitListeners.
Gauge.builder("zipkin_storage.throttle.in_flight_requests", abstractLimiter::getInflight)
.description("number of requests the limiter thinks are active")
.register(registryInstance);
Gauge.builder("zipkin_storage.throttle.in_flight_requests", limiter::getInflight)
.description("number of requests the limiter thinks are active")
.register(registryInstance);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@
import zipkin2.storage.InMemoryStorage;

/**
* {@link Call} implementation that is backed by an {@link ExecutorService}. The ExecutorService serves two
* purposes:
* {@link Call} implementation that is backed by an {@link ExecutorService}. The ExecutorService
* serves two purposes:
* <ol>
* <li>Limits the number of requests that can run in parallel.</li>
* <li>Depending on configuration, can queue up requests to make sure we don't aggressively drop requests that would
* otherwise succeed if given a moment. Bounded queues are safest for this as unbounded ones can lead to heap
* exhaustion and {@link OutOfMemoryError OOM errors}.</li>
* <li>Depending on configuration, can queue up requests to make sure we don't aggressively drop
* requests that would otherwise succeed if given a moment. Bounded queues are safest for this as
* unbounded ones can lead to heap exhaustion and {@link OutOfMemoryError OOM errors}.</li>
* </ol>
*
* @see ThrottledStorageComponent
Expand All @@ -46,34 +46,39 @@ final class ThrottledCall<V> extends Call<V> {
final Limiter<Void> limiter;
final Listener limitListener;
/**
* Delegate call needs to be supplied later to avoid having it take action when it is created (like
* {@link InMemoryStorage} and thus avoid being throttled.
* supplier call needs to be supplied later to avoid having it take action when it is created
* (like {@link InMemoryStorage} and thus avoid being throttled.
*/
final Supplier<Call<V>> delegate;
Call<V> call;
final Supplier<? extends Call<V>> supplier;
volatile Call<V> delegate;
volatile boolean canceled;

public ThrottledCall(ExecutorService executor, Limiter<Void> limiter, Supplier<Call<V>> delegate) {
public ThrottledCall(ExecutorService executor, Limiter<Void> limiter,
Supplier<? extends Call<V>> supplier) {
this.executor = executor;
this.limiter = limiter;
this.limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
this.delegate = delegate;
this.supplier = supplier;
}

private ThrottledCall(ThrottledCall other) {
this(other.executor, other.limiter, other.call == null ? other.delegate : () -> other.call.clone());
// TODO: refactor this when in-memory no longer executes storage ops during assembly time
ThrottledCall(ThrottledCall<V> other) {
this(other.executor, other.limiter,
other.delegate == null ? other.supplier : () -> other.delegate.clone());
}

@Override
public V execute() throws IOException {
// TODO: we cannot currently extend Call.Base as tests execute the call multiple times,
// which is invalid as calls are one-shot. It isn't worth refactoring until we refactor out
// the need for assembly time throttling (fix to in-memory storage)
@Override public V execute() throws IOException {
try {
call = delegate.get();
delegate = supplier.get();

// Make sure we throttle
Future<V> future = executor.submit(() -> {
String oldName = setCurrentThreadName(call.toString());
String oldName = setCurrentThreadName(delegate.toString());
try {
return call.execute();
return delegate.execute();
} finally {
setCurrentThreadName(oldName);
}
Expand Down Expand Up @@ -101,90 +106,80 @@ public V execute() throws IOException {
} catch (InterruptedException e) {
limitListener.onIgnore();
throw new RuntimeException("Interrupted while blocking on a throttled call", e);
} catch (Exception e) {
} catch (RuntimeException | Error e) {
propagateIfFatal(e);
// Ignoring in all cases here because storage itself isn't saying we need to throttle. Though, we may still be
// write bound, but a drop in concurrency won't necessarily help.
limitListener.onIgnore();
throw e;
}
}

@Override
public void enqueue(Callback<V> callback) {
@Override public void enqueue(Callback<V> callback) {
try {
executor.execute(new QueuedCall(callback));
} catch (Exception e) {
} catch (RuntimeException | Error e) {
propagateIfFatal(e);
// Ignoring in all cases here because storage itself isn't saying we need to throttle. Though, we may still be
// write bound, but a drop in concurrency won't necessarily help.
limitListener.onIgnore();
throw e;
}
}

@Override
public void cancel() {
@Override public void cancel() {
canceled = true;
if (call != null) {
call.cancel();
}
if (delegate != null) delegate.cancel();
}

@Override
public boolean isCanceled() {
return canceled || (call != null && call.isCanceled());
@Override public boolean isCanceled() {
return canceled || (delegate != null && delegate.isCanceled());
}

@Override
public Call<V> clone() {
@Override public Call<V> clone() {
return new ThrottledCall<>(this);
}

/**
* @param name New name for the current Thread
* @return Previous name of the current Thread
*/
@Override public String toString() {
return "Throttled" + supplier;
}

static String setCurrentThreadName(String name) {
Thread thread = Thread.currentThread();
String originalName = thread.getName();
try {
thread.setName(name);
return originalName;
} catch (SecurityException e) {
return originalName;
}
thread.setName(name);
return originalName;
}

final class QueuedCall implements Runnable {
final Callback<V> callback;

public QueuedCall(Callback<V> callback) {
QueuedCall(Callback<V> callback) {
this.callback = callback;
}

@Override
public void run() {
@Override public void run() {
try {
if (canceled) {
return;
}
if (isCanceled()) return;

call = ThrottledCall.this.delegate.get();
delegate = ThrottledCall.this.supplier.get();

String oldName = setCurrentThreadName(call.toString());
String oldName = setCurrentThreadName(delegate.toString());
try {
enqueueAndWait();
} finally {
setCurrentThreadName(oldName);
}
} catch (Exception e) {
} catch (RuntimeException | Error e) {
propagateIfFatal(e);
limitListener.onIgnore();
callback.onError(e);
}
}

void enqueueAndWait() {
ThrottledCallback<V> throttleCallback = new ThrottledCallback<>(callback, limitListener);
call.enqueue(throttleCallback);
delegate.enqueue(throttleCallback);

// Need to wait here since the callback call will run asynchronously also.
// This ensures we don't exceed our throttle/queue limits.
Expand All @@ -193,45 +188,43 @@ void enqueueAndWait() {
}

static final class ThrottledCallback<V> implements Callback<V> {
final Callback<V> delegate;
final Callback<V> supplier;
final Listener limitListener;
final CountDownLatch latch;
final CountDownLatch latch = new CountDownLatch(1);

public ThrottledCallback(Callback<V> delegate, Listener limitListener) {
this.delegate = delegate;
ThrottledCallback(Callback<V> supplier, Listener limitListener) {
this.supplier = supplier;
this.limitListener = limitListener;
this.latch = new CountDownLatch(1);
}

public void await() {
void await() {
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
limitListener.onIgnore();
throw new RuntimeException("Interrupted while blocking on a throttled call", e);
}
}

@Override
public void onSuccess(V value) {
@Override public void onSuccess(V value) {
try {
limitListener.onSuccess();
delegate.onSuccess(value);
supplier.onSuccess(value);
} finally {
latch.countDown();
}
}

@Override
public void onError(Throwable t) {
@Override public void onError(Throwable t) {
try {
if (t instanceof RejectedExecutionException) {
limitListener.onDropped();
} else {
limitListener.onIgnore();
}

delegate.onError(t);
supplier.onError(t);
} finally {
latch.countDown();
}
Expand Down
Loading

0 comments on commit cfdccf0

Please sign in to comment.