Skip to content

Commit

Permalink
WIP: attempts support of Elasticsearch 7.x
Browse files Browse the repository at this point in the history
There are multiple issues to resolve, not just the colon banning.

See https://www.elastic.co/guide/en/elasticsearch/reference/7.x/breaking-changes-7.0.html
See #2219
  • Loading branch information
Adrian Cole committed Feb 18, 2019
1 parent 8fe80bf commit bec429b
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import zipkin2.elasticsearch.internal.HttpBulkIndexer;
import zipkin2.elasticsearch.internal.IndexNameFormatter;
import zipkin2.internal.DelayLimiter;
import zipkin2.internal.Nullable;
import zipkin2.storage.SpanConsumer;

import static zipkin2.elasticsearch.ElasticsearchAutocompleteTags.AUTOCOMPLETE;
Expand All @@ -47,19 +48,26 @@ class ElasticsearchSpanConsumer implements SpanConsumer { // not final for testi
final ElasticsearchStorage es;
final Set<String> autocompleteKeys;
final IndexNameFormatter indexNameFormatter;
final char indexTypeDelimiter;
final boolean searchEnabled;
final DelayLimiter<AutocompleteContext> delayLimiter;

ElasticsearchSpanConsumer(ElasticsearchStorage es) {
this.es = es;
this.autocompleteKeys = new LinkedHashSet<>(es.autocompleteKeys());
this.indexNameFormatter = es.indexNameFormatter();
this.indexTypeDelimiter = es.indexTypeDelimiter();
this.searchEnabled = es.searchEnabled();
this.delayLimiter = DelayLimiter.newBuilder()
.ttl(es.autocompleteTtl())
.cardinality(es.autocompleteCardinality()).build();
}

String formatTypeAndTimestampForInsert(String type, long timestampMillis) {
return indexNameFormatter.formatTypeAndTimestampForInsert(type, indexTypeDelimiter,
timestampMillis);
}

@Override public Call<Void> accept(List<Span> spans) {
if (spans.isEmpty()) return Call.create(null);
BulkSpanIndexer indexer = new BulkSpanIndexer(this);
Expand Down Expand Up @@ -101,16 +109,15 @@ static final class BulkSpanIndexer {
}

void add(long indexTimestamp, Span span, long timestampMillis) {
String index = consumer.indexNameFormatter
.formatTypeAndTimestamp(SPAN, indexTimestamp);
String index = consumer.formatTypeAndTimestampForInsert(SPAN, indexTimestamp);
byte[] document = consumer.searchEnabled
? prefixWithTimestampMillisAndQuery(span, timestampMillis)
: SpanBytesEncoder.JSON_V2.encode(span);
indexer.add(index, SPAN, document, null /* Allow ES to choose an ID */);
}

void addAutocompleteValues(long indexTimestamp, Span span) {
String idx = consumer.indexNameFormatter.formatTypeAndTimestamp(AUTOCOMPLETE, indexTimestamp);
String idx = consumer.formatTypeAndTimestampForInsert(AUTOCOMPLETE, indexTimestamp);
for (Map.Entry<String, String> tag : span.tags().entrySet()) {
int length = tag.getKey().length() + tag.getValue().length() + 1;
if (length > INDEX_CHARS_LIMIT) continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ public float version() {
return ensureIndexTemplates().version();
}

char indexTypeDelimiter() {
return ensureIndexTemplates().indexTypeDelimiter();
}

/** This is a blocking call, only used in tests. */
public void clear() throws IOException {
Set<String> toClear = new LinkedHashSet<>();
Expand Down Expand Up @@ -348,20 +352,21 @@ IndexTemplates ensureIndexTemplates() {
String index = indexNameFormatter().index();
try {
IndexTemplates templates = new VersionSpecificTemplates(this).get(http());
EnsureIndexTemplate.apply(http(), index + ":" + SPAN + "_template", templates.span());
char indexTypeDelimiter = templates.indexTypeDelimiter();
EnsureIndexTemplate.apply(
http(), index + indexTypeDelimiter + SPAN + "_template", templates.span());
EnsureIndexTemplate.apply(
http(), index + ":" + DEPENDENCY + "_template", templates.dependency());
http(), index + indexTypeDelimiter + DEPENDENCY + "_template", templates.dependency());
EnsureIndexTemplate.apply(
http(), index + ":" + AUTOCOMPLETE + "_template", templates.autocomplete());
http(), index + indexTypeDelimiter + AUTOCOMPLETE + "_template", templates.autocomplete());
return templates;
} catch (IOException e) {
throw Platform.get().uncheckedIOException(e);
}
}

@Memoized
public // hosts resolution might imply a network call, and we might make a new okhttp instance
HttpCall.Factory http() {
@Memoized // hosts resolution might imply a network call, and we might make a new okhttp instance
public HttpCall.Factory http() {
List<String> hosts = hostsSupplier().get();
if (hosts.isEmpty()) throw new IllegalArgumentException("no hosts configured");
OkHttpClient ok =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ static Builder newBuilder() {

abstract String autocomplete();

/**
* This returns a delimiter based on what's supported by the Elasticsearch version.
*
* <p>See https://github.com/openzipkin/zipkin/issues/2219
*/
char indexTypeDelimiter() {
return version() < 7 ? ':' : '-';
}

@AutoValue.Builder
interface Builder {
Builder version(float version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import okio.BufferedSource;
import zipkin2.elasticsearch.internal.client.HttpCall;

import static zipkin2.elasticsearch.ElasticsearchAutocompleteTags.AUTOCOMPLETE;
import static zipkin2.elasticsearch.ElasticsearchSpanStore.DEPENDENCY;
import static zipkin2.elasticsearch.ElasticsearchSpanStore.SPAN;
import static zipkin2.elasticsearch.ElasticsearchAutocompleteTags.AUTOCOMPLETE;
import static zipkin2.elasticsearch.internal.JsonReaders.enterPath;

/** Returns a version-specific span and dependency index template */
Expand Down Expand Up @@ -178,7 +178,7 @@ String spanIndexTemplate() {
+ " \"index.number_of_shards\": ${__NUMBER_OF_SHARDS__},\n"
+ " \"index.number_of_replicas\": ${__NUMBER_OF_REPLICAS__},\n"
+ " \"index.requests.cache.enable\": true,\n"
+ " \"index.mapper.dynamic\": true\n"
+ " \"index.mapper.dynamic\": false\n"
+ " },\n"
+ " \"mappings\": {\""
+ AUTOCOMPLETE
Expand Down Expand Up @@ -225,16 +225,17 @@ public String toString() {
}

private String versionSpecificSpanIndexTemplate(float version) {
String result;
if (version >= 2 && version < 3) {
return spanIndexTemplate
result = spanIndexTemplate
.replace("TEMPLATE", "template")
.replace("STRING", "string")
.replace("DISABLE_ALL", "\"_all\": {\"enabled\": false}" + (searchEnabled ? ",\n" : ""))
.replace(
"KEYWORD",
"\"type\": \"string\", \"norms\": {\"enabled\": false }, \"index\": \"not_analyzed\"");
} else if (version >= 5) {
return spanIndexTemplate
result = spanIndexTemplate
.replace("TEMPLATE", version >= 6 ? "index_patterns" : "template")
.replace("STRING", "text")
.replace("DISABLE_ALL", "") // _all isn't supported in 6.x anyway
Expand All @@ -243,29 +244,37 @@ private String versionSpecificSpanIndexTemplate(float version) {
"\"analyzer\": \"traceId_analyzer\" }",
"\"fielddata\": \"true\", \"analyzer\": \"traceId_analyzer\" }");
} else {
throw new IllegalStateException(
"Elasticsearch 2.x, 5.x and 6.x are supported, was: " + version);
throw new IllegalStateException("Elasticsearch 2-7.x are supported, was: " + version);
}
return maybeReviseFor7x(version, result);
}

private String versionSpecificDependencyLinkIndexTemplate(float version) {
return dependencyIndexTemplate.replace(
"TEMPLATE", version >= 6 ? "index_patterns" : "template");
String result = dependencyIndexTemplate.replace(
"TEMPLATE", version >= 6 ? "index_patterns" : "template");
return maybeReviseFor7x(version, result);
}

private String versionSpecificAutocompleteIndexTemplate(float version) {
String result;
if (version >= 2 && version < 3) {
return autocompleteIndexTemplate
result = autocompleteIndexTemplate
.replace("TEMPLATE", "template")
.replace("KEYWORD", "\"type\": \"string\", \"norms\": {\"enabled\": false }, \"index\": "
+ "\"not_analyzed\"");
} else if (version >= 5) {
return autocompleteIndexTemplate
result = autocompleteIndexTemplate
.replace("TEMPLATE", version >= 6 ? "index_patterns" : "template")
.replace("KEYWORD", "\"type\": \"keyword\",\"norms\": false\n");
}else {
throw new IllegalStateException(
"Elasticsearch 2.x, 5.x and 6.x are supported, was: " + version);
} else {
throw new IllegalStateException("Elasticsearch 2-7.x are supported, was: " + version);
}
return maybeReviseFor7x(version, result);
}

private String maybeReviseFor7x(float version, String result) {
if (version >= 7) return result.replaceAll(",\n +\"index\\.mapper\\.dynamic\": false", "");
return result;
}
}

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 The OpenZipkin Authors
* Copyright 2015-2019 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -154,12 +154,19 @@ static GregorianCalendar midnightUTC(long epochMillis) {
return result;
}

/** On insert, require a version-specific index-type delimiter as ES 7+ dropped colons */
public String formatTypeAndTimestampForInsert(String type, char delimiter, long timestampMillis) {
return index() + delimiter + type + '-' + dateFormat().get().format(new Date(timestampMillis));
}

public String formatTypeAndTimestamp(@Nullable String type, long timestampMillis) {
return prefix(type) + "-" + dateFormat().get().format(new Date(timestampMillis));
}

private String prefix(@Nullable String type) {
return type != null ? index() + ":" + type : index();
// We use single-character wildcard here in order to read both : and - as starting in ES 7, :
// is no longer permitted.
return type != null ? index() + "*" + type : index();
}

// for testing
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 The OpenZipkin Authors
* Copyright 2015-2019 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -24,8 +24,8 @@
public class InternalForTests {
public static void writeDependencyLinks(ElasticsearchStorage es, List<DependencyLink> links,
long midnightUTC) {
String index =
es.indexNameFormatter().formatTypeAndTimestamp("dependency", midnightUTC);
String index = ((ElasticsearchSpanConsumer) es.spanConsumer())
.formatTypeAndTimestampForInsert("dependency", midnightUTC);
HttpBulkIndexer indexer = new HttpBulkIndexer("indexlinks", es);
for (DependencyLink link : links) {
byte[] document = DependencyLinkBytesEncoder.JSON_V1.encode(link);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright 2015-2019 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin2.elasticsearch.integration;

import java.io.IOException;
import java.util.List;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import zipkin2.Span;
import zipkin2.elasticsearch.ElasticsearchStorage;
import zipkin2.elasticsearch.InternalForTests;
import zipkin2.storage.StorageComponent;

import static zipkin2.elasticsearch.integration.ElasticsearchStorageRule.index;

@RunWith(Enclosed.class)
public class ITElasticsearchStorageV7 {

static ElasticsearchStorageRule classRule() {
return new ElasticsearchStorageRule("openzipkin/zipkin-elasticsearch7:2.12.1",
"test_elasticsearch3");
}

public static class ITSpanStore extends zipkin2.storage.ITSpanStore {
@ClassRule public static ElasticsearchStorageRule backend = classRule();
@Rule public TestName testName = new TestName();

ElasticsearchStorage storage;

@Before public void connect() {
storage = backend.computeStorageBuilder().index(index(testName)).build();
}

@Override protected StorageComponent storage() {
return storage;
}

// we don't map this in elasticsearch
@Test @Ignore @Override public void getSpanNames_mapsNameToRemoteServiceName() {
}

@Before @Override public void clear() throws IOException {
storage.clear();
}
}

public static class ITSearchEnabledFalse extends zipkin2.storage.ITSearchEnabledFalse {
@ClassRule public static ElasticsearchStorageRule backend = classRule();
@Rule public TestName testName = new TestName();

ElasticsearchStorage storage;

@Before public void connect() {
storage = backend.computeStorageBuilder().index(index(testName))
.searchEnabled(false).build();
}

@Override protected StorageComponent storage() {
return storage;
}

@Before @Override public void clear() throws IOException {
storage.clear();
}
}

public static class ITAutocompleteTags extends zipkin2.storage.ITAutocompleteTags {
@ClassRule public static ElasticsearchStorageRule backend = classRule();
@Rule public TestName testName = new TestName();

@Override protected StorageComponent.Builder storageBuilder() {
return backend.computeStorageBuilder().index(index(testName));
}

@Before @Override public void clear() throws IOException {
((ElasticsearchStorage) storage).clear();
}
}

public static class ITStrictTraceIdFalse extends zipkin2.storage.ITStrictTraceIdFalse {
@ClassRule public static ElasticsearchStorageRule backend = classRule();
@Rule public TestName testName = new TestName();

ElasticsearchStorage storage;

@Before public void connect() {
storage = backend.computeStorageBuilder().index(index(testName)).strictTraceId(false).build();
}

@Override protected StorageComponent storage() {
return storage;
}

@Before @Override public void clear() throws IOException {
storage.clear();
}
}

public static class ITDependencies extends zipkin2.storage.ITDependencies {
@ClassRule public static ElasticsearchStorageRule backend = classRule();
@Rule public TestName testName = new TestName();

ElasticsearchStorage storage;

@Before public void connect() {
storage = backend.computeStorageBuilder().index(index(testName)).build();
}

@Override protected StorageComponent storage() {
return storage;
}

/**
* The current implementation does not include dependency aggregation. It includes retrieval of
* pre-aggregated links, usually made via zipkin-dependencies
*/
@Override protected void processDependencies(List<Span> spans) throws Exception {
aggregateLinks(spans).forEach(
(midnight, links) -> InternalForTests.writeDependencyLinks(storage, links, midnight));
}

@Before @Override public void clear() throws IOException {
storage.clear();
}
}
}

0 comments on commit bec429b

Please sign in to comment.