From e2173ff3660d2599e8888bf2b5bb88dab78b04c3 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Wed, 12 Apr 2017 16:12:31 +0800 Subject: [PATCH] early spike --- .../http/ElasticsearchHttpSpanConsumer.java | 88 ++++++++++++++++++- .../http/ElasticsearchHttpSpanStore.java | 22 ++--- .../http/ElasticsearchHttpSpanStoreTest.java | 5 +- 3 files changed, 94 insertions(+), 21 deletions(-) diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java index 6c03e87a81a..fa017cd8b5b 100644 --- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java +++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java @@ -13,15 +13,23 @@ */ package zipkin.storage.elasticsearch.http; +import com.squareup.moshi.JsonAdapter; +import com.squareup.moshi.Moshi; import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; +import okio.Buffer; import zipkin.Span; import zipkin.storage.AsyncSpanConsumer; import zipkin.storage.Callback; import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp; import static zipkin.internal.Util.propagateIfFatal; +import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SERVICE_SPAN; final class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { @@ -39,14 +47,79 @@ final class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { return; } try { - indexSpans(new HttpBulkSpanIndexer(es), spans).execute(callback); + HttpBulkSpanIndexer spanIndexer = new HttpBulkSpanIndexer(es); + Map> indexToServiceSpans = indexSpans(spanIndexer, spans); + if (indexToServiceSpans.isEmpty()) { + spanIndexer.execute(callback); + return; + } + HttpBulkIndexer serviceSpanIndexer = + new HttpBulkIndexer(SERVICE_SPAN, es) { + Buffer buffer = new Buffer(); + + @Override byte[] toJsonBytes(ServiceSpan serviceSpan) { + try { + adapter.toJson(buffer, serviceSpan); + return buffer.readByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + for (Map.Entry> entry : indexToServiceSpans.entrySet()) { + String index = entry.getKey(); + for (ServiceSpan serviceSpan : entry.getValue()) { + serviceSpanIndexer.add(index, serviceSpan, + serviceSpan.serviceName + "|" + serviceSpan.spanName); + } + } + spanIndexer.execute(new Callback() { + @Override public void onSuccess(Void value) { + serviceSpanIndexer.execute(callback); + } + + @Override public void onError(Throwable t) { + callback.onError(t); + } + }); } catch (Throwable t) { propagateIfFatal(t); callback.onError(t); } } - HttpBulkSpanIndexer indexSpans(HttpBulkSpanIndexer indexer, List spans) throws IOException { + static final JsonAdapter adapter = + new Moshi.Builder().build().adapter(ServiceSpan.class); + + static final class ServiceSpan { + + final String serviceName; + final String spanName; + + ServiceSpan(String serviceName, String spanName) { + this.serviceName = serviceName; + this.spanName = spanName; + } + + @Override public boolean equals(Object o) { + ServiceSpan that = (ServiceSpan) o; + return (this.serviceName.equals(that.serviceName)) + && (this.spanName.equals(that.spanName)); + } + + @Override public int hashCode() { + int h = 1; + h *= 1000003; + h ^= serviceName.hashCode(); + h *= 1000003; + h ^= spanName.hashCode(); + return h; + } + } + + Map> indexSpans(HttpBulkSpanIndexer indexer, List spans) + throws IOException { + Map> indexToServiceSpans = new LinkedHashMap<>(); for (Span span : spans) { Long timestamp = guessTimestamp(span); Long timestampMillis; @@ -54,12 +127,21 @@ HttpBulkSpanIndexer indexSpans(HttpBulkSpanIndexer indexer, List spans) th if (timestamp != null) { timestampMillis = TimeUnit.MICROSECONDS.toMillis(timestamp); index = indexNameFormatter.indexNameForTimestamp(timestampMillis); + for (String serviceName : span.serviceNames()) { + if (!span.name.isEmpty()) { + Set serviceSpans = indexToServiceSpans.get(index); + if (serviceSpans == null) { + indexToServiceSpans.put(index, serviceSpans = new LinkedHashSet<>()); + } + serviceSpans.add(new ServiceSpan(serviceName, span.name)); + } + } } else { timestampMillis = null; index = indexNameFormatter.indexNameForTimestamp(System.currentTimeMillis()); } indexer.add(index, span, timestampMillis); } - return indexer; + return indexToServiceSpans; } } diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStore.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStore.java index b3a2290bbed..3044721b5a9 100644 --- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStore.java +++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStore.java @@ -40,6 +40,7 @@ final class ElasticsearchHttpSpanStore implements AsyncSpanStore { static final String SPAN = "span"; static final String DEPENDENCY_LINK = "dependencylink"; + static final String SERVICE_SPAN = "servicespan"; final SearchCallFactory search; final String[] allIndices; @@ -188,12 +189,8 @@ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback> long beginMillis = endMillis - namesLookback; List indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis); - SearchRequest.Filters filters = new SearchRequest.Filters(); - filters.addRange("timestamp_millis", beginMillis, endMillis); - SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN) - .filters(filters) - .addAggregation(Aggregation.nestedTerms("annotations.endpoint.serviceName")) - .addAggregation(Aggregation.nestedTerms("binaryAnnotations.endpoint.serviceName")); + SearchRequest request = SearchRequest.forIndicesAndType(indices, SERVICE_SPAN) + .addAggregation(Aggregation.terms("serviceName", Integer.MAX_VALUE)); search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback); } @@ -208,15 +205,10 @@ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback> long beginMillis = endMillis - namesLookback; List indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis); - SearchRequest.Filters filters = new SearchRequest.Filters(); - filters.addRange("timestamp_millis", beginMillis, endMillis); - filters.addNestedTerms(asList( - "annotations.endpoint.serviceName", - "binaryAnnotations.endpoint.serviceName" - ), serviceName.toLowerCase(Locale.ROOT)); - SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN) - .filters(filters) - .addAggregation(Aggregation.terms("name", Integer.MAX_VALUE)); + + SearchRequest request = SearchRequest.forIndicesAndType(indices, SERVICE_SPAN) + .term("serviceName", serviceName.toLowerCase(Locale.ROOT)) + .addAggregation(Aggregation.terms("spanName", Integer.MAX_VALUE)); search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback); } diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStoreTest.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStoreTest.java index 8f5a62ee293..fde2285bb1c 100644 --- a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStoreTest.java +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStoreTest.java @@ -27,6 +27,7 @@ import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; +import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SERVICE_SPAN; import static zipkin.storage.elasticsearch.http.TestResponses.SERVICE_NAMES; import static zipkin.storage.elasticsearch.http.TestResponses.SPAN_NAMES; @@ -83,8 +84,6 @@ private void requestLimitedTo2DaysOfIndices() throws InterruptedException { RecordedRequest request = es.takeRequest(); assertThat(request.getPath()) - .startsWith("/" + indexesToSearch + "/span/_search"); - assertThat(request.getBody().readUtf8()) - .contains("{\"range\":{\"timestamp_millis\""); + .startsWith("/" + indexesToSearch + "/" + SERVICE_SPAN + "/_search"); } }