diff --git a/zipkin-storage/elasticsearch-http/README.md b/zipkin-storage/elasticsearch-http/README.md index ab3231a1f97..3815f4b1490 100644 --- a/zipkin-storage/elasticsearch-http/README.md +++ b/zipkin-storage/elasticsearch-http/README.md @@ -100,6 +100,36 @@ inputs are downcased. Span and service name queries default to look back 24hrs (2 index days). This can be controlled by `ElasticsearchHttpStorage.Builder.namesLookback` +#### Index format +Starting with Zipkin 1.23, service and span names are written to the +same daily indexes as spans and dependency links as the document type +"servicespan". This was added for performance reasons as formerly search +was using relatively expensive nested queries. + +The documents themselves represent service and span name pairs. Only one +document is present per daily index. This is to keep the documents from +repeating at a multiplier of span count, which also simplifies query. +This deduplication is enforced at write time by using an ID convention +of the service and span name. Ex. `id = MyServiceName|MySpanName` + +The document is a simple structure, like: +```json +{ + "serviceName": "MyServiceName", + "spanName": "MySpanName", +} +``` + +The document does replicate data in the ID, but it is needed as you +cannot query based on an ID expression. + +#### Notes for data written prior to Zipkin 1.23 +Before Zipkin 1.23, service and span names were nested queries against +the span type. This was an expensive operation, which resulted in high +latency particularly when the UI loads. When the "servicespan" type is +missing from an index, or there's no results returned, a fallback nested +query is invoked. + ## Customizing the ingest pipeline When using Elasticsearch 5.x, you can setup an [ingest pipeline](https://www.elastic.co/guide/en/elasticsearch/reference/master/pipeline.html) diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java index 6c03e87a81a..674fa142983 100644 --- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java +++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java @@ -13,17 +13,27 @@ */ package zipkin.storage.elasticsearch.http; +import com.squareup.moshi.JsonWriter; import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; +import okio.Buffer; +import zipkin.Codec; import zipkin.Span; +import zipkin.internal.Pair; import zipkin.storage.AsyncSpanConsumer; import zipkin.storage.Callback; import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp; +import static zipkin.internal.Util.UTF_8; import static zipkin.internal.Util.propagateIfFatal; +import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SERVICE_SPAN; -final class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { +class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final for testing final ElasticsearchHttpStorage es; final IndexNameFormatter indexNameFormatter; @@ -39,14 +49,21 @@ final class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { return; } try { - indexSpans(new HttpBulkSpanIndexer(es), spans).execute(callback); + HttpBulkIndexer indexer = new HttpBulkIndexer("index-span", es); + Map>> indexToServiceSpans = indexSpans(indexer, spans); + if (!indexToServiceSpans.isEmpty()) { + indexNames(indexer, indexToServiceSpans); + } + indexer.execute(callback); } catch (Throwable t) { propagateIfFatal(t); callback.onError(t); } } - HttpBulkSpanIndexer indexSpans(HttpBulkSpanIndexer indexer, List spans) throws IOException { + /** Indexes spans and returns a mapping of indexes that may need a names update */ + Map>> indexSpans(HttpBulkIndexer indexer, List spans) { + Map>> indexToServiceSpans = new LinkedHashMap<>(); for (Span span : spans) { Long timestamp = guessTimestamp(span); Long timestampMillis; @@ -54,12 +71,68 @@ HttpBulkSpanIndexer indexSpans(HttpBulkSpanIndexer indexer, List spans) th if (timestamp != null) { timestampMillis = TimeUnit.MICROSECONDS.toMillis(timestamp); index = indexNameFormatter.indexNameForTimestamp(timestampMillis); + if (!span.name.isEmpty()) putServiceSpans(indexToServiceSpans, index, span); } else { timestampMillis = null; index = indexNameFormatter.indexNameForTimestamp(System.currentTimeMillis()); } - indexer.add(index, span, timestampMillis); + byte[] document = Codec.JSON.writeSpan(span); + if (timestampMillis != null) document = prefixWithTimestampMillis(document, timestampMillis); + indexer.add(index, ElasticsearchHttpSpanStore.SPAN, document, null /* Allow ES to choose an ID */); + } + return indexToServiceSpans; + } + + void putServiceSpans(Map>> indexToServiceSpans, String index, Span s) { + Set> serviceSpans = indexToServiceSpans.get(index); + if (serviceSpans == null) indexToServiceSpans.put(index, serviceSpans = new LinkedHashSet<>()); + for (String serviceName : s.serviceNames()) { + serviceSpans.add(Pair.create(serviceName, s.name)); + } + } + + /** + * Adds service and span names to the pending batch. The id is "serviceName|spanName" to prevent + * a large order of duplicates ending up in the daily index. This also means queries do not need + * to deduplicate. + */ + void indexNames(HttpBulkIndexer indexer, Map>> indexToServiceSpans) + throws IOException { + Buffer buffer = new Buffer(); + for (Map.Entry>> entry : indexToServiceSpans.entrySet()) { + String index = entry.getKey(); + for (Pair serviceSpan : entry.getValue()) { + JsonWriter writer = JsonWriter.of(buffer); + writer.beginObject(); + writer.name("serviceName").value(serviceSpan._1); + writer.name("spanName").value(serviceSpan._2); + writer.endObject(); + byte[] document = buffer.readByteArray(); + indexer.add(index, SERVICE_SPAN, document, serviceSpan._1 + "|" + serviceSpan._2); + } + } + } + + private static final byte[] TIMESTAMP_MILLIS_PREFIX = "{\"timestamp_millis\":".getBytes(UTF_8); + + /** + * In order to allow systems like Kibana to search by timestamp, we add a field "timestamp_millis" + * when storing. The cheapest way to do this without changing the codec is prefixing it to the + * json. For example. {"traceId":"... becomes {"timestamp_millis":12345,"traceId":"... + */ + static byte[] prefixWithTimestampMillis(byte[] input, long timestampMillis) { + String dateAsString = Long.toString(timestampMillis); + byte[] newSpanBytes = + new byte[TIMESTAMP_MILLIS_PREFIX.length + dateAsString.length() + input.length]; + int pos = 0; + System.arraycopy(TIMESTAMP_MILLIS_PREFIX, 0, newSpanBytes, pos, TIMESTAMP_MILLIS_PREFIX.length); + pos += TIMESTAMP_MILLIS_PREFIX.length; + for (int i = 0, length = dateAsString.length(); i < length; i++) { + newSpanBytes[pos++] = (byte) dateAsString.charAt(i); } - return indexer; + newSpanBytes[pos++] = ','; + // starting at position 1 discards the old head of '{' + System.arraycopy(input, 1, newSpanBytes, pos, input.length - 1); + return newSpanBytes; } } diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStore.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStore.java index b3a2290bbed..035388f13c0 100644 --- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStore.java +++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStore.java @@ -40,6 +40,7 @@ final class ElasticsearchHttpSpanStore implements AsyncSpanStore { static final String SPAN = "span"; static final String DEPENDENCY_LINK = "dependencylink"; + static final String SERVICE_SPAN = "servicespan"; final SearchCallFactory search; final String[] allIndices; @@ -188,14 +189,28 @@ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback> long beginMillis = endMillis - namesLookback; List indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis); - SearchRequest.Filters filters = new SearchRequest.Filters(); - filters.addRange("timestamp_millis", beginMillis, endMillis); - SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN) - .filters(filters) - .addAggregation(Aggregation.nestedTerms("annotations.endpoint.serviceName")) - .addAggregation(Aggregation.nestedTerms("binaryAnnotations.endpoint.serviceName")); + SearchRequest request = SearchRequest.forIndicesAndType(indices, SERVICE_SPAN) + .addAggregation(Aggregation.terms("serviceName", Integer.MAX_VALUE)); + + search.newCall(request, BodyConverters.SORTED_KEYS).submit(new Callback>() { + @Override public void onSuccess(List value) { + if (!value.isEmpty()) callback.onSuccess(value); + + // Special cased code until sites update their collectors. What this does is do a more + // expensive nested query to get service names when the servicespan type returns nothing. + SearchRequest.Filters filters = new SearchRequest.Filters(); + filters.addRange("timestamp_millis", beginMillis, endMillis); + SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN) + .filters(filters) + .addAggregation(Aggregation.nestedTerms("annotations.endpoint.serviceName")) + .addAggregation(Aggregation.nestedTerms("binaryAnnotations.endpoint.serviceName")); + search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback); + } - search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback); + @Override public void onError(Throwable t) { + callback.onError(t); + } + }); } @Override public void getSpanNames(String serviceName, Callback> callback) { @@ -208,17 +223,33 @@ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback> long beginMillis = endMillis - namesLookback; List indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis); - SearchRequest.Filters filters = new SearchRequest.Filters(); - filters.addRange("timestamp_millis", beginMillis, endMillis); - filters.addNestedTerms(asList( - "annotations.endpoint.serviceName", - "binaryAnnotations.endpoint.serviceName" - ), serviceName.toLowerCase(Locale.ROOT)); - SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN) - .filters(filters) - .addAggregation(Aggregation.terms("name", Integer.MAX_VALUE)); - - search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback); + + SearchRequest request = SearchRequest.forIndicesAndType(indices, SERVICE_SPAN) + .term("serviceName", serviceName.toLowerCase(Locale.ROOT)) + .addAggregation(Aggregation.terms("spanName", Integer.MAX_VALUE)); + + search.newCall(request, BodyConverters.SORTED_KEYS).submit(new Callback>() { + @Override public void onSuccess(List value) { + if (!value.isEmpty()) callback.onSuccess(value); + + // Special cased code until sites update their collectors. What this does is do a more + // expensive nested query to get span names when the servicespan type returns nothing. + SearchRequest.Filters filters = new SearchRequest.Filters(); + filters.addRange("timestamp_millis", beginMillis, endMillis); + filters.addNestedTerms(asList( + "annotations.endpoint.serviceName", + "binaryAnnotations.endpoint.serviceName" + ), serviceName.toLowerCase(Locale.ROOT)); + SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN) + .filters(filters) + .addAggregation(Aggregation.terms("name", Integer.MAX_VALUE)); + search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback); + } + + @Override public void onError(Throwable t) { + callback.onError(t); + } + }); } @Override public void getDependencies(long endTs, @Nullable Long lookback, diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpBulkIndexer.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpBulkIndexer.java index e3f8e92ae59..a3a02d511fc 100644 --- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpBulkIndexer.java +++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpBulkIndexer.java @@ -29,8 +29,7 @@ // See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html // exposed to re-use for testing writes of dependency links -abstract class HttpBulkIndexer { - final String typeName; +final class HttpBulkIndexer { final String tag; final HttpCall.Factory http; final String pipeline; @@ -40,22 +39,20 @@ abstract class HttpBulkIndexer { final Buffer body = new Buffer(); final Set indices = new LinkedHashSet<>(); - HttpBulkIndexer(String typeName, ElasticsearchHttpStorage es) { - this.typeName = typeName; - tag = "index-" + typeName; + HttpBulkIndexer(String tag, ElasticsearchHttpStorage es) { + this.tag = tag; http = es.http(); pipeline = es.pipeline(); flushOnWrites = es.flushOnWrites(); } - void add(String index, T object, @Nullable String id) { - writeIndexMetadata(index, id); - writeDocument(object); - - if (flushOnWrites) indices.add(index); + void add(String index, String typeName, byte[] document, @Nullable String id) { + writeIndexMetadata(index, typeName, id); + writeDocument(document); } - void writeIndexMetadata(String index, @Nullable String id) { + void writeIndexMetadata(String index, String typeName, @Nullable String id) { + if (flushOnWrites) indices.add(index); body.writeUtf8("{\"index\":{\"_index\":\"").writeUtf8(index).writeByte('"'); body.writeUtf8(",\"_type\":\"").writeUtf8(typeName).writeByte('"'); if (id != null) { @@ -64,13 +61,11 @@ void writeIndexMetadata(String index, @Nullable String id) { body.writeUtf8("}}\n"); } - void writeDocument(T object) { - body.write(toJsonBytes(object)); + void writeDocument(byte[] document) { + body.write(document); body.writeByte('\n'); } - abstract byte[] toJsonBytes(T object); - /** Creates a bulk request when there is more than one object to store */ void execute(Callback callback) { HttpUrl url = pipeline != null diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpBulkSpanIndexer.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpBulkSpanIndexer.java deleted file mode 100644 index 34778790fe5..00000000000 --- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpBulkSpanIndexer.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Copyright 2015-2017 The OpenZipkin Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package zipkin.storage.elasticsearch.http; - -import zipkin.Codec; -import zipkin.Span; - -import static zipkin.internal.Util.UTF_8; - -final class HttpBulkSpanIndexer extends HttpBulkIndexer { - - HttpBulkSpanIndexer(ElasticsearchHttpStorage es) { - super("span", es); - } - - /** - * In order to allow systems like Kibana to search by timestamp, we add a field - * "timestamp_millis" when storing a span that has a timestamp. The cheapest way to do this - * without changing the codec is prefixing it to the json. - * - *

