Skip to content

Commit

Permalink
Makes it an error to store during assembly of a call (openzipkin#2580)
Browse files Browse the repository at this point in the history
Before this, there was some extra code in the throttle package handling
a bug in our in memory storage. This fixes that and removes the extra
code.

See openzipkin#2502
  • Loading branch information
adriancole authored and abesto committed Sep 10, 2019
1 parent 575ed12 commit 33a9d22
Show file tree
Hide file tree
Showing 13 changed files with 266 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import zipkin2.codec.CodecBenchmarks;
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.elasticsearch.ElasticsearchStorage;
import zipkin2.elasticsearch.internal.BulkCallBuilder.IndexEntry;

@Measurement(iterations = 5, time = 1)
@Warmup(iterations = 10, time = 1)
Expand All @@ -50,18 +51,24 @@ public class BulkRequestBenchmarks {
static final Span CLIENT_SPAN = SpanBytesDecoder.JSON_V2.decodeOne(read("/zipkin2-client.json"));

final ElasticsearchStorage es = ElasticsearchStorage.newBuilder().build();
final BulkCallBuilder builder = new BulkCallBuilder(es, 6.7f, "index-span");

final long indexTimestamp = CLIENT_SPAN.timestampAsLong() / 1000L;
final String spanIndex =
es.indexNameFormatter().formatTypeAndTimestampForInsert("span", '-', indexTimestamp);
final IndexEntry<Span> entry =
BulkCallBuilder.newIndexEntry(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);

@Benchmark public void writeRequest_singleSpan() throws IOException {
BulkCallBuilder.write(Okio.buffer(Okio.blackhole()), entry, true);
}

@Benchmark public void buildAndWriteRequest_singleSpan() throws IOException {
BulkCallBuilder builder = new BulkCallBuilder(es, 6.7f, "index-span");
builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
builder.build().call.request().body().writeTo(Okio.buffer(Okio.blackhole()));
}

@Benchmark public void buildAndWriteRequest_tenSpans() throws IOException {
BulkCallBuilder builder = new BulkCallBuilder(es, 6.7f, "index-span");
for (int i = 0; i < 10; i++) {
builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar

ZipkinElasticsearchStorageProperties(
@Value("${zipkin.storage.throttle.enabled:false}") boolean throttleEnabled,
@Value("${zipkin.storage.throttle.maxConcurrency:200}") int throttleMaxConcurrency) {
@Value("${zipkin.storage.throttle.max-concurrency:200}") int throttleMaxConcurrency) {
if (throttleEnabled) {
this.throttleMaxConcurrency = throttleMaxConcurrency;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Supplier;
import zipkin2.Call;
import zipkin2.Callback;
import zipkin2.storage.InMemoryStorage;

/**
* {@link Call} implementation that is backed by an {@link ExecutorService}. The ExecutorService
Expand All @@ -41,39 +39,21 @@
*
* @see ThrottledStorageComponent
*/
final class ThrottledCall<V> extends Call<V> {
final class ThrottledCall<V> extends Call.Base<V> {
final ExecutorService executor;
final Limiter<Void> limiter;
final Listener limitListener;
/**
* supplier call needs to be supplied later to avoid having it take action when it is created
* (like {@link InMemoryStorage} and thus avoid being throttled.
*/
final Supplier<? extends Call<V>> supplier;
volatile Call<V> delegate;
volatile boolean canceled;

public ThrottledCall(ExecutorService executor, Limiter<Void> limiter,
Supplier<? extends Call<V>> supplier) {
final Call<V> delegate;

ThrottledCall(ExecutorService executor, Limiter<Void> limiter, Call<V> delegate) {
this.executor = executor;
this.limiter = limiter;
this.limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
this.supplier = supplier;
this.delegate = delegate;
}

// TODO: refactor this when in-memory no longer executes storage ops during assembly time
ThrottledCall(ThrottledCall<V> other) {
this(other.executor, other.limiter,
other.delegate == null ? other.supplier : () -> other.delegate.clone());
}
@Override protected V doExecute() throws IOException {
Listener limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);

// TODO: we cannot currently extend Call.Base as tests execute the call multiple times,
// which is invalid as calls are one-shot. It isn't worth refactoring until we refactor out
// the need for assembly time throttling (fix to in-memory storage)
@Override public V execute() throws IOException {
try {
delegate = supplier.get();

// Make sure we throttle
Future<V> future = executor.submit(() -> {
String oldName = setCurrentThreadName(delegate.toString());
Expand Down Expand Up @@ -115,9 +95,11 @@ public ThrottledCall(ExecutorService executor, Limiter<Void> limiter,
}
}

@Override public void enqueue(Callback<V> callback) {
@Override protected void doEnqueue(Callback<V> callback) {
Listener limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);

try {
executor.execute(new QueuedCall(callback));
executor.execute(new QueuedCall<>(delegate, callback, limitListener));
} catch (RuntimeException | Error e) {
propagateIfFatal(e);
// Ignoring in all cases here because storage itself isn't saying we need to throttle. Though, we may still be
Expand All @@ -127,21 +109,12 @@ public ThrottledCall(ExecutorService executor, Limiter<Void> limiter,
}
}

@Override public void cancel() {
canceled = true;
if (delegate != null) delegate.cancel();
}

@Override public boolean isCanceled() {
return canceled || (delegate != null && delegate.isCanceled());
}

@Override public Call<V> clone() {
return new ThrottledCall<>(this);
return new ThrottledCall<>(executor, limiter, delegate.clone());
}

@Override public String toString() {
return "Throttled" + supplier;
return "Throttled(" + delegate + ")";
}

static String setCurrentThreadName(String name) {
Expand All @@ -151,18 +124,20 @@ static String setCurrentThreadName(String name) {
return originalName;
}

final class QueuedCall implements Runnable {
static final class QueuedCall<V> implements Runnable {
final Call<V> delegate;
final Callback<V> callback;
final Listener limitListener;

QueuedCall(Callback<V> callback) {
QueuedCall(Call<V> delegate, Callback<V> callback, Listener limitListener) {
this.delegate = delegate;
this.callback = callback;
this.limitListener = limitListener;
}

@Override public void run() {
try {
if (isCanceled()) return;

delegate = ThrottledCall.this.supplier.get();
if (delegate.isCanceled()) return;

String oldName = setCurrentThreadName(delegate.toString());
try {
Expand All @@ -185,15 +160,19 @@ void enqueueAndWait() {
// This ensures we don't exceed our throttle/queue limits.
throttleCallback.await();
}

@Override public String toString() {
return "QueuedCall{delegate=" + delegate + ", callback=" + callback + "}";
}
}

static final class ThrottledCallback<V> implements Callback<V> {
final Callback<V> supplier;
final Callback<V> delegate;
final Listener limitListener;
final CountDownLatch latch = new CountDownLatch(1);

ThrottledCallback(Callback<V> supplier, Listener limitListener) {
this.supplier = supplier;
ThrottledCallback(Callback<V> delegate, Listener limitListener) {
this.delegate = delegate;
this.limitListener = limitListener;
}

Expand All @@ -210,7 +189,7 @@ void await() {
@Override public void onSuccess(V value) {
try {
limitListener.onSuccess();
supplier.onSuccess(value);
delegate.onSuccess(value);
} finally {
latch.countDown();
}
Expand All @@ -224,10 +203,14 @@ void await() {
limitListener.onIgnore();
}

supplier.onError(t);
delegate.onError(t);
} finally {
latch.countDown();
}
}

@Override public String toString() {
return "Throttled(" + delegate + ")";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ public ThrottledStorageComponent(StorageComponent delegate, MeterRegistry regist
}

@Override public String toString() {
return "Throttled" + delegate;
return "Throttled(" + delegate + ")";
}

final class ThrottledSpanConsumer implements SpanConsumer {
static final class ThrottledSpanConsumer implements SpanConsumer {
final SpanConsumer delegate;
final Limiter<Void> limiter;
final ExecutorService executor;
Expand All @@ -116,11 +116,11 @@ final class ThrottledSpanConsumer implements SpanConsumer {
}

@Override public Call<Void> accept(List<Span> spans) {
return new ThrottledCall<>(executor, limiter, () -> delegate.accept(spans));
return new ThrottledCall<>(executor, limiter, delegate.accept(spans));
}

@Override public String toString() {
return "Throttled" + delegate;
return "Throttled(" + delegate + ")";
}
}

Expand Down
Loading

0 comments on commit 33a9d22

Please sign in to comment.