Skip to content

Commit

Permalink
Uses ES bulk api only when there's more than one span
Browse files Browse the repository at this point in the history
During a test where 100 single-span messages are sent to Kafka at the
same time, I noticed only 58-97 of them would end up in storage
eventhough all messages parsed properly and no operations failed.

After a 100ms/message pause was added, the store rate of this test went
to 100%, so figured it was some sort of state issue. I noticed the code
was using Bulk operations regardless of input size, so as a wild guess
changed the special-case single-span messages. At least in this test, it
raised the success rate to 100% without any pausing needed.

I don't know why this worked, but it seems sensible to not use bulk apis
when there's no bulk action to perform.

I started to write a unit test to validate single-length lists don't use
bulk, but the Mockito involved became too verbose as the Elasticsearch
client uses chaining and other patterns that are tedious to mock.

Instead, we should make a parallel integration test and apply them to
all storage components.

See #1141
  • Loading branch information
Adrian Cole committed Jun 26, 2016
1 parent 0ce6a01 commit e4b3c89
Showing 1 changed file with 19 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand All @@ -28,6 +29,7 @@
import zipkin.storage.guava.GuavaSpanConsumer;

import static com.google.common.util.concurrent.Futures.transform;
import static zipkin.storage.elasticsearch.ElasticFutures.toGuava;

final class ElasticsearchSpanConsumer implements GuavaSpanConsumer {
private static final Function<Object, Void> TO_VOID = Functions.<Void>constant(null);
Expand All @@ -42,24 +44,35 @@ final class ElasticsearchSpanConsumer implements GuavaSpanConsumer {

@Override
public ListenableFuture<Void> accept(List<Span> spans) {
BulkRequestBuilder request = client.prepareBulk();
for (Span span : spans) {
request.add(createSpanIndexRequest(ApplyTimestampAndDuration.apply(span)));
if (spans.isEmpty()) return Futures.immediateFuture(null);

// Create a bulk request when there is more than one span to store
ListenableFuture<?> future;
if (spans.size() == 1) {
future = toGuava(createSpanIndexRequest(spans.get(0)).execute());
} else {
BulkRequestBuilder request = client.prepareBulk();
for (Span span : spans) {
request.add(createSpanIndexRequest(span));
}
future = toGuava(request.execute());
}
ListenableFuture<?> future = ElasticFutures.toGuava(request.execute());

if (ElasticsearchStorage.FLUSH_ON_WRITES) {
future = transform(future, new AsyncFunction() {
@Override public ListenableFuture apply(Object input) {
return ElasticFutures.toGuava(client.admin().indices()
return toGuava(client.admin().indices()
.prepareFlush(indexNameFormatter.catchAll())
.execute());
}
});
}

return transform(future, TO_VOID);
}

private IndexRequestBuilder createSpanIndexRequest(Span span) {
private IndexRequestBuilder createSpanIndexRequest(Span input) {
Span span = ApplyTimestampAndDuration.apply(input);
long indexTimestampMillis;
if (span.timestamp != null) {
indexTimestampMillis = TimeUnit.MICROSECONDS.toMillis(span.timestamp);
Expand Down

0 comments on commit e4b3c89

Please sign in to comment.