For example. {"traceId":".. becomes {"timestamp_millis":12345,"traceId":"... - */ - HttpBulkSpanIndexer add(String index, Span span, Long timestampMillis) { - String id = null; // Allow ES to choose an ID - if (timestampMillis == null) { - super.add(index, span, id); - return this; - } - writeIndexMetadata(index, id); - if (timestampMillis != null) { - body.write(prefixWithTimestampMillis(toJsonBytes(span), timestampMillis)); - } else { - body.write(toJsonBytes(span)); - } - body.writeByte('\n'); - - if (flushOnWrites) indices.add(index); - return this; - } - - @Override byte[] toJsonBytes(Span span) { - return Codec.JSON.writeSpan(span); - } - - private static final byte[] TIMESTAMP_MILLIS_PREFIX = "{\"timestamp_millis\":".getBytes(UTF_8); - - /** - * In order to allow systems like Kibana to search by timestamp, we add a field "timestamp_millis" - * when storing. The cheapest way to do this without changing the codec is prefixing it to the - * json. For example. {"traceId":"... becomes {"timestamp_millis":12345,"traceId":"... - */ - static byte[] prefixWithTimestampMillis(byte[] input, long timestampMillis) { - String dateAsString = Long.toString(timestampMillis); - byte[] newSpanBytes = - new byte[TIMESTAMP_MILLIS_PREFIX.length + dateAsString.length() + input.length]; - int pos = 0; - System.arraycopy(TIMESTAMP_MILLIS_PREFIX, 0, newSpanBytes, pos, TIMESTAMP_MILLIS_PREFIX.length); - pos += TIMESTAMP_MILLIS_PREFIX.length; - for (int i = 0, length = dateAsString.length(); i < length; i++) { - newSpanBytes[pos++] = (byte) dateAsString.charAt(i); - } - newSpanBytes[pos++] = ','; - // starting at position 1 discards the old head of '{' - System.arraycopy(input, 1, newSpanBytes, pos, input.length - 1); - return newSpanBytes; - } -} diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/VersionSpecificTemplate.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/VersionSpecificTemplate.java index 09080f7b898..7c9c893a810 100644 --- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/VersionSpecificTemplate.java +++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/VersionSpecificTemplate.java @@ -99,7 +99,13 @@ final class VersionSpecificTemplate { + " }\n" + " }\n" + " },\n" - + " \"" + ElasticsearchHttpSpanStore.DEPENDENCY_LINK + "\": { \"enabled\": false }\n" + + " \"" + ElasticsearchHttpSpanStore.DEPENDENCY_LINK + "\": { \"enabled\": false },\n" + + " \"" + ElasticsearchHttpSpanStore.SERVICE_SPAN + "\": {\n" + + " \"properties\": {\n" + + " \"serviceName\": { KEYWORD },\n" + + " \"spanName\": { KEYWORD }\n" + + " }\n" + + " }\n" + " }\n" + "}"; diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumerTest.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumerTest.java new file mode 100644 index 00000000000..02cfdf299d6 --- /dev/null +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumerTest.java @@ -0,0 +1,176 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.storage.elasticsearch.http; + +import java.io.IOException; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import zipkin.Codec; +import zipkin.Span; +import zipkin.TestObjects; +import zipkin.internal.CallbackCaptor; + +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; +import static zipkin.TestObjects.TODAY; +import static zipkin.internal.Util.UTF_8; +import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanConsumer.prefixWithTimestampMillis; + +public class ElasticsearchHttpSpanConsumerTest { + @Rule + public ExpectedException thrown = ExpectedException.none(); + @Rule + public MockWebServer es = new MockWebServer(); + + ElasticsearchHttpStorage storage = ElasticsearchHttpStorage.builder() + .hosts(asList(es.url("").toString())) + .build(); + + String index = storage.indexNameFormatter().indexNameForTimestamp(TODAY); + + /** gets the index template so that each test doesn't have to */ + @Before + public void ensureIndexTemplate() throws IOException, InterruptedException { + es.enqueue(new MockResponse().setBody("{\"version\":{\"number\":\"2.4.0\"}}")); + es.enqueue(new MockResponse()); // get template + storage.ensureIndexTemplate(); + es.takeRequest(); // get version + es.takeRequest(); // get template + } + + @After + public void close() throws IOException { + storage.close(); + } + + @Test + public void addsTimestamp_millisIntoJson() throws Exception { + es.enqueue(new MockResponse()); + + Span span = Span.builder().traceId(20L).id(20L).name("get") + .timestamp(TODAY * 1000).build(); + + accept(span); + + assertThat(es.takeRequest().getBody().readUtf8()) + .contains("\n{\"timestamp_millis\":" + Long.toString(TODAY) + ",\"traceId\":"); + } + + @Test + public void prefixWithTimestampMillis_readable() throws Exception { + Span span = Span.builder().traceId(20L).id(20L).name("get") + .timestamp(TODAY * 1000).build(); + + byte[] document = prefixWithTimestampMillis(Codec.JSON.writeSpan(span), span.timestamp); + assertThat(Codec.JSON.readSpan(document)) + .isEqualTo(span); // ignores timestamp_millis field + } + + @Test + public void doesntWriteSpanId() throws Exception { + es.enqueue(new MockResponse()); + + accept(TestObjects.LOTS_OF_SPANS[0]); + + RecordedRequest request = es.takeRequest(); + assertThat(request.getBody().readByteString().utf8()) + .doesNotContain("\"_type\":\"span\",\"_id\""); + } + + @Test + public void writesSpanNaturallyWhenNoTimestamp() throws Exception { + es.enqueue(new MockResponse()); + + Span span = Span.builder().traceId(1L).id(1L).name("foo").build(); + accept(span); + + assertThat(es.takeRequest().getBody().readByteString().utf8()) + .contains("\n" + new String(Codec.JSON.writeSpan(span), UTF_8) + "\n"); + } + + @Test + public void indexesServiceSpan_explicitTimestamp() throws Exception { + es.enqueue(new MockResponse()); + + Span span = TestObjects.TRACE.get(0); + accept(span); + + assertThat(es.takeRequest().getBody().readByteString().utf8()).endsWith( + "\"_type\":\"servicespan\",\"_id\":\"web|get\"}}\n" + + "{\"serviceName\":\"web\",\"spanName\":\"get\"}\n" + ); + } + + @Test + public void indexesServiceSpan_multipleServices() throws Exception { + es.enqueue(new MockResponse()); + + Span span = TestObjects.TRACE.get(1); + accept(span); + + assertThat(es.takeRequest().getBody().readByteString().utf8()) + .contains( + "\"_type\":\"servicespan\",\"_id\":\"app|get\"}}\n" + + "{\"serviceName\":\"app\",\"spanName\":\"get\"}\n" + ) + .contains( + "\"_type\":\"servicespan\",\"_id\":\"web|get\"}}\n" + + "{\"serviceName\":\"web\",\"spanName\":\"get\"}\n" + ); + } + + @Test + public void indexesServiceSpan_implicitTimestamp() throws Exception { + es.enqueue(new MockResponse()); + + Span span = TestObjects.LOTS_OF_SPANS[0]; + accept(span); + + assertThat(es.takeRequest().getBody().readByteString().utf8()).endsWith( + "\"_type\":\"servicespan\",\"_id\":\"service|get\"}}\n" + + "{\"serviceName\":\"service\",\"spanName\":\"get\"}\n" + ); + } + + @Test + public void addsPipelineId() throws Exception { + close(); + + storage = ElasticsearchHttpStorage.builder() + .hosts(asList(es.url("").toString())) + .pipeline("zipkin") + .build(); + ensureIndexTemplate(); + + es.enqueue(new MockResponse()); + + accept(TestObjects.TRACE.get(0)); + + RecordedRequest request = es.takeRequest(); + assertThat(request.getPath()) + .isEqualTo("/_bulk?pipeline=zipkin"); + } + + void accept(Span span) throws Exception { + CallbackCaptor callback = new CallbackCaptor<>(); + storage.asyncSpanConsumer().accept(asList(span), callback); + callback.get(); + } +} diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStoreTest.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStoreTest.java index 8f5a62ee293..fde2285bb1c 100644 --- a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStoreTest.java +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStoreTest.java @@ -27,6 +27,7 @@ import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; +import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SERVICE_SPAN; import static zipkin.storage.elasticsearch.http.TestResponses.SERVICE_NAMES; import static zipkin.storage.elasticsearch.http.TestResponses.SPAN_NAMES; @@ -83,8 +84,6 @@ private void requestLimitedTo2DaysOfIndices() throws InterruptedException { RecordedRequest request = es.takeRequest(); assertThat(request.getPath()) - .startsWith("/" + indexesToSearch + "/span/_search"); - assertThat(request.getBody().readUtf8()) - .contains("{\"range\":{\"timestamp_millis\""); + .startsWith("/" + indexesToSearch + "/" + SERVICE_SPAN + "/_search"); } } diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/HttpBulkSpanIndexerTest.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/HttpBulkSpanIndexerTest.java deleted file mode 100644 index 47f7c82e41a..00000000000 --- a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/HttpBulkSpanIndexerTest.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Copyright 2015-2017 The OpenZipkin Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package zipkin.storage.elasticsearch.http; - -import java.io.IOException; -import okhttp3.mockwebserver.MockResponse; -import okhttp3.mockwebserver.MockWebServer; -import okhttp3.mockwebserver.RecordedRequest; -import org.junit.After; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import zipkin.Codec; -import zipkin.Span; -import zipkin.TestObjects; -import zipkin.internal.CallbackCaptor; - -import static java.util.Arrays.asList; -import static org.assertj.core.api.Assertions.assertThat; -import static zipkin.TestObjects.TODAY; -import static zipkin.internal.Util.UTF_8; - -public class HttpBulkSpanIndexerTest { - @Rule - public ExpectedException thrown = ExpectedException.none(); - @Rule - public MockWebServer es = new MockWebServer(); - - CallbackCaptor callback = new CallbackCaptor<>(); - - ElasticsearchHttpStorage storage = ElasticsearchHttpStorage.builder() - .hosts(asList(es.url("").toString())) - .build(); - - HttpBulkSpanIndexer indexer = new HttpBulkSpanIndexer(storage); - - @After - public void close() throws IOException { - storage.close(); - } - - @Test - public void prefixWithTimestampMillis() { - Span span = Span.builder().traceId(20L).id(20L).name("get") - .timestamp(TODAY * 1000).build(); - - byte[] result = - HttpBulkSpanIndexer.prefixWithTimestampMillis(Codec.JSON.writeSpan(span), TODAY); - - String json = new String(result); - assertThat(json) - .startsWith("{\"timestamp_millis\":" + Long.toString(TODAY) + ",\"traceId\":"); - - assertThat(Codec.JSON.readSpan(json.getBytes())) - .isEqualTo(span); // ignores timestamp_millis field - } - - @Test - public void doesntWriteSpanId() throws Exception { - es.enqueue(new MockResponse()); - - indexer.add("test_zipkin_http-2016-10-01", TestObjects.LOTS_OF_SPANS[0], (Long) null); - indexer.execute(callback); - callback.get(); - - RecordedRequest request = es.takeRequest(); - assertThat(request.getBody().readByteString().utf8()) - .doesNotContain("\"_id\""); - } - - @Test - public void writesSpanNaturallyWhenNoTimestamp() throws Exception { - es.enqueue(new MockResponse()); - - indexer.add("test_zipkin_http-2016-10-01", TestObjects.LOTS_OF_SPANS[0], (Long) null); - indexer.execute(callback); - callback.get(); - - RecordedRequest request = es.takeRequest(); - assertThat(request.getBody().readByteString().utf8()) - .endsWith(new String(Codec.JSON.writeSpan(TestObjects.LOTS_OF_SPANS[0]), UTF_8) + "\n"); - } - - @Test - public void addsPipelineId() throws Exception { - close(); - - indexer = new HttpBulkSpanIndexer(storage = ElasticsearchHttpStorage.builder() - .hosts(asList(es.url("").toString())) - .pipeline("zipkin") - .build()); - - es.enqueue(new MockResponse()); - - CallbackCaptor callback = new CallbackCaptor<>(); - indexer - .add("zipkin-2016-10-01", TestObjects.TRACE.get(0), (Long) null) - .execute(callback); - callback.get(); - - RecordedRequest request = es.takeRequest(); - assertThat(request.getPath()) - .isEqualTo("/_bulk?pipeline=zipkin"); - } -} diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/InternalForTests.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/InternalForTests.java index b5849d4d5a7..b12fff78ada 100644 --- a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/InternalForTests.java +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/InternalForTests.java @@ -15,9 +15,13 @@ import java.io.IOException; import java.util.List; +import java.util.Map; +import java.util.Set; import zipkin.Codec; import zipkin.DependencyLink; import zipkin.internal.CallbackCaptor; +import zipkin.internal.Pair; +import zipkin.storage.AsyncSpanConsumer; import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.DEPENDENCY_LINK; @@ -26,14 +30,11 @@ public class InternalForTests { public static void writeDependencyLinks(ElasticsearchHttpStorage es, List links, long midnightUTC) { String index = es.indexNameFormatter().indexNameForTimestamp(midnightUTC); - HttpBulkIndexer indexer = - new HttpBulkIndexer(DEPENDENCY_LINK, es) { - @Override byte[] toJsonBytes(DependencyLink link) { - return Codec.JSON.writeDependencyLink(link); - } - }; + HttpBulkIndexer indexer = new HttpBulkIndexer("index-links", es); for (DependencyLink link : links) { - indexer.add(index, link, link.parent + "|" + link.child); // Unique constraint + byte[] document = Codec.JSON.writeDependencyLink(link); + indexer.add(index, DEPENDENCY_LINK, document, + link.parent + "|" + link.child); // Unique constraint } CallbackCaptor callback = new CallbackCaptor<>(); indexer.execute(callback); @@ -47,4 +48,13 @@ public static void clear(ElasticsearchHttpStorage es) throws IOException { public static void flushOnWrites(ElasticsearchHttpStorage.Builder builder) { builder.flushOnWrites(true); } + + /** The old consumer didn't write to the "servicespan" type on ingest. */ + public static AsyncSpanConsumer oldConsumer(ElasticsearchHttpStorage es) { + es.ensureIndexTemplate(); + return new ElasticsearchHttpSpanConsumer(es) { + @Override void indexNames(HttpBulkIndexer ignored, Map>> ignored2) { + } + }; + } } diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpNamesFallbackTest.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpNamesFallbackTest.java new file mode 100644 index 00000000000..aadcf9afbc7 --- /dev/null +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpNamesFallbackTest.java @@ -0,0 +1,63 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.storage.elasticsearch.http.integration; + +import java.io.IOException; +import java.util.List; +import org.junit.Before; +import org.junit.Test; +import zipkin.Span; +import zipkin.TestObjects; +import zipkin.internal.CallbackCaptor; +import zipkin.storage.elasticsearch.http.ElasticsearchHttpStorage; +import zipkin.storage.elasticsearch.http.InternalForTests; + +import static org.assertj.core.api.Assertions.assertThat; + +abstract class ElasticsearchHttpNamesFallbackTest { + + /** Should maintain state between multiple calls within a test. */ + abstract ElasticsearchHttpStorage storage(); + + /** Setup test data which has doesnt map the "servicespan" type */ + @Before + public void clear() throws IOException { + InternalForTests.clear(storage()); + CallbackCaptor callback = new CallbackCaptor<>(); + InternalForTests.oldConsumer(storage()).accept(TestObjects.TRACE, callback); + callback.get(); + } + + @Test + public void getServiceNames() throws Exception { + accept(TestObjects.TRACE); + + assertThat(storage().spanStore().getServiceNames()) + .containsExactly("app", "db", "no_ip", "web"); + } + + @Test + public void getSpanNames() throws Exception { + accept(TestObjects.TRACE); + + assertThat(storage().spanStore().getSpanNames("app")) + .containsExactly("get", "query"); + } + + void accept(List trace) throws Exception { + CallbackCaptor callback = new CallbackCaptor<>(); + InternalForTests.oldConsumer(storage()).accept(trace, callback); + callback.get(); + } +} diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpSpanConsumerTest.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpSpanConsumerTest.java index 48aeea13f1d..78f3b1770b7 100644 --- a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpSpanConsumerTest.java +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpSpanConsumerTest.java @@ -82,6 +82,32 @@ String findSpans(long endTs, long traceId) throws IOException { .get().build()).execute().body().string(); } + @Test + public void serviceSpanGoesIntoADailyIndex_whenTimestampIsDerived() throws Exception { + long twoDaysAgo = (TODAY - 2 * DAY); + + Span span = Span.builder().traceId(20L).id(20L).name("get") + .addAnnotation(Annotation.create(twoDaysAgo * 1000, SERVER_RECV, WEB_ENDPOINT)) + .addAnnotation(Annotation.create(TODAY * 1000, SERVER_SEND, WEB_ENDPOINT)) + .build(); + + accept(span); + + // make sure the servicespan went into an index corresponding to its first annotation timestamp + assertThat(findServiceSpan(twoDaysAgo, WEB_ENDPOINT.serviceName)) + .contains("\"hits\":{\"total\":1"); + } + + String findServiceSpan(long endTs, String serviceName) throws IOException { + return new OkHttpClient().newCall(new Request.Builder().url( + HttpUrl.parse(baseUrl()).newBuilder() + .addPathSegment(INDEX + "-" + dateFormat.format(new Date(endTs))) + .addPathSegment("servicespan") + .addPathSegment("_search") + .addQueryParameter("q", "serviceName:" + serviceName).build()) + .get().build()).execute().body().string(); + } + @Test public void spanGoesIntoADailyIndex_whenTimestampIsExplicit() throws Exception { long twoDaysAgo = (TODAY - 2 * DAY); diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpV2Test.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpV2Test.java index 2650266c654..0049f211b76 100644 --- a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpV2Test.java +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpV2Test.java @@ -17,7 +17,6 @@ import org.junit.ClassRule; import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; -import zipkin.storage.SpanStoreTest; import zipkin.storage.elasticsearch.http.ElasticsearchHttpStorage; import zipkin.storage.elasticsearch.http.InternalForTests; @@ -28,17 +27,13 @@ public class ElasticsearchHttpV2Test { public static LazyElasticsearchHttpStorage storage = new LazyElasticsearchHttpStorage("openzipkin/zipkin-elasticsearch:1.19.2"); - public static class HttpDependenciesTest - extends ElasticsearchHttpDependenciesTest { - + public static class DependenciesTest extends ElasticsearchHttpDependenciesTest { @Override protected ElasticsearchHttpStorage storage() { return storage.get(); } } - public static class HttpSpanConsumerTest - extends ElasticsearchHttpSpanConsumerTest { - + public static class SpanConsumerTest extends ElasticsearchHttpSpanConsumerTest { @Override protected ElasticsearchHttpStorage storage() { return storage.get(); } @@ -48,22 +43,25 @@ public static class HttpSpanConsumerTest } } - public static class ElasticsearchSpanStoreTest extends SpanStoreTest { - + public static class SpanStoreTest extends zipkin.storage.SpanStoreTest { @Override protected ElasticsearchHttpStorage storage() { return storage.get(); } @Override public void clear() throws IOException { - InternalForTests.clear(storage.get()); + InternalForTests.clear(storage()); } } - public static class HttpStrictTraceIdFalseTest - extends ElasticsearchHttpStrictTraceIdFalseTest { - + public static class StrictTraceIdFalseTest extends ElasticsearchHttpStrictTraceIdFalseTest { @Override protected ElasticsearchHttpStorage.Builder storageBuilder() { return ElasticsearchHttpV2Test.storage.computeStorageBuilder(); } } + + public static class NamesFallbackTest extends ElasticsearchHttpNamesFallbackTest { + @Override protected ElasticsearchHttpStorage storage() { + return storage.get(); + } + } } diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpV5Test.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpV5Test.java index 9e0f9d281b0..a6f0cbfb907 100644 --- a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpV5Test.java +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpV5Test.java @@ -17,7 +17,6 @@ import org.junit.ClassRule; import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; -import zipkin.storage.SpanStoreTest; import zipkin.storage.elasticsearch.http.ElasticsearchHttpStorage; import zipkin.storage.elasticsearch.http.InternalForTests; @@ -28,17 +27,13 @@ public class ElasticsearchHttpV5Test { public static LazyElasticsearchHttpStorage storage = new LazyElasticsearchHttpStorage("openzipkin/zipkin-elasticsearch5:1.19.2"); - public static class HttpDependenciesTest - extends ElasticsearchHttpDependenciesTest { - + public static class DependenciesTest extends ElasticsearchHttpDependenciesTest { @Override protected ElasticsearchHttpStorage storage() { return storage.get(); } } - public static class HttpSpanConsumerTest - extends ElasticsearchHttpSpanConsumerTest { - + public static class SpanConsumerTest extends ElasticsearchHttpSpanConsumerTest { @Override protected ElasticsearchHttpStorage storage() { return storage.get(); } @@ -48,8 +43,7 @@ public static class HttpSpanConsumerTest } } - public static class ElasticsearchSpanStoreTest extends SpanStoreTest { - + public static class SpanStoreTest extends zipkin.storage.SpanStoreTest { @Override protected ElasticsearchHttpStorage storage() { return storage.get(); } @@ -59,11 +53,15 @@ public static class ElasticsearchSpanStoreTest extends SpanStoreTest { } } - public static class HttpStrictTraceIdFalseTest - extends ElasticsearchHttpStrictTraceIdFalseTest { - + public static class StrictTraceIdFalseTest extends ElasticsearchHttpStrictTraceIdFalseTest { @Override protected ElasticsearchHttpStorage.Builder storageBuilder() { return ElasticsearchHttpV5Test.storage.computeStorageBuilder(); } } + + public static class NamesFallbackTest extends ElasticsearchHttpNamesFallbackTest { + @Override protected ElasticsearchHttpStorage storage() { + return storage.get(); + } + } }