Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Elasticsearch support for Autocomplete tags #2333

Merged
merged 3 commits into from
Jan 3, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package zipkin2.autoconfigure.storage.elasticsearch;

import java.util.List;
import java.util.logging.Logger;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
Expand Down Expand Up @@ -65,12 +66,14 @@ ElasticsearchStorage.Builder esHttpBuilder(
@Qualifier("zipkinElasticsearchHttp") OkHttpClient client,
@Value("${zipkin.query.lookback:86400000}") int namesLookback,
@Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId,
@Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled) {
@Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled,
@Value("${zipkin.storage.autocomplete-keys:}") List<String> autocompleteKeys) {
return elasticsearch
.toBuilder(client)
.namesLookback(namesLookback)
.strictTraceId(strictTraceId)
.searchEnabled(searchEnabled);
.searchEnabled(searchEnabled)
.autocompleteKeys(autocompleteKeys);
}

static final class HttpLoggingSet implements Condition {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,20 @@ public void searchEnabled_false() {
assertThat(context.getBean(ElasticsearchStorage.class).searchEnabled()).isFalse();
}

@Test
public void autocompleteKeys_list() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.autocomplete-keys:environment")
.applyTo(context);
Access.registerElasticsearchHttp(context);
context.refresh();

assertThat(context.getBean(ElasticsearchStorage.class).autocompleteKeys())
.containsOnly("environment");
}

