diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/ElasticsearchSpanConsumer.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/ElasticsearchSpanConsumer.java index 3d9eeff9237..a4649939ed2 100644 --- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/ElasticsearchSpanConsumer.java +++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/ElasticsearchSpanConsumer.java @@ -32,6 +32,7 @@ import zipkin2.elasticsearch.internal.HttpBulkIndexer; import zipkin2.elasticsearch.internal.IndexNameFormatter; import zipkin2.internal.DelayLimiter; +import zipkin2.internal.Nullable; import zipkin2.storage.SpanConsumer; import static zipkin2.elasticsearch.ElasticsearchAutocompleteTags.AUTOCOMPLETE; @@ -47,6 +48,7 @@ class ElasticsearchSpanConsumer implements SpanConsumer { // not final for testi final ElasticsearchStorage es; final Set autocompleteKeys; final IndexNameFormatter indexNameFormatter; + final char indexTypeDelimiter; final boolean searchEnabled; final DelayLimiter delayLimiter; @@ -54,12 +56,18 @@ class ElasticsearchSpanConsumer implements SpanConsumer { // not final for testi this.es = es; this.autocompleteKeys = new LinkedHashSet<>(es.autocompleteKeys()); this.indexNameFormatter = es.indexNameFormatter(); + this.indexTypeDelimiter = es.indexTypeDelimiter(); this.searchEnabled = es.searchEnabled(); this.delayLimiter = DelayLimiter.newBuilder() .ttl(es.autocompleteTtl()) .cardinality(es.autocompleteCardinality()).build(); } + String formatTypeAndTimestampForInsert(String type, long timestampMillis) { + return indexNameFormatter.formatTypeAndTimestampForInsert(type, indexTypeDelimiter, + timestampMillis); + } + @Override public Call accept(List spans) { if (spans.isEmpty()) return Call.create(null); BulkSpanIndexer indexer = new BulkSpanIndexer(this); @@ -101,8 +109,7 @@ static final class BulkSpanIndexer { } void add(long indexTimestamp, Span span, long timestampMillis) { - String index = consumer.indexNameFormatter - .formatTypeAndTimestamp(SPAN, indexTimestamp); + String index = consumer.formatTypeAndTimestampForInsert(SPAN, indexTimestamp); byte[] document = consumer.searchEnabled ? prefixWithTimestampMillisAndQuery(span, timestampMillis) : SpanBytesEncoder.JSON_V2.encode(span); @@ -110,7 +117,7 @@ void add(long indexTimestamp, Span span, long timestampMillis) { } void addAutocompleteValues(long indexTimestamp, Span span) { - String idx = consumer.indexNameFormatter.formatTypeAndTimestamp(AUTOCOMPLETE, indexTimestamp); + String idx = consumer.formatTypeAndTimestampForInsert(AUTOCOMPLETE, indexTimestamp); for (Map.Entry tag : span.tags().entrySet()) { int length = tag.getKey().length() + tag.getValue().length() + 1; if (length > INDEX_CHARS_LIMIT) continue; diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/ElasticsearchStorage.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/ElasticsearchStorage.java index 02fddbf6a35..68bdbea9927 100644 --- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/ElasticsearchStorage.java +++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/ElasticsearchStorage.java @@ -266,6 +266,10 @@ public float version() { return ensureIndexTemplates().version(); } + char indexTypeDelimiter() { + return ensureIndexTemplates().indexTypeDelimiter(); + } + /** This is a blocking call, only used in tests. */ public void clear() throws IOException { Set toClear = new LinkedHashSet<>(); @@ -348,20 +352,21 @@ IndexTemplates ensureIndexTemplates() { String index = indexNameFormatter().index(); try { IndexTemplates templates = new VersionSpecificTemplates(this).get(http()); - EnsureIndexTemplate.apply(http(), index + ":" + SPAN + "_template", templates.span()); + char indexTypeDelimiter = templates.indexTypeDelimiter(); + EnsureIndexTemplate.apply( + http(), index + indexTypeDelimiter + SPAN + "_template", templates.span()); EnsureIndexTemplate.apply( - http(), index + ":" + DEPENDENCY + "_template", templates.dependency()); + http(), index + indexTypeDelimiter + DEPENDENCY + "_template", templates.dependency()); EnsureIndexTemplate.apply( - http(), index + ":" + AUTOCOMPLETE + "_template", templates.autocomplete()); + http(), index + indexTypeDelimiter + AUTOCOMPLETE + "_template", templates.autocomplete()); return templates; } catch (IOException e) { throw Platform.get().uncheckedIOException(e); } } - @Memoized - public // hosts resolution might imply a network call, and we might make a new okhttp instance - HttpCall.Factory http() { + @Memoized // hosts resolution might imply a network call, and we might make a new okhttp instance + public HttpCall.Factory http() { List hosts = hostsSupplier().get(); if (hosts.isEmpty()) throw new IllegalArgumentException("no hosts configured"); OkHttpClient ok = diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/IndexTemplates.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/IndexTemplates.java index 12f2445fb53..60ed746e5bf 100644 --- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/IndexTemplates.java +++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/IndexTemplates.java @@ -29,6 +29,15 @@ static Builder newBuilder() { abstract String autocomplete(); + /** + * This returns a delimiter based on what's supported by the Elasticsearch version. + * + *

See https://github.com/openzipkin/zipkin/issues/2219 + */ + char indexTypeDelimiter() { + return version() < 7 ? ':' : '-'; + } + @AutoValue.Builder interface Builder { Builder version(float version); diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/VersionSpecificTemplates.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/VersionSpecificTemplates.java index 122dedf2341..7639ec1c1cc 100644 --- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/VersionSpecificTemplates.java +++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/VersionSpecificTemplates.java @@ -20,9 +20,9 @@ import okio.BufferedSource; import zipkin2.elasticsearch.internal.client.HttpCall; +import static zipkin2.elasticsearch.ElasticsearchAutocompleteTags.AUTOCOMPLETE; import static zipkin2.elasticsearch.ElasticsearchSpanStore.DEPENDENCY; import static zipkin2.elasticsearch.ElasticsearchSpanStore.SPAN; -import static zipkin2.elasticsearch.ElasticsearchAutocompleteTags.AUTOCOMPLETE; import static zipkin2.elasticsearch.internal.JsonReaders.enterPath; /** Returns a version-specific span and dependency index template */ @@ -178,7 +178,7 @@ String spanIndexTemplate() { + " \"index.number_of_shards\": ${__NUMBER_OF_SHARDS__},\n" + " \"index.number_of_replicas\": ${__NUMBER_OF_REPLICAS__},\n" + " \"index.requests.cache.enable\": true,\n" - + " \"index.mapper.dynamic\": true\n" + + " \"index.mapper.dynamic\": false\n" + " },\n" + " \"mappings\": {\"" + AUTOCOMPLETE @@ -225,8 +225,9 @@ public String toString() { } private String versionSpecificSpanIndexTemplate(float version) { + String result; if (version >= 2 && version < 3) { - return spanIndexTemplate + result = spanIndexTemplate .replace("TEMPLATE", "template") .replace("STRING", "string") .replace("DISABLE_ALL", "\"_all\": {\"enabled\": false}" + (searchEnabled ? ",\n" : "")) @@ -234,7 +235,7 @@ private String versionSpecificSpanIndexTemplate(float version) { "KEYWORD", "\"type\": \"string\", \"norms\": {\"enabled\": false }, \"index\": \"not_analyzed\""); } else if (version >= 5) { - return spanIndexTemplate + result = spanIndexTemplate .replace("TEMPLATE", version >= 6 ? "index_patterns" : "template") .replace("STRING", "text") .replace("DISABLE_ALL", "") // _all isn't supported in 6.x anyway @@ -243,29 +244,37 @@ private String versionSpecificSpanIndexTemplate(float version) { "\"analyzer\": \"traceId_analyzer\" }", "\"fielddata\": \"true\", \"analyzer\": \"traceId_analyzer\" }"); } else { - throw new IllegalStateException( - "Elasticsearch 2.x, 5.x and 6.x are supported, was: " + version); + throw new IllegalStateException("Elasticsearch 2-7.x are supported, was: " + version); } + return maybeReviseFor7x(version, result); } private String versionSpecificDependencyLinkIndexTemplate(float version) { - return dependencyIndexTemplate.replace( - "TEMPLATE", version >= 6 ? "index_patterns" : "template"); + String result = dependencyIndexTemplate.replace( + "TEMPLATE", version >= 6 ? "index_patterns" : "template"); + return maybeReviseFor7x(version, result); } + private String versionSpecificAutocompleteIndexTemplate(float version) { + String result; if (version >= 2 && version < 3) { - return autocompleteIndexTemplate + result = autocompleteIndexTemplate .replace("TEMPLATE", "template") .replace("KEYWORD", "\"type\": \"string\", \"norms\": {\"enabled\": false }, \"index\": " + "\"not_analyzed\""); } else if (version >= 5) { - return autocompleteIndexTemplate + result = autocompleteIndexTemplate .replace("TEMPLATE", version >= 6 ? "index_patterns" : "template") .replace("KEYWORD", "\"type\": \"keyword\",\"norms\": false\n"); - }else { - throw new IllegalStateException( - "Elasticsearch 2.x, 5.x and 6.x are supported, was: " + version); + } else { + throw new IllegalStateException("Elasticsearch 2-7.x are supported, was: " + version); } + return maybeReviseFor7x(version, result); + } + + private String maybeReviseFor7x(float version, String result) { + if (version >= 7) return result.replaceAll(",\n +\"index\\.mapper\\.dynamic\": false", ""); + return result; } } diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/IndexNameFormatter.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/IndexNameFormatter.java index b464caf22b5..124ee357f77 100644 --- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/IndexNameFormatter.java +++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/IndexNameFormatter.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 The OpenZipkin Authors + * Copyright 2015-2019 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -154,12 +154,19 @@ static GregorianCalendar midnightUTC(long epochMillis) { return result; } + /** On insert, require a version-specific index-type delimiter as ES 7+ dropped colons */ + public String formatTypeAndTimestampForInsert(String type, char delimiter, long timestampMillis) { + return index() + delimiter + type + '-' + dateFormat().get().format(new Date(timestampMillis)); + } + public String formatTypeAndTimestamp(@Nullable String type, long timestampMillis) { return prefix(type) + "-" + dateFormat().get().format(new Date(timestampMillis)); } private String prefix(@Nullable String type) { - return type != null ? index() + ":" + type : index(); + // We use single-character wildcard here in order to read both : and - as starting in ES 7, : + // is no longer permitted. + return type != null ? index() + "*" + type : index(); } // for testing diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java index 6092517c854..e7a8c5ac5cb 100644 --- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java +++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 The OpenZipkin Authors + * Copyright 2015-2019 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -24,8 +24,8 @@ public class InternalForTests { public static void writeDependencyLinks(ElasticsearchStorage es, List links, long midnightUTC) { - String index = - es.indexNameFormatter().formatTypeAndTimestamp("dependency", midnightUTC); + String index = ((ElasticsearchSpanConsumer) es.spanConsumer()) + .formatTypeAndTimestampForInsert("dependency", midnightUTC); HttpBulkIndexer indexer = new HttpBulkIndexer("indexlinks", es); for (DependencyLink link : links) { byte[] document = DependencyLinkBytesEncoder.JSON_V1.encode(link); diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ITElasticsearchStorageV7.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ITElasticsearchStorageV7.java new file mode 100644 index 00000000000..a2f9789de57 --- /dev/null +++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ITElasticsearchStorageV7.java @@ -0,0 +1,143 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.elasticsearch.integration; + +import java.io.IOException; +import java.util.List; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import zipkin2.Span; +import zipkin2.elasticsearch.ElasticsearchStorage; +import zipkin2.elasticsearch.InternalForTests; +import zipkin2.storage.StorageComponent; + +import static zipkin2.elasticsearch.integration.ElasticsearchStorageRule.index; + +@RunWith(Enclosed.class) +public class ITElasticsearchStorageV7 { + + static ElasticsearchStorageRule classRule() { + return new ElasticsearchStorageRule("openzipkin/zipkin-elasticsearch7:2.12.1", + "test_elasticsearch3"); + } + + public static class ITSpanStore extends zipkin2.storage.ITSpanStore { + @ClassRule public static ElasticsearchStorageRule backend = classRule(); + @Rule public TestName testName = new TestName(); + + ElasticsearchStorage storage; + + @Before public void connect() { + storage = backend.computeStorageBuilder().index(index(testName)).build(); + } + + @Override protected StorageComponent storage() { + return storage; + } + + // we don't map this in elasticsearch + @Test @Ignore @Override public void getSpanNames_mapsNameToRemoteServiceName() { + } + + @Before @Override public void clear() throws IOException { + storage.clear(); + } + } + + public static class ITSearchEnabledFalse extends zipkin2.storage.ITSearchEnabledFalse { + @ClassRule public static ElasticsearchStorageRule backend = classRule(); + @Rule public TestName testName = new TestName(); + + ElasticsearchStorage storage; + + @Before public void connect() { + storage = backend.computeStorageBuilder().index(index(testName)) + .searchEnabled(false).build(); + } + + @Override protected StorageComponent storage() { + return storage; + } + + @Before @Override public void clear() throws IOException { + storage.clear(); + } + } + + public static class ITAutocompleteTags extends zipkin2.storage.ITAutocompleteTags { + @ClassRule public static ElasticsearchStorageRule backend = classRule(); + @Rule public TestName testName = new TestName(); + + @Override protected StorageComponent.Builder storageBuilder() { + return backend.computeStorageBuilder().index(index(testName)); + } + + @Before @Override public void clear() throws IOException { + ((ElasticsearchStorage) storage).clear(); + } + } + + public static class ITStrictTraceIdFalse extends zipkin2.storage.ITStrictTraceIdFalse { + @ClassRule public static ElasticsearchStorageRule backend = classRule(); + @Rule public TestName testName = new TestName(); + + ElasticsearchStorage storage; + + @Before public void connect() { + storage = backend.computeStorageBuilder().index(index(testName)).strictTraceId(false).build(); + } + + @Override protected StorageComponent storage() { + return storage; + } + + @Before @Override public void clear() throws IOException { + storage.clear(); + } + } + + public static class ITDependencies extends zipkin2.storage.ITDependencies { + @ClassRule public static ElasticsearchStorageRule backend = classRule(); + @Rule public TestName testName = new TestName(); + + ElasticsearchStorage storage; + + @Before public void connect() { + storage = backend.computeStorageBuilder().index(index(testName)).build(); + } + + @Override protected StorageComponent storage() { + return storage; + } + + /** + * The current implementation does not include dependency aggregation. It includes retrieval of + * pre-aggregated links, usually made via zipkin-dependencies + */ + @Override protected void processDependencies(List spans) throws Exception { + aggregateLinks(spans).forEach( + (midnight, links) -> InternalForTests.writeDependencyLinks(storage, links, midnight)); + } + + @Before @Override public void clear() throws IOException { + storage.clear(); + } + } +}