diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java index 301a8fe6fcf..61b96256c3a 100644 --- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java +++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java @@ -13,25 +13,18 @@ */ package zipkin.storage.elasticsearch.http; -import com.squareup.moshi.JsonWriter; -import java.io.IOException; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; import java.util.List; -import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; -import okio.Buffer; -import zipkin.Codec; import zipkin.Span; -import zipkin.internal.Pair; +import zipkin.simplespan.SimpleSpan; +import zipkin.simplespan.SimpleSpanCodec; +import zipkin.simplespan.SimpleSpanConverter; import zipkin.storage.AsyncSpanConsumer; import zipkin.storage.Callback; import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp; import static zipkin.internal.Util.UTF_8; import static zipkin.internal.Util.propagateIfFatal; -import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SERVICE_SPAN; class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final for testing @@ -50,10 +43,7 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final } try { HttpBulkIndexer indexer = new HttpBulkIndexer("index-span", es); - Map>> indexToServiceSpans = indexSpans(indexer, spans); - if (!indexToServiceSpans.isEmpty()) { - indexNames(indexer, indexToServiceSpans); - } + indexSpans(indexer, spans); indexer.execute(callback); } catch (Throwable t) { propagateIfFatal(t); @@ -61,9 +51,7 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final } } - /** Indexes spans and returns a mapping of indexes that may need a names update */ - Map>> indexSpans(HttpBulkIndexer indexer, List spans) { - Map>> indexToServiceSpans = new LinkedHashMap<>(); + void indexSpans(HttpBulkIndexer indexer, List spans) { for (Span span : spans) { Long timestamp = guessTimestamp(span); Long timestampMillis; @@ -83,40 +71,11 @@ Map>> indexSpans(HttpBulkIndexer indexer, List sp if (indexTimestamp == null) indexTimestamp = System.currentTimeMillis(); index = indexNameFormatter.indexNameForTimestamp(indexTimestamp); } - if (!span.name.isEmpty()) putServiceSpans(indexToServiceSpans, index, span); - byte[] document = Codec.JSON.writeSpan(span); - if (timestampMillis != null) document = prefixWithTimestampMillis(document, timestampMillis); - indexer.add(index, ElasticsearchHttpSpanStore.SPAN, document, null /* Allow ES to choose an ID */); - } - return indexToServiceSpans; - } - - void putServiceSpans(Map>> indexToServiceSpans, String index, Span s) { - Set> serviceSpans = indexToServiceSpans.get(index); - if (serviceSpans == null) indexToServiceSpans.put(index, serviceSpans = new LinkedHashSet<>()); - for (String serviceName : s.serviceNames()) { - serviceSpans.add(Pair.create(serviceName, s.name)); - } - } - - /** - * Adds service and span names to the pending batch. The id is "serviceName|spanName" to prevent - * a large order of duplicates ending up in the daily index. This also means queries do not need - * to deduplicate. - */ - void indexNames(HttpBulkIndexer indexer, Map>> indexToServiceSpans) - throws IOException { - Buffer buffer = new Buffer(); - for (Map.Entry>> entry : indexToServiceSpans.entrySet()) { - String index = entry.getKey(); - for (Pair serviceSpan : entry.getValue()) { - JsonWriter writer = JsonWriter.of(buffer); - writer.beginObject(); - writer.name("serviceName").value(serviceSpan._1); - writer.name("spanName").value(serviceSpan._2); - writer.endObject(); - byte[] document = buffer.readByteArray(); - indexer.add(index, SERVICE_SPAN, document, serviceSpan._1 + "|" + serviceSpan._2); + for (SimpleSpan simpleSpan: SimpleSpanConverter.fromSpan(span)) { + byte[] document = SimpleSpanCodec.JSON.writeSpan(simpleSpan); + if (timestampMillis != null) + document = prefixWithTimestampMillis(document, timestampMillis); + indexer.add(index, ElasticsearchHttpSpanStore.SPAN, document, null /* Allow ES to choose an ID */); } } } diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStore.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStore.java index 13e097e276d..6a54350018e 100644 --- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStore.java +++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStore.java @@ -15,7 +15,6 @@ import java.util.Collections; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -63,10 +62,7 @@ final class ElasticsearchHttpSpanStore implements AsyncSpanStore { SearchRequest.Filters filters = new SearchRequest.Filters(); filters.addRange("timestamp_millis", beginMillis, endMillis); if (request.serviceName != null) { - filters.addNestedTerms(asList( - "annotations.endpoint.serviceName", - "binaryAnnotations.endpoint.serviceName" - ), request.serviceName); + filters.addTerm("localEndpoint.serviceName", request.serviceName); } if (request.spanName != null) { @@ -74,28 +70,13 @@ final class ElasticsearchHttpSpanStore implements AsyncSpanStore { } for (String annotation : request.annotations) { - Map annotationValues = new LinkedHashMap<>(); - annotationValues.put("annotations.value", annotation); - Map binaryAnnotationKeys = new LinkedHashMap<>(); - binaryAnnotationKeys.put("binaryAnnotations.key", annotation); - if (request.serviceName != null) { - annotationValues.put("annotations.endpoint.serviceName", request.serviceName); - binaryAnnotationKeys.put("binaryAnnotations.endpoint.serviceName", request.serviceName); - } - filters.addNestedTerms(annotationValues, binaryAnnotationKeys); + filters.should() + .addTerm("annotations.value", annotation) + .addExists("tags." + annotation); } for (Map.Entry kv : request.binaryAnnotations.entrySet()) { - // In our index template, we make sure the binaryAnnotation value is indexed as string, - // meaning non-string values won't even be indexed at all. This means that we can only - // match string values here, which happens to be exactly what we want. - Map nestedTerms = new LinkedHashMap<>(); - nestedTerms.put("binaryAnnotations.key", kv.getKey()); - nestedTerms.put("binaryAnnotations.value", kv.getValue()); - if (request.serviceName != null) { - nestedTerms.put("binaryAnnotations.endpoint.serviceName", request.serviceName); - } - filters.addNestedTerms(nestedTerms); + filters.addTerm("tags." + kv.getKey(), kv.getValue()); } if (request.minDuration != null) { @@ -193,28 +174,15 @@ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback> long beginMillis = endMillis - namesLookback; List indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis); - SearchRequest request = SearchRequest.forIndicesAndType(indices, SERVICE_SPAN) - .addAggregation(Aggregation.terms("serviceName", Integer.MAX_VALUE)); - - search.newCall(request, BodyConverters.SORTED_KEYS).submit(new Callback>() { - @Override public void onSuccess(List value) { - if (!value.isEmpty()) callback.onSuccess(value); - - // Special cased code until sites update their collectors. What this does is do a more - // expensive nested query to get service names when the servicespan type returns nothing. - SearchRequest.Filters filters = new SearchRequest.Filters(); - filters.addRange("timestamp_millis", beginMillis, endMillis); - SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN) - .filters(filters) - .addAggregation(Aggregation.nestedTerms("annotations.endpoint.serviceName")) - .addAggregation(Aggregation.nestedTerms("binaryAnnotations.endpoint.serviceName")); - search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback); - } - - @Override public void onError(Throwable t) { - callback.onError(t); - } - }); + // Service name queries include both local and remote endpoints. This is different than + // Span name, as a span name can only be on a local endpoint. + SearchRequest.Filters filters = new SearchRequest.Filters(); + filters.addRange("timestamp_millis", beginMillis, endMillis); + SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN) + .filters(filters) + .addAggregation(Aggregation.terms("localEndpoint.serviceName", Integer.MAX_VALUE)) + .addAggregation(Aggregation.terms("remoteEndpoint.serviceName", Integer.MAX_VALUE)); + search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback); } @Override public void getSpanNames(String serviceName, Callback> callback) { @@ -228,32 +196,15 @@ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback> List indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis); - SearchRequest request = SearchRequest.forIndicesAndType(indices, SERVICE_SPAN) - .term("serviceName", serviceName.toLowerCase(Locale.ROOT)) - .addAggregation(Aggregation.terms("spanName", Integer.MAX_VALUE)); - - search.newCall(request, BodyConverters.SORTED_KEYS).submit(new Callback>() { - @Override public void onSuccess(List value) { - if (!value.isEmpty()) callback.onSuccess(value); - - // Special cased code until sites update their collectors. What this does is do a more - // expensive nested query to get span names when the servicespan type returns nothing. - SearchRequest.Filters filters = new SearchRequest.Filters(); - filters.addRange("timestamp_millis", beginMillis, endMillis); - filters.addNestedTerms(asList( - "annotations.endpoint.serviceName", - "binaryAnnotations.endpoint.serviceName" - ), serviceName.toLowerCase(Locale.ROOT)); - SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN) - .filters(filters) - .addAggregation(Aggregation.terms("name", Integer.MAX_VALUE)); - search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback); - } + // A span name is only valid on a local endpoint, as a span name is defined locally + SearchRequest.Filters filters = new SearchRequest.Filters() + .addRange("timestamp_millis", beginMillis, endMillis) + .addTerm("localEndpoint.serviceName", serviceName.toLowerCase(Locale.ROOT)); - @Override public void onError(Throwable t) { - callback.onError(t); - } - }); + SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN) + .filters(filters) + .addAggregation(Aggregation.terms("name", Integer.MAX_VALUE)); + search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback); } @Override public void getDependencies(long endTs, @Nullable Long lookback, diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/JsonAdapters.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/JsonAdapters.java index f70a6673097..35680cdabf7 100644 --- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/JsonAdapters.java +++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/JsonAdapters.java @@ -25,10 +25,10 @@ import zipkin.DependencyLink; import zipkin.Endpoint; import zipkin.Span; -import zipkin.internal.Util; +import zipkin.simplespan.SimpleSpan; +import zipkin.simplespan.SimpleSpanConverter; import static zipkin.internal.Util.UTF_8; -import static zipkin.internal.Util.lowerHexToUnsignedLong; /** * Read-only json adapters resurrected from before we switched to Java 6 as storage components can @@ -38,7 +38,7 @@ final class JsonAdapters { static final JsonAdapter SPAN_ADAPTER = new JsonAdapter() { @Override public Span fromJson(JsonReader reader) throws IOException { - Span.Builder result = Span.builder(); + SimpleSpan.Builder result = SimpleSpan.builder(); reader.beginObject(); while (reader.hasNext()) { String nextName = reader.nextName(); @@ -48,20 +48,19 @@ public Span fromJson(JsonReader reader) throws IOException { } switch (nextName) { case "traceId": - String traceId = reader.nextString(); - if (traceId.length() == 32) { - result.traceIdHigh(lowerHexToUnsignedLong(traceId, 0)); - } - result.traceId(lowerHexToUnsignedLong(traceId)); + result.traceId(reader.nextString()); break; - case "name": - result.name(reader.nextString()); + case "parentId": + result.parentId(reader.nextString()); break; case "id": - result.id(Util.lowerHexToUnsignedLong(reader.nextString())); + result.id(reader.nextString()); break; - case "parentId": - result.parentId(Util.lowerHexToUnsignedLong(reader.nextString())); + case "kind": + result.kind(SimpleSpan.Kind.valueOf(reader.nextString())); + break; + case "name": + result.name(reader.nextString()); break; case "timestamp": result.timestamp(reader.nextLong()); @@ -69,29 +68,39 @@ public Span fromJson(JsonReader reader) throws IOException { case "duration": result.duration(reader.nextLong()); break; + case "localEndpoint": + result.localEndpoint(ENDPOINT_ADAPTER.fromJson(reader)); + break; + case "remoteEndpoint": + result.remoteEndpoint(ENDPOINT_ADAPTER.fromJson(reader)); + break; case "annotations": reader.beginArray(); while (reader.hasNext()) { - result.addAnnotation(ANNOTATION_ADAPTER.fromJson(reader)); + Annotation a = ANNOTATION_ADAPTER.fromJson(reader); + result.addAnnotation(a.timestamp, a.value); } reader.endArray(); break; - case "binaryAnnotations": - reader.beginArray(); + case "tags": + reader.beginObject(); while (reader.hasNext()) { - result.addBinaryAnnotation(BINARY_ANNOTATION_ADAPTER.fromJson(reader)); + result.putTag(reader.nextName(), reader.nextString()); } - reader.endArray(); + reader.endObject(); break; case "debug": result.debug(reader.nextBoolean()); break; + case "shared": + result.shared(reader.nextBoolean()); + break; default: reader.skipValue(); } } reader.endObject(); - return result.build(); + return SimpleSpanConverter.toSpan(result.build()); } @Override diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/VersionSpecificTemplate.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/VersionSpecificTemplate.java index 7c9c893a810..a64a73eb9a9 100644 --- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/VersionSpecificTemplate.java +++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/VersionSpecificTemplate.java @@ -59,6 +59,18 @@ final class VersionSpecificTemplate { + " },\n" + " \"mappings\": {\n" + " \"_default_\": {\n" + + " \"dynamic_templates\": [\n" + + " {\n" + + " \"strings\": {\n" + + " \"mapping\": {\n" + + " KEYWORD,\n" + + " \"ignore_above\": 256\n" + + " },\n" + + " \"match_mapping_type\": \"string\",\n" + + " \"match\": \"*\"\n" + + " }\n" + + " }\n" + + " ],\n" + " \"_all\": {\n" + " \"enabled\": false\n" + " }\n" @@ -67,35 +79,31 @@ final class VersionSpecificTemplate { + " \"properties\": {\n" + " \"traceId\": ${__TRACE_ID_MAPPING__},\n" + " \"name\": { KEYWORD },\n" + + " \"localEndpoint\": {\n" + + " \"type\": \"object\",\n" + + " \"dynamic\": false,\n" + + " \"properties\": { \"serviceName\": { KEYWORD } }\n" + + " },\n" + + " \"remoteEndpoint\": {\n" + + " \"type\": \"object\",\n" + + " \"dynamic\": false,\n" + + " \"properties\": { \"serviceName\": { KEYWORD } }\n" + + " },\n" + " \"timestamp_millis\": {\n" + " \"type\": \"date\",\n" + " \"format\": \"epoch_millis\"\n" + " },\n" + " \"duration\": { \"type\": \"long\" },\n" + " \"annotations\": {\n" - + " \"type\": \"nested\",\n" + + " \"type\": \"object\",\n" + " \"dynamic\": false,\n" + " \"properties\": {\n" - + " \"value\": { KEYWORD },\n" - + " \"endpoint\": {\n" - + " \"type\": \"object\",\n" - + " \"dynamic\": false,\n" - + " \"properties\": { \"serviceName\": { KEYWORD } }\n" - + " }\n" + + " \"value\": { KEYWORD }\n" + " }\n" + " },\n" - + " \"binaryAnnotations\": {\n" - + " \"type\": \"nested\",\n" - + " \"dynamic\": false,\n" - + " \"properties\": {\n" - + " \"key\": { KEYWORD },\n" - + " \"value\": { KEYWORD },\n" - + " \"endpoint\": {\n" - + " \"type\": \"object\",\n" - + " \"dynamic\": false,\n" - + " \"properties\": { \"serviceName\": { KEYWORD } }\n" - + " }\n" - + " }\n" + + " \"tags\": {\n" + + " \"type\": \"object\",\n" + + " \"dynamic\": true\n" + " }\n" + " }\n" + " },\n" diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/client/SearchRequest.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/client/SearchRequest.java index 1a238fef1b0..f0bca466f7a 100644 --- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/client/SearchRequest.java +++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/client/SearchRequest.java @@ -13,8 +13,6 @@ */ package zipkin.storage.elasticsearch.http.internal.client; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; import java.util.LinkedList; @@ -44,58 +42,40 @@ public final class SearchRequest { this.type = type; } - public static class Filters extends LinkedList { - public Filters addRange(String field, long from, Long to) { - add(new Range(field, from, to)); + public static class Should extends LinkedList { + public Should addTerm(String field, String value) { + add(new Term(field, value)); return this; } - public Filters addTerm(String field, String value) { - add(new Term(field, value)); + public Should addExists(String field) { + add(new Exists(field)); return this; } + } - public Filters addNestedTerms(Collection nestedFields, String value) { - add(_nestedTermsEqual(nestedFields, value)); + public static class Filters extends LinkedList { + public Filters addRange(String field, long from, Long to) { + add(new Range(field, from, to)); return this; } - public Filters addNestedTerms(Map... nestedTerms) { - if (nestedTerms.length == 1) { - add(mustMatchAllNestedTerms(nestedTerms[0])); - return this; - } - List nestedBoolQueries = new ArrayList<>(nestedTerms.length); - for (Map next : nestedTerms) { - nestedBoolQueries.add(mustMatchAllNestedTerms(next)); - } - add(new SearchRequest.BoolQuery("should", nestedBoolQueries)); + public Filters addTerm(String field, String value) { + add(new Terms(field, Collections.singletonList(value))); return this; } - } - static NestedBoolQuery mustMatchAllNestedTerms(Map next) { - List terms = new ArrayList<>(); - String field = null; - for (Map.Entry nestedTerm : next.entrySet()) { - terms.add(new Term(field = nestedTerm.getKey(), nestedTerm.getValue())); + public Should should() { + Should result = new Should(); + add(new SearchRequest.BoolQuery("should", result)); + return result; } - return new NestedBoolQuery(field.substring(0, field.indexOf('.')), "must", terms); } public SearchRequest filters(Filters filters) { return query(new BoolQuery("must", filters)); } - static SearchRequest.BoolQuery _nestedTermsEqual(Collection nestedFields, String value) { - List conditions = new ArrayList<>(); - for (String nestedField : nestedFields) { - conditions.add(new NestedBoolQuery(nestedField.substring(0, nestedField.indexOf('.')), "must", - new SearchRequest.Term(nestedField, value))); - } - return new SearchRequest.BoolQuery("should", conditions); - } - public static SearchRequest forIndicesAndType(List indices, String type) { return new SearchRequest(indices, type); } @@ -133,6 +113,14 @@ static class Term { } } + static class Exists { + final Map exists; + + Exists(String field) { + exists = Collections.singletonMap("field", field); + } + } + static class Terms { final Map> terms; @@ -161,22 +149,6 @@ static class Bounds { } } - static class NestedBoolQuery { - final Map nested; - - NestedBoolQuery(String path, String condition, List terms) { - nested = new LinkedHashMap<>(2); - nested.put("path", path); - nested.put("query", new BoolQuery(condition, terms)); - } - - NestedBoolQuery(String path, String condition, Term term) { - nested = new LinkedHashMap<>(2); - nested.put("path", path); - nested.put("query", new BoolQuery(condition, term)); - } - } - static class BoolQuery { final Map bool; diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumerTest.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumerTest.java index f232209ac58..08fca4a5655 100644 --- a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumerTest.java +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumerTest.java @@ -14,7 +14,6 @@ package zipkin.storage.elasticsearch.http; import java.io.IOException; -import java.util.concurrent.TimeUnit; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; @@ -24,7 +23,6 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import zipkin.Annotation; -import zipkin.BinaryAnnotation; import zipkin.Codec; import zipkin.Span; import zipkin.TestObjects; @@ -35,7 +33,6 @@ import static zipkin.Constants.CLIENT_SEND; import static zipkin.Constants.SERVER_RECV; import static zipkin.TestObjects.TODAY; -import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp; import static zipkin.internal.Util.UTF_8; import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanConsumer.prefixWithTimestampMillis; @@ -109,35 +106,6 @@ public void writesSpanNaturallyWhenNoTimestamp() throws Exception { .contains("\n" + new String(Codec.JSON.writeSpan(span), UTF_8) + "\n"); } - @Test - public void indexesServiceSpan_explicitTimestamp() throws Exception { - es.enqueue(new MockResponse()); - - Span span = TestObjects.TRACE.get(0); - accept(span); - - assertThat(es.takeRequest().getBody().readByteString().utf8()).endsWith( - "\"_type\":\"servicespan\",\"_id\":\"web|get\"}}\n" - + "{\"serviceName\":\"web\",\"spanName\":\"get\"}\n" - ); - } - - /** Not a good span name, but better to test it than break mysteriously */ - @Test - public void indexesServiceSpan_jsonInSpanName() throws Exception { - es.enqueue(new MockResponse()); - - String name = "{\"foo\":\"bar\"}"; - String nameEscaped = "{\\\"foo\\\":\\\"bar\\\"}"; - - accept(TestObjects.TRACE.get(0).toBuilder().name(name).build()); - - assertThat(es.takeRequest().getBody().readByteString().utf8()).endsWith( - "\"_type\":\"servicespan\",\"_id\":\"web|" + nameEscaped + "\"}}\n" - + "{\"serviceName\":\"web\",\"spanName\":\"" + nameEscaped + "\"}\n" - ); - } - @Test public void traceIsSearchableBySRServiceName() throws Exception { es.enqueue(new MockResponse()); @@ -158,95 +126,6 @@ public void traceIsSearchableBySRServiceName() throws Exception { .contains("{\"timestamp_millis\":0"); } - @Test - public void indexesServiceSpan_multipleServices() throws Exception { - es.enqueue(new MockResponse()); - - Span span = TestObjects.TRACE.get(1); - accept(span); - - assertThat(es.takeRequest().getBody().readByteString().utf8()) - .contains( - "\"_type\":\"servicespan\",\"_id\":\"app|get\"}}\n" - + "{\"serviceName\":\"app\",\"spanName\":\"get\"}\n" - ) - .contains( - "\"_type\":\"servicespan\",\"_id\":\"web|get\"}}\n" - + "{\"serviceName\":\"web\",\"spanName\":\"get\"}\n" - ); - } - - @Test - public void indexesServiceSpan_basedOnGuessTimestamp() throws Exception { - es.enqueue(new MockResponse()); - - Annotation cs = Annotation.create( - TimeUnit.DAYS.toMicros(365), // 1971-01-01 - CLIENT_SEND, - TestObjects.APP_ENDPOINT - ); - - Span span = Span.builder().traceId(1L).id(1L).name("s").addAnnotation(cs).build(); - - // sanity check data - assertThat(span.timestamp).isNull(); - assertThat(guessTimestamp(span)).isNotNull(); - - accept(span); - - // index timestamp is the server timestamp, not current time! - assertThat(es.takeRequest().getBody().readByteString().utf8()).contains( - "{\"index\":{\"_index\":\"zipkin-1971-01-01\",\"_type\":\"span\"}}\n", - "{\"index\":{\"_index\":\"zipkin-1971-01-01\",\"_type\":\"servicespan\",\"_id\":\"app|s\"}}\n" - ); - } - - @Test - public void indexesServiceSpan_basedOnAnnotationTimestamp() throws Exception { - es.enqueue(new MockResponse()); - - Annotation foo = Annotation.create( - TimeUnit.DAYS.toMicros(365), // 1971-01-01 - "foo", - TestObjects.APP_ENDPOINT - ); - - Span span = Span.builder().traceId(1L).id(2L).parentId(1L).name("s").addAnnotation(foo).build(); - - // sanity check data - assertThat(span.timestamp).isNull(); - assertThat(guessTimestamp(span)).isNull(); - - accept(span); - - // index timestamp is the server timestamp, not current time! - assertThat(es.takeRequest().getBody().readByteString().utf8()).contains( - "{\"index\":{\"_index\":\"zipkin-1971-01-01\",\"_type\":\"span\"}}\n", - "{\"index\":{\"_index\":\"zipkin-1971-01-01\",\"_type\":\"servicespan\",\"_id\":\"app|s\"}}\n" - ); - } - - @Test - public void indexesServiceSpan_currentTimestamp() throws Exception { - es.enqueue(new MockResponse()); - - Span span = Span.builder().traceId(1L).id(2L).parentId(1L).name("s") - .addBinaryAnnotation(BinaryAnnotation.create("f", "", TestObjects.APP_ENDPOINT)) - .build(); - - // sanity check data - assertThat(span.timestamp).isNull(); - assertThat(guessTimestamp(span)).isNull(); - - accept(span); - - String today = storage.indexNameFormatter().indexNameForTimestamp(TODAY); - assertThat(es.takeRequest().getBody().readByteString().utf8()).contains( - "{\"index\":{\"_index\":\"" + today + "\",\"_type\":\"span\"}}\n", - "{\"index\":{\"_index\":\"" + today + "\",\"_type\":\"servicespan\",\"_id\":\"app|s\"}}\n" - ); - } - @Test public void addsPipelineId() throws Exception { close(); diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStoreTest.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStoreTest.java index fde2285bb1c..50f798d3632 100644 --- a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStoreTest.java +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStoreTest.java @@ -27,7 +27,7 @@ import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; -import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SERVICE_SPAN; +import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN; import static zipkin.storage.elasticsearch.http.TestResponses.SERVICE_NAMES; import static zipkin.storage.elasticsearch.http.TestResponses.SPAN_NAMES; @@ -84,6 +84,6 @@ private void requestLimitedTo2DaysOfIndices() throws InterruptedException { RecordedRequest request = es.takeRequest(); assertThat(request.getPath()) - .startsWith("/" + indexesToSearch + "/" + SERVICE_SPAN + "/_search"); + .startsWith("/" + indexesToSearch + "/" + SPAN + "/_search"); } } diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/InternalForTests.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/InternalForTests.java index b12fff78ada..b0f31318ae0 100644 --- a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/InternalForTests.java +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/InternalForTests.java @@ -48,13 +48,4 @@ public static void clear(ElasticsearchHttpStorage es) throws IOException { public static void flushOnWrites(ElasticsearchHttpStorage.Builder builder) { builder.flushOnWrites(true); } - - /** The old consumer didn't write to the "servicespan" type on ingest. */ - public static AsyncSpanConsumer oldConsumer(ElasticsearchHttpStorage es) { - es.ensureIndexTemplate(); - return new ElasticsearchHttpSpanConsumer(es) { - @Override void indexNames(HttpBulkIndexer ignored, Map>> ignored2) { - } - }; - } } diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/JsonAdaptersTest.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/JsonAdaptersTest.java index 37f8a5a1f88..bf7d6c7cd3d 100644 --- a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/JsonAdaptersTest.java +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/JsonAdaptersTest.java @@ -14,20 +14,23 @@ package zipkin.storage.elasticsearch.http; import java.io.IOException; -import java.nio.ByteBuffer; +import java.util.List; import okio.Buffer; import org.junit.Test; -import zipkin.Annotation; import zipkin.BinaryAnnotation; import zipkin.Codec; import zipkin.DependencyLink; import zipkin.Endpoint; import zipkin.Span; import zipkin.TestObjects; +import zipkin.internal.ApplyTimestampAndDuration; import zipkin.internal.Util; +import zipkin.simplespan.SimpleSpan; +import zipkin.simplespan.SimpleSpanCodec; +import zipkin.simplespan.SimpleSpanConverter; -import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; import static zipkin.storage.elasticsearch.http.JsonAdapters.SPAN_ADAPTER; public class JsonAdaptersTest { @@ -121,85 +124,38 @@ public void binaryAnnotation_long_read() throws IOException { + " \"traceId\": \"6b221d5bc9e6496c\",\n" + " \"name\": \"get-traces\",\n" + " \"id\": \"6b221d5bc9e6496c\",\n" - + " \"binaryAnnotations\": [\n" - + " {\n" - + " \"key\": \"num\",\n" - + " \"value\": 123456789,\n" - + " \"type\": \"I64\"\n" - + " }\n" - + " ]\n" + + " \"tags\": {" + + " \"num\": 123456789" + + " }" + "}"; - Span span = SPAN_ADAPTER.fromJson(new Buffer().writeUtf8(json)); - assertThat(span.binaryAnnotations) - .containsExactly(BinaryAnnotation.builder() - .key("num") - .type(BinaryAnnotation.Type.I64) - .value(new Buffer().writeLong(123456789).readByteArray()) - .build()); + List spans = SimpleSpanConverter.fromSpan(JsonAdapters.SPAN_ADAPTER.fromJson(json)); + assertThat(spans.get(0).tags()) + .containsExactly(entry("num", "123456789")); } @Test - public void binaryAnnotation_double_read() throws IOException { + public void tag_double_read() throws IOException { String json = "{\n" + " \"traceId\": \"6b221d5bc9e6496c\",\n" + " \"name\": \"get-traces\",\n" + " \"id\": \"6b221d5bc9e6496c\",\n" - + " \"binaryAnnotations\": [\n" - + " {\n" - + " \"key\": \"num\",\n" - + " \"value\": 1.23456789,\n" - + " \"type\": \"DOUBLE\"\n" - + " }\n" - + " ]\n" + + " \"tags\": {" + + " \"num\": 1.23456789" + + " }" + "}"; - Span span = SPAN_ADAPTER.fromJson(new Buffer().writeUtf8(json)); - assertThat(span.binaryAnnotations) - .containsExactly(BinaryAnnotation.builder() - .key("num") - .type(BinaryAnnotation.Type.DOUBLE) - .value(new Buffer().writeLong(Double.doubleToRawLongBits(1.23456789)).readByteArray()) - .build()); - } - - @Test - public void spanRoundTrip() throws IOException { - for (Span span : TestObjects.TRACE) { - Buffer bytes = new Buffer(); - bytes.write(Codec.JSON.writeSpan(span)); - assertThat(SPAN_ADAPTER.fromJson(bytes)) - .isEqualTo(span); - } + List spans = SimpleSpanConverter.fromSpan(JsonAdapters.SPAN_ADAPTER.fromJson(json)); + assertThat(spans.get(0).tags()) + .containsExactly(entry("num", "1.23456789")); } @Test - public void binaryAnnotation_long() throws IOException { - Span span = TestObjects.LOTS_OF_SPANS[0].toBuilder().binaryAnnotations(asList( - BinaryAnnotation.builder() - .key("Long.zero") - .type(BinaryAnnotation.Type.I64) - .value(ByteBuffer.allocate(8).putLong(0, 0L).array()) - .build(), - BinaryAnnotation.builder() - .key("Long.negative") - .type(BinaryAnnotation.Type.I64) - .value(ByteBuffer.allocate(8).putLong(0, -1005656679588439279L).array()) - .build(), - BinaryAnnotation.builder() - .key("Long.MIN_VALUE") - .type(BinaryAnnotation.Type.I64) - .value(ByteBuffer.allocate(8).putLong(0, Long.MIN_VALUE).array()) - .build(), - BinaryAnnotation.builder() - .key("Long.MAX_VALUE") - .type(BinaryAnnotation.Type.I64) - .value(ByteBuffer.allocate(8).putLong(0, Long.MAX_VALUE).array()) - .build() - )).build(); - + public void readSpan() throws IOException { + Span span = ApplyTimestampAndDuration.apply(TestObjects.LOTS_OF_SPANS[0]); + SimpleSpan simple = SimpleSpanConverter.fromSpan(span).get(0); Buffer bytes = new Buffer(); - bytes.write(Codec.JSON.writeSpan(span)); + bytes.write(SimpleSpanCodec.JSON.writeSpan(simple)); assertThat(SPAN_ADAPTER.fromJson(bytes)) .isEqualTo(span); } @@ -212,51 +168,21 @@ public void binaryAnnotation_long() throws IOException { public void specialCharsInJson() throws IOException { // service name is surrounded by control characters Endpoint e = Endpoint.create(new String(new char[] {0, 'a', 1}), 0); - Span worstSpanInTheWorld = Span.builder().traceId(1L).id(1L) + SimpleSpan worstSpanInTheWorld = SimpleSpan.builder().traceId(1L).id(1L) // name is terrible .name(new String(new char[] {'"', '\\', '\t', '\b', '\n', '\r', '\f'})) + .localEndpoint(e) // annotation value includes some json newline characters - .addAnnotation(Annotation.create(1L, "\u2028 and \u2029", e)) + .addAnnotation(1L, "\u2028 and \u2029") // binary annotation key includes a quote and value newlines - .addBinaryAnnotation(BinaryAnnotation.create("\"foo", - "Database error: ORA-00942:\u2028 and \u2029 table or view does not exist\n", e)) + .putTag("\"foo", + "Database error: ORA-00942:\u2028 and \u2029 table or view does not exist\n") .build(); Buffer bytes = new Buffer(); - bytes.write(Codec.JSON.writeSpan(worstSpanInTheWorld)); + bytes.write(SimpleSpanCodec.JSON.writeSpan(worstSpanInTheWorld)); assertThat(SPAN_ADAPTER.fromJson(bytes)) - .isEqualTo(worstSpanInTheWorld); - } - - @Test - public void binaryAnnotation_double() throws IOException { - Span span = TestObjects.LOTS_OF_SPANS[0].toBuilder().binaryAnnotations(asList( - BinaryAnnotation.builder() - .key("Double.zero") - .type(BinaryAnnotation.Type.DOUBLE) - .value(ByteBuffer.allocate(8).putDouble(0, 0.0).array()) - .build(), - BinaryAnnotation.builder() - .key("Double.negative") - .type(BinaryAnnotation.Type.DOUBLE) - .value(ByteBuffer.allocate(8).putDouble(0, -1.005656679588439279).array()) - .build(), - BinaryAnnotation.builder() - .key("Double.MIN_VALUE") - .type(BinaryAnnotation.Type.DOUBLE) - .value(ByteBuffer.allocate(8).putDouble(0, Double.MIN_VALUE).array()) - .build(), - BinaryAnnotation.builder() - .key("Double.MAX_VALUE") - .type(BinaryAnnotation.Type.I64) - .value(ByteBuffer.allocate(8).putDouble(0, Double.MAX_VALUE).array()) - .build() - )).build(); - - Buffer bytes = new Buffer(); - bytes.write(Codec.JSON.writeSpan(span)); - assertThat(SPAN_ADAPTER.fromJson(bytes)) - .isEqualTo(span); + .isEqualTo(SimpleSpanConverter.toSpan(worstSpanInTheWorld)); } @Test @@ -265,20 +191,14 @@ public void endpointHighPort() throws IOException { + " \"traceId\": \"6b221d5bc9e6496c\",\n" + " \"name\": \"get-traces\",\n" + " \"id\": \"6b221d5bc9e6496c\",\n" - + " \"binaryAnnotations\": [\n" - + " {\n" - + " \"key\": \"foo\",\n" - + " \"value\": \"bar\",\n" - + " \"endpoint\": {\n" - + " \"serviceName\": \"service\",\n" - + " \"port\": 65535\n" - + " }\n" - + " }\n" - + " ]\n" + + " \"localEndpoint\": {\n" + + " \"serviceName\": \"service\",\n" + + " \"port\": 65535\n" + + " }\n" + "}"; assertThat(SPAN_ADAPTER.fromJson(json).binaryAnnotations) - .containsExactly(BinaryAnnotation.create("foo", "bar", + .containsExactly(BinaryAnnotation.create("lc", "", Endpoint.builder().serviceName("service").port(65535).build())); } @@ -311,22 +231,18 @@ public void readsTraceIdHighFromTraceIdField() throws IOException { } @Test - public void binaryAnnotation_long_max() throws IOException { + public void tag_long_max() throws IOException { String json = ("{" + " \"traceId\": \"6b221d5bc9e6496c\"," + " \"id\": \"6b221d5bc9e6496c\"," + " \"name\": \"get-traces\"," - + " \"binaryAnnotations\": [" - + " {" - + " \"key\": \"num\"," - + " \"value\": \"9223372036854775807\"," - + " \"type\": \"I64\"" - + " }" - + " ]" + + " \"tags\": {" + + " \"num\": \"9223372036854775807\"" + + " }" + "}").replaceAll("\\s", ""); - Span span = JsonAdapters.SPAN_ADAPTER.fromJson(json); - assertThat(span.binaryAnnotations).extracting(b -> ByteBuffer.wrap(b.value).getLong()) - .containsExactly(9223372036854775807L); + List spans = SimpleSpanConverter.fromSpan(JsonAdapters.SPAN_ADAPTER.fromJson(json)); + assertThat(spans.get(0).tags()) + .containsExactly(entry("num", "9223372036854775807")); } } diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpNamesFallbackTest.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpNamesFallbackTest.java index aadcf9afbc7..6bd1d06aea6 100644 --- a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpNamesFallbackTest.java +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpNamesFallbackTest.java @@ -13,7 +13,6 @@ */ package zipkin.storage.elasticsearch.http.integration; -import java.io.IOException; import java.util.List; import org.junit.Before; import org.junit.Test; @@ -32,11 +31,9 @@ abstract class ElasticsearchHttpNamesFallbackTest { /** Setup test data which has doesnt map the "servicespan" type */ @Before - public void clear() throws IOException { + public void clear() throws Exception { InternalForTests.clear(storage()); - CallbackCaptor callback = new CallbackCaptor<>(); - InternalForTests.oldConsumer(storage()).accept(TestObjects.TRACE, callback); - callback.get(); + accept(TestObjects.TRACE); } @Test @@ -57,7 +54,7 @@ public void getSpanNames() throws Exception { void accept(List trace) throws Exception { CallbackCaptor callback = new CallbackCaptor<>(); - InternalForTests.oldConsumer(storage()).accept(trace, callback); + storage().asyncSpanConsumer().accept(trace, callback); callback.get(); } } diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpSpanConsumerTest.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpSpanConsumerTest.java index 78f3b1770b7..d2c69aa174f 100644 --- a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpSpanConsumerTest.java +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpSpanConsumerTest.java @@ -82,22 +82,6 @@ String findSpans(long endTs, long traceId) throws IOException { .get().build()).execute().body().string(); } - @Test - public void serviceSpanGoesIntoADailyIndex_whenTimestampIsDerived() throws Exception { - long twoDaysAgo = (TODAY - 2 * DAY); - - Span span = Span.builder().traceId(20L).id(20L).name("get") - .addAnnotation(Annotation.create(twoDaysAgo * 1000, SERVER_RECV, WEB_ENDPOINT)) - .addAnnotation(Annotation.create(TODAY * 1000, SERVER_SEND, WEB_ENDPOINT)) - .build(); - - accept(span); - - // make sure the servicespan went into an index corresponding to its first annotation timestamp - assertThat(findServiceSpan(twoDaysAgo, WEB_ENDPOINT.serviceName)) - .contains("\"hits\":{\"total\":1"); - } - String findServiceSpan(long endTs, String serviceName) throws IOException { return new OkHttpClient().newCall(new Request.Builder().url( HttpUrl.parse(baseUrl()).newBuilder() diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpV2Test.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpV2Test.java index 0049f211b76..406f2e39d2f 100644 --- a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpV2Test.java +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpV2Test.java @@ -25,7 +25,7 @@ public class ElasticsearchHttpV2Test { @ClassRule public static LazyElasticsearchHttpStorage storage = - new LazyElasticsearchHttpStorage("openzipkin/zipkin-elasticsearch:1.19.2"); + new LazyElasticsearchHttpStorage("openzipkin/zipkin-elasticsearch:1.28.1"); public static class DependenciesTest extends ElasticsearchHttpDependenciesTest { @Override protected ElasticsearchHttpStorage storage() { diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpV5Test.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpV5Test.java index a6f0cbfb907..0574ae25f83 100644 --- a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpV5Test.java +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/ElasticsearchHttpV5Test.java @@ -25,7 +25,7 @@ public class ElasticsearchHttpV5Test { @ClassRule public static LazyElasticsearchHttpStorage storage = - new LazyElasticsearchHttpStorage("openzipkin/zipkin-elasticsearch5:1.19.2"); + new LazyElasticsearchHttpStorage("openzipkin/zipkin-elasticsearch5:1.28.1"); public static class DependenciesTest extends ElasticsearchHttpDependenciesTest { @Override protected ElasticsearchHttpStorage storage() { diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/LazyElasticsearchHttpStorage.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/LazyElasticsearchHttpStorage.java index 31d56200605..9ffdbf2e478 100644 --- a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/LazyElasticsearchHttpStorage.java +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/integration/LazyElasticsearchHttpStorage.java @@ -43,6 +43,7 @@ class LazyElasticsearchHttpStorage extends LazyCloseable