ElasticsearchStorage es() {
return context.getBean(ElasticsearchStorage.class);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2015-2018 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin2.elasticsearch;

import java.util.List;
import java.util.Locale;
import zipkin2.Call;
import zipkin2.elasticsearch.internal.IndexNameFormatter;
import zipkin2.elasticsearch.internal.client.Aggregation;
import zipkin2.elasticsearch.internal.client.SearchCallFactory;
import zipkin2.elasticsearch.internal.client.SearchRequest;
import zipkin2.storage.AutocompleteTags;

final class ElasticsearchAutocompleteTags implements AutocompleteTags {
static final String AUTOCOMPLETE = "autocomplete";

final boolean enabled;
final IndexNameFormatter indexNameFormatter;
final SearchCallFactory search;
final int namesLookback;
final Call<List<String>> keysCall;

ElasticsearchAutocompleteTags(ElasticsearchStorage es) {
this.search = new SearchCallFactory(es.http());
this.indexNameFormatter = es.indexNameFormatter();
this.enabled = es.searchEnabled() && !es.autocompleteKeys().isEmpty();
this.namesLookback = es.namesLookback();
this.keysCall = Call.create(es.autocompleteKeys());
}

@Override public Call<List<String>> getKeys() {
if (!enabled) return Call.emptyList();
return keysCall.clone();
}

@Override public Call<List<String>> getValues(String key) {
if (key == null) throw new NullPointerException("key == null");
if (key.isEmpty()) throw new IllegalArgumentException("key was empty");
if (!enabled) return Call.emptyList();

long endMillis = System.currentTimeMillis();
long beginMillis = endMillis - namesLookback;
List<String> indices =
indexNameFormatter.formatTypeAndRange(AUTOCOMPLETE, beginMillis, endMillis);

if (indices.isEmpty()) return Call.emptyList();

SearchRequest.Filters filters =
new SearchRequest.Filters().addTerm("tagKey", key.toLowerCase(Locale.ROOT));

SearchRequest request = SearchRequest.create(indices)
.filters(filters)
.addAggregation(Aggregation.terms("tagValue", Integer.MAX_VALUE));
return search.newCall(request, BodyConverters.KEYS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

import com.squareup.moshi.JsonWriter;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -31,6 +33,8 @@
import zipkin2.elasticsearch.internal.client.HttpCall;
import zipkin2.storage.SpanConsumer;

import static zipkin2.elasticsearch.ElasticsearchAutocompleteTags.AUTOCOMPLETE;

class ElasticsearchSpanConsumer implements SpanConsumer { // not final for testing
static final Logger LOG = Logger.getLogger(ElasticsearchSpanConsumer.class.getName());

Expand Down Expand Up @@ -66,18 +70,21 @@ void indexSpans(BulkSpanIndexer indexer, List<Span> spans) {
if (indexTimestamp == 0L) indexTimestamp = System.currentTimeMillis();
}
indexer.add(indexTimestamp, span, spanTimestamp);
indexer.addTag(indexTimestamp, span);
}
}

static final class BulkSpanIndexer {
final HttpBulkIndexer indexer;
final IndexNameFormatter indexNameFormatter;
final boolean searchEnabled;
final Set<String> autocompleteKeys;

BulkSpanIndexer(ElasticsearchStorage es) {
this.indexer = new HttpBulkIndexer("index-span", es);
this.indexNameFormatter = es.indexNameFormatter();
this.searchEnabled = es.searchEnabled();
this.autocompleteKeys = new LinkedHashSet<>(es.autocompleteKeys());
}

void add(long indexTimestamp, Span span, long timestampMillis) {
Expand All @@ -91,6 +98,37 @@ void add(long indexTimestamp, Span span, long timestampMillis) {
index, ElasticsearchSpanStore.SPAN, document, null /* Allow ES to choose an ID */);
}

void addTag(long indexTimestamp, Span span) {
if (span.tags().isEmpty()) return;
try {
Buffer query = new Buffer();
for (Map.Entry<String, String> tag : span.tags().entrySet()) {
// If the autocomplete whitelist doesn't contain the key, skip storing its value
if (!autocompleteKeys.contains(tag.getKey())) continue;

JsonWriter writer = JsonWriter.of(query);
writer.beginObject();
writer.name("tagKey");
writer.value(tag.getKey());
writer.name("tagValue");
writer.value(tag.getValue());
writer.endObject();
String index = indexNameFormatter.formatTypeAndTimestamp(AUTOCOMPLETE, indexTimestamp);
byte[] document = query.readByteArray();
query.clear();
// Id of the document will be combination of {key,value} so that duplicate autocomplete
// keys can be avoided
indexer.add(index, AUTOCOMPLETE, document, tag.getKey() + "|" + tag.getValue());
}
} catch (IOException e) {
// very unexpected to have an IOE for an in-memory write
assert false : "Error indexing autocomplete tags for span: " + span;
if (LOG.isLoggable(Level.FINE)) {
LOG.log(Level.FINE, "Error indexing autocomplete tags for span: " + span, e);
}
}
}

HttpCall<Void> newCall() {
return indexer.newCall();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@

import static java.util.Arrays.asList;

final class ElasticsearchSpanStore implements SpanStore {
final class ElasticsearchSpanStore implements SpanStore{

static final String SPAN = "span";
static final String DEPENDENCY = "dependency";

/** To not produce unnecessarily long queries, we don't look back further than first ES support */
static final long EARLIEST_MS = 1456790400000L; // March 2016

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.auto.value.extension.memoized.Memoized;
import com.squareup.moshi.JsonReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
Expand All @@ -36,9 +37,11 @@
import zipkin2.storage.SpanConsumer;
import zipkin2.storage.SpanStore;
import zipkin2.storage.StorageComponent;
import zipkin2.storage.AutocompleteTags;

import static zipkin2.elasticsearch.ElasticsearchSpanStore.DEPENDENCY;
import static zipkin2.elasticsearch.ElasticsearchSpanStore.SPAN;
import static zipkin2.elasticsearch.ElasticsearchAutocompleteTags.AUTOCOMPLETE;
import static zipkin2.elasticsearch.internal.JsonReaders.enterPath;

@AutoValue
Expand Down Expand Up @@ -67,7 +70,8 @@ public static Builder newBuilder(OkHttpClient client) {
.indexReplicas(1)
.namesLookback(86400000)
.shutdownClientOnClose(false)
.flushOnWrites(false);
.flushOnWrites(false)
.autocompleteKeys(new ArrayList<>());
}

public static Builder newBuilder() {
Expand Down Expand Up @@ -186,6 +190,9 @@ public final Builder dateSeparator(char dateSeparator) {
@Override
public abstract Builder searchEnabled(boolean searchEnabled);

@Override
public abstract Builder autocompleteKeys(List<String> keys);

@Override
public abstract ElasticsearchStorage build();

Expand All @@ -211,6 +218,8 @@ public final Builder dateSeparator(char dateSeparator) {

abstract boolean searchEnabled();

abstract List<String> autocompleteKeys();

abstract int indexShards();

abstract int indexReplicas();
Expand All @@ -225,6 +234,12 @@ public SpanStore spanStore() {
return new ElasticsearchSpanStore(this);
}

@Override
public AutocompleteTags autocompleteTags() {
ensureIndexTemplates();
return new ElasticsearchAutocompleteTags(this);
}

@Override
public SpanConsumer spanConsumer() {
ensureIndexTemplates();
Expand Down Expand Up @@ -321,6 +336,8 @@ IndexTemplates ensureIndexTemplates() {
EnsureIndexTemplate.apply(http(), index + ":" + SPAN + "_template", templates.span());
EnsureIndexTemplate.apply(
http(), index + ":" + DEPENDENCY + "_template", templates.dependency());
EnsureIndexTemplate.apply(
http(), index + ":" + AUTOCOMPLETE + "_template", templates.autocomplete());
return templates;
} catch (IOException e) {
throw Platform.get().uncheckedIOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ static Builder newBuilder() {

abstract String dependency();

abstract String autocomplete();

@AutoValue.Builder
interface Builder {
Builder version(float version);
Expand All @@ -35,6 +37,8 @@ interface Builder {

Builder dependency(String dependency);

Builder autocomplete(String autocomplete);

IndexTemplates build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import static zipkin2.elasticsearch.ElasticsearchSpanStore.DEPENDENCY;
import static zipkin2.elasticsearch.ElasticsearchSpanStore.SPAN;
import static zipkin2.elasticsearch.ElasticsearchAutocompleteTags.AUTOCOMPLETE;
import static zipkin2.elasticsearch.internal.JsonReaders.enterPath;

/** Returns a version-specific span and dependency index template */
Expand All @@ -31,6 +32,7 @@ final class VersionSpecificTemplates {
final boolean searchEnabled;
final String spanIndexTemplate;
final String dependencyIndexTemplate;
final String autocompleteIndexTemplate;

VersionSpecificTemplates(ElasticsearchStorage es) {
this.searchEnabled = es.searchEnabled();
Expand All @@ -49,6 +51,10 @@ final class VersionSpecificTemplates {
.replace("${__INDEX__}", es.indexNameFormatter().index())
.replace("${__NUMBER_OF_SHARDS__}", String.valueOf(es.indexShards()))
.replace("${__NUMBER_OF_REPLICAS__}", String.valueOf(es.indexReplicas()));
this.autocompleteIndexTemplate = AUTOCOMPLETE_INDEX_TEMPLATE
.replace("${__INDEX__}", es.indexNameFormatter().index())
.replace("${__NUMBER_OF_SHARDS__}", String.valueOf(es.indexShards()))
.replace("${__NUMBER_OF_REPLICAS__}", String.valueOf(es.indexReplicas()));
}

/** Templatized due to version differences. Only fields used in search are declared */
Expand Down Expand Up @@ -161,12 +167,34 @@ String spanIndexTemplate() {
+ "\": { \"enabled\": false }}\n"
+ "}";

// The key filed of a autocompleteKeys is intentionally names as tagKey since it clashes with the
// BodyConverters KEY
static final String AUTOCOMPLETE_INDEX_TEMPLATE =
"{\n"
+ " \"TEMPLATE\": \"${__INDEX__}:"
+ AUTOCOMPLETE
+ "-*\",\n"
+ " \"settings\": {\n"
+ " \"index.number_of_shards\": ${__NUMBER_OF_SHARDS__},\n"
+ " \"index.number_of_replicas\": ${__NUMBER_OF_REPLICAS__},\n"
+ " \"index.requests.cache.enable\": true,\n"
+ " \"index.mapper.dynamic\": true\n"
+ " },\n"
+ " \"mappings\": {\""
+ AUTOCOMPLETE
+ "\": { \"enabled\": true,\n"
+ " \t\"properties\": {\n"
+ " \"tagKey\": { KEYWORD },\n"
+ " \"tagValue\": { KEYWORD }\n"
+ " }}}\n"
+ "}";
IndexTemplates get(HttpCall.Factory callFactory) throws IOException {
float version = getVersion(callFactory);
return IndexTemplates.newBuilder()
.version(version)
.span(versionSpecificSpanIndexTemplate(version))
.dependency(versionSpecificDependencyLinkIndexTemplate(version))
.autocomplete(versionSpecificAutocompleteIndexTemplate(version))
.build();
}

Expand Down Expand Up @@ -224,4 +252,20 @@ private String versionSpecificDependencyLinkIndexTemplate(float version) {
return dependencyIndexTemplate.replace(
"TEMPLATE", version >= 6 ? "index_patterns" : "template");
}
private String versionSpecificAutocompleteIndexTemplate(float version) {
if (version >= 2 && version < 3) {
return autocompleteIndexTemplate
.replace("TEMPLATE", "template")
.replace("KEYWORD", "\"type\": \"string\", \"norms\": {\"enabled\": false }, \"index\": "
+ "\"not_analyzed\"");
} else if (version >= 5) {
return autocompleteIndexTemplate
.replace("TEMPLATE", version >= 6 ? "index_patterns" : "template")
.replace("KEYWORD", "\"type\": \"text\",\"fielddata\": true\n");
}else {
throw new IllegalStateException(
"Elasticsearch 2.x, 5.x and 6.x are supported, was: " + version);
}
}
}

Loading