Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Makes it an error to store during assembly of a call #2580

Merged
merged 3 commits into from
May 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import zipkin2.codec.CodecBenchmarks;
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.elasticsearch.ElasticsearchStorage;
import zipkin2.elasticsearch.internal.BulkCallBuilder.IndexEntry;

@Measurement(iterations = 5, time = 1)
@Warmup(iterations = 10, time = 1)
Expand All @@ -50,18 +51,24 @@ public class BulkRequestBenchmarks {
static final Span CLIENT_SPAN = SpanBytesDecoder.JSON_V2.decodeOne(read("/zipkin2-client.json"));

final ElasticsearchStorage es = ElasticsearchStorage.newBuilder().build();
final BulkCallBuilder builder = new BulkCallBuilder(es, 6.7f, "index-span");

final long indexTimestamp = CLIENT_SPAN.timestampAsLong() / 1000L;
final String spanIndex =
es.indexNameFormatter().formatTypeAndTimestampForInsert("span", '-', indexTimestamp);
final IndexEntry<Span> entry =
BulkCallBuilder.newIndexEntry(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);

@Benchmark public void writeRequest_singleSpan() throws IOException {
BulkCallBuilder.write(Okio.buffer(Okio.blackhole()), entry, true);
}

@Benchmark public void buildAndWriteRequest_singleSpan() throws IOException {
BulkCallBuilder builder = new BulkCallBuilder(es, 6.7f, "index-span");
builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
builder.build().call.request().body().writeTo(Okio.buffer(Okio.blackhole()));
}

@Benchmark public void buildAndWriteRequest_tenSpans() throws IOException {
BulkCallBuilder builder = new BulkCallBuilder(es, 6.7f, "index-span");
for (int i = 0; i < 10; i++) {
builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar

ZipkinElasticsearchStorageProperties(
@Value("${zipkin.storage.throttle.enabled:false}") boolean throttleEnabled,
@Value("${zipkin.storage.throttle.maxConcurrency:200}") int throttleMaxConcurrency) {
@Value("${zipkin.storage.throttle.max-concurrency:200}") int throttleMaxConcurrency) {
if (throttleEnabled) {
this.throttleMaxConcurrency = throttleMaxConcurrency;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Supplier;
import zipkin2.Call;
import zipkin2.Callback;
import zipkin2.storage.InMemoryStorage;

/**
* {@link Call} implementation that is backed by an {@link ExecutorService}. The ExecutorService
Expand All @@ -41,39 +39,21 @@
*
* @see ThrottledStorageComponent
*/
final class ThrottledCall<V> extends Call<V> {
final class ThrottledCall<V> extends Call.Base<V> {
final ExecutorService executor;
final Limiter<Void> limiter;
final Listener limitListener;
/**
* supplier call needs to be supplied later to avoid having it take action when it is created
* (like {@link InMemoryStorage} and thus avoid being throttled.
*/
final Supplier<? extends Call<V>> supplier;
volatile Call<V> delegate;
volatile boolean canceled;

public ThrottledCall(ExecutorService executor, Limiter<Void> limiter,
Supplier<? extends Call<V>> supplier) {
final Call<V> delegate;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

look ma. no volatiles :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:D


ThrottledCall(ExecutorService executor, Limiter<Void> limiter, Call<V> delegate) {
this.executor = executor;
this.limiter = limiter;
this.limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
this.supplier = supplier;
this.delegate = delegate;
}

// TODO: refactor this when in-memory no longer executes storage ops during assembly time
ThrottledCall(ThrottledCall<V> other) {
this(other.executor, other.limiter,
other.delegate == null ? other.supplier : () -> other.delegate.clone());
}
@Override protected V doExecute() throws IOException {
Listener limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);

// TODO: we cannot currently extend Call.Base as tests execute the call multiple times,
// which is invalid as calls are one-shot. It isn't worth refactoring until we refactor out
// the need for assembly time throttling (fix to in-memory storage)
@Override public V execute() throws IOException {
try {
delegate = supplier.get();

// Make sure we throttle
Future<V> future = executor.submit(() -> {
String oldName = setCurrentThreadName(delegate.toString());
Expand Down Expand Up @@ -115,9 +95,11 @@ public ThrottledCall(ExecutorService executor, Limiter<Void> limiter,
}
}

@Override public void enqueue(Callback<V> callback) {
@Override protected void doEnqueue(Callback<V> callback) {
Listener limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);

try {
executor.execute(new QueuedCall(callback));
executor.execute(new QueuedCall<>(delegate, callback, limitListener));
} catch (RuntimeException | Error e) {
propagateIfFatal(e);
// Ignoring in all cases here because storage itself isn't saying we need to throttle. Though, we may still be
Expand All @@ -127,21 +109,12 @@ public ThrottledCall(ExecutorService executor, Limiter<Void> limiter,
}
}

@Override public void cancel() {
canceled = true;
if (delegate != null) delegate.cancel();
}

@Override public boolean isCanceled() {
return canceled || (delegate != null && delegate.isCanceled());
}

@Override public Call<V> clone() {
return new ThrottledCall<>(this);
return new ThrottledCall<>(executor, limiter, delegate.clone());
}

@Override public String toString() {
return "Throttled" + supplier;
return "Throttled(" + delegate + ")";
}

static String setCurrentThreadName(String name) {
Expand All @@ -151,18 +124,20 @@ static String setCurrentThreadName(String name) {
return originalName;
}

final class QueuedCall implements Runnable {
static final class QueuedCall<V> implements Runnable {
final Call<V> delegate;
final Callback<V> callback;
final Listener limitListener;

QueuedCall(Callback<V> callback) {
QueuedCall(Call<V> delegate, Callback<V> callback, Listener limitListener) {
this.delegate = delegate;
this.callback = callback;
this.limitListener = limitListener;
}

@Override public void run() {
try {
if (isCanceled()) return;

delegate = ThrottledCall.this.supplier.get();
if (delegate.isCanceled()) return;

String oldName = setCurrentThreadName(delegate.toString());
try {
Expand All @@ -185,15 +160,19 @@ void enqueueAndWait() {
// This ensures we don't exceed our throttle/queue limits.
throttleCallback.await();
}

@Override public String toString() {
return "QueuedCall{delegate=" + delegate + ", callback=" + callback + "}";
}
}

static final class ThrottledCallback<V> implements Callback<V> {
final Callback<V> supplier;
final Callback<V> delegate;
final Listener limitListener;
final CountDownLatch latch = new CountDownLatch(1);

ThrottledCallback(Callback<V> supplier, Listener limitListener) {
this.supplier = supplier;
ThrottledCallback(Callback<V> delegate, Listener limitListener) {
this.delegate = delegate;
this.limitListener = limitListener;
}

Expand All @@ -210,7 +189,7 @@ void await() {
@Override public void onSuccess(V value) {
try {
limitListener.onSuccess();
supplier.onSuccess(value);
delegate.onSuccess(value);
} finally {
latch.countDown();
}
Expand All @@ -224,10 +203,14 @@ void await() {
limitListener.onIgnore();
}

supplier.onError(t);
delegate.onError(t);
} finally {
latch.countDown();
}
}

@Override public String toString() {
return "Throttled(" + delegate + ")";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ public ThrottledStorageComponent(StorageComponent delegate, MeterRegistry regist
}

@Override public String toString() {
return "Throttled" + delegate;
return "Throttled(" + delegate + ")";
}

final class ThrottledSpanConsumer implements SpanConsumer {
static final class ThrottledSpanConsumer implements SpanConsumer {
final SpanConsumer delegate;
final Limiter<Void> limiter;
final ExecutorService executor;
Expand All @@ -116,11 +116,11 @@ final class ThrottledSpanConsumer implements SpanConsumer {
}

@Override public Call<Void> accept(List<Span> spans) {
return new ThrottledCall<>(executor, limiter, () -> delegate.accept(spans));
return new ThrottledCall<>(executor, limiter, delegate.accept(spans));
}

@Override public String toString() {
return "Throttled" + delegate;
return "Throttled(" + delegate + ")";
}
}

Expand Down
Loading