Skip to content

Commit

Permalink
Adds Elasticsearch support for Autocomplete tags
Browse files Browse the repository at this point in the history
  • Loading branch information
zeagord authored and Adrian Cole committed Dec 18, 2018
1 parent 0efe65a commit 6cc5a05
Show file tree
Hide file tree
Showing 15 changed files with 335 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package zipkin2.autoconfigure.storage.elasticsearch;

import java.util.List;
import java.util.logging.Logger;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
Expand Down Expand Up @@ -65,12 +66,14 @@ ElasticsearchStorage.Builder esHttpBuilder(
@Qualifier("zipkinElasticsearchHttp") OkHttpClient client,
@Value("${zipkin.query.lookback:86400000}") int namesLookback,
@Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId,
@Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled) {
@Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled,
@Value("${zipkin.storage.autocomplete-keys:}") List<String> autocompleteKeys) {
return elasticsearch
.toBuilder(client)
.namesLookback(namesLookback)
.strictTraceId(strictTraceId)
.searchEnabled(searchEnabled);
.searchEnabled(searchEnabled)
.autocompleteKeys(autocompleteKeys);
}

static final class HttpLoggingSet implements Condition {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,20 @@ public void searchEnabled_false() {
assertThat(context.getBean(ElasticsearchStorage.class).searchEnabled()).isFalse();
}

@Test
public void autocompleteKeys_list() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.autocomplete-keys:environment")
.applyTo(context);
Access.registerElasticsearchHttp(context);
context.refresh();

assertThat(context.getBean(ElasticsearchStorage.class).autocompleteKeys())
.containsOnly("environment");
}

ElasticsearchStorage es() {
return context.getBean(ElasticsearchStorage.class);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2015-2018 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin2.elasticsearch;

import java.util.List;
import java.util.Locale;
import zipkin2.Call;
import zipkin2.elasticsearch.internal.IndexNameFormatter;
import zipkin2.elasticsearch.internal.client.Aggregation;
import zipkin2.elasticsearch.internal.client.SearchCallFactory;
import zipkin2.elasticsearch.internal.client.SearchRequest;
import zipkin2.storage.AutocompleteTags;

final class ElasticsearchAutocompleteTags implements AutocompleteTags {
static final String AUTOCOMPLETE = "autocomplete";

final boolean enabled;
final IndexNameFormatter indexNameFormatter;
final SearchCallFactory search;
final int namesLookback;
final Call<List<String>> keysCall;

ElasticsearchAutocompleteTags(ElasticsearchStorage es) {
this.search = new SearchCallFactory(es.http());
this.indexNameFormatter = es.indexNameFormatter();
this.enabled = es.searchEnabled() && !es.autocompleteKeys().isEmpty();
this.namesLookback = es.namesLookback();
this.keysCall = Call.create(es.autocompleteKeys());
}

@Override public Call<List<String>> getKeys() {
if (!enabled) return Call.emptyList();
return keysCall.clone();
}

@Override public Call<List<String>> getValues(String key) {
if (key == null) throw new NullPointerException("key == null");
if (key.isEmpty()) throw new IllegalArgumentException("key was empty");
if (!enabled) return Call.emptyList();

long endMillis = System.currentTimeMillis();
long beginMillis = endMillis - namesLookback;
List<String> indices =
indexNameFormatter.formatTypeAndRange(AUTOCOMPLETE, beginMillis, endMillis);

if (indices.isEmpty()) return Call.emptyList();

SearchRequest.Filters filters =
new SearchRequest.Filters().addTerm("tagkey", key.toLowerCase(Locale.ROOT));

SearchRequest request = SearchRequest.create(indices)
.filters(filters)
.addAggregation(Aggregation.terms("tagvalue", Integer.MAX_VALUE));
return search.newCall(request, BodyConverters.KEYS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

import com.squareup.moshi.JsonWriter;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -31,6 +33,8 @@
import zipkin2.elasticsearch.internal.client.HttpCall;
import zipkin2.storage.SpanConsumer;

import static zipkin2.elasticsearch.ElasticsearchAutocompleteTags.AUTOCOMPLETE;

class ElasticsearchSpanConsumer implements SpanConsumer { // not final for testing
static final Logger LOG = Logger.getLogger(ElasticsearchSpanConsumer.class.getName());

Expand Down Expand Up @@ -66,18 +70,21 @@ void indexSpans(BulkSpanIndexer indexer, List<Span> spans) {
if (indexTimestamp == 0L) indexTimestamp = System.currentTimeMillis();
}
indexer.add(indexTimestamp, span, spanTimestamp);
indexer.addTag(indexTimestamp, span);
}
}

static final class BulkSpanIndexer {
final HttpBulkIndexer indexer;
final IndexNameFormatter indexNameFormatter;
final boolean searchEnabled;
final Set<String> autocompleteKeys;

BulkSpanIndexer(ElasticsearchStorage es) {
this.indexer = new HttpBulkIndexer("index-span", es);
this.indexNameFormatter = es.indexNameFormatter();
this.searchEnabled = es.searchEnabled();
this.autocompleteKeys = new LinkedHashSet<>(es.autocompleteKeys());
}

void add(long indexTimestamp, Span span, long timestampMillis) {
Expand All @@ -91,6 +98,37 @@ void add(long indexTimestamp, Span span, long timestampMillis) {
index, ElasticsearchSpanStore.SPAN, document, null /* Allow ES to choose an ID */);
}

void addTag(long indexTimestamp, Span span) {
if (span.tags().isEmpty()) return;
try {
Buffer query = new Buffer();
for (Map.Entry<String, String> tag : span.tags().entrySet()) {
// If the autocomplete whitelist doesn't contain the key, skip storing its value
if (!autocompleteKeys.contains(tag.getKey())) continue;

JsonWriter writer = JsonWriter.of(query);
writer.beginObject();
writer.name("tagkey");
writer.value(tag.getKey());
writer.name("tagvalue");
writer.value(tag.getValue());
writer.endObject();
String index = indexNameFormatter.formatTypeAndTimestamp(AUTOCOMPLETE, indexTimestamp);
byte[] document = query.readByteArray();
query.clear();
// Id of the document will be combination of {key,value} so that duplicate autocomplete
// keys can be avoided
indexer.add(index, AUTOCOMPLETE, document, tag.getKey() + "|" + tag.getValue());
}
} catch (IOException e) {
// very unexpected to have an IOE for an in-memory write
assert false : "Error indexing autocomplete tags for span: " + span;
if (LOG.isLoggable(Level.FINE)) {
LOG.log(Level.FINE, "Error indexing autocomplete tags for span: " + span, e);
}
}
}

HttpCall<Void> newCall() {
return indexer.newCall();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@

import static java.util.Arrays.asList;

final class ElasticsearchSpanStore implements SpanStore {
final class ElasticsearchSpanStore implements SpanStore{

static final String SPAN = "span";
static final String DEPENDENCY = "dependency";

/** To not produce unnecessarily long queries, we don't look back further than first ES support */
static final long EARLIEST_MS = 1456790400000L; // March 2016

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.auto.value.extension.memoized.Memoized;
import com.squareup.moshi.JsonReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
Expand All @@ -36,9 +37,11 @@
import zipkin2.storage.SpanConsumer;
import zipkin2.storage.SpanStore;
import zipkin2.storage.StorageComponent;
import zipkin2.storage.AutocompleteTags;

import static zipkin2.elasticsearch.ElasticsearchSpanStore.DEPENDENCY;
import static zipkin2.elasticsearch.ElasticsearchSpanStore.SPAN;
import static zipkin2.elasticsearch.ElasticsearchAutocompleteTags.AUTOCOMPLETE;
import static zipkin2.elasticsearch.internal.JsonReaders.enterPath;

@AutoValue
Expand Down Expand Up @@ -67,7 +70,8 @@ public static Builder newBuilder(OkHttpClient client) {
.indexReplicas(1)
.namesLookback(86400000)
.shutdownClientOnClose(false)
.flushOnWrites(false);
.flushOnWrites(false)
.autocompleteKeys(new ArrayList<>());
}

public static Builder newBuilder() {
Expand Down Expand Up @@ -186,6 +190,9 @@ public final Builder dateSeparator(char dateSeparator) {
@Override
public abstract Builder searchEnabled(boolean searchEnabled);

@Override
public abstract Builder autocompleteKeys(List<String> keys);

@Override
public abstract ElasticsearchStorage build();

Expand All @@ -211,6 +218,8 @@ public final Builder dateSeparator(char dateSeparator) {

abstract boolean searchEnabled();

abstract List<String> autocompleteKeys();

abstract int indexShards();

abstract int indexReplicas();
Expand All @@ -225,6 +234,12 @@ public SpanStore spanStore() {
return new ElasticsearchSpanStore(this);
}

@Override
public AutocompleteTags autocompleteTags() {
ensureIndexTemplates();
return new ElasticsearchAutocompleteTags(this);
}

@Override
public SpanConsumer spanConsumer() {
ensureIndexTemplates();
Expand Down Expand Up @@ -321,6 +336,8 @@ IndexTemplates ensureIndexTemplates() {
EnsureIndexTemplate.apply(http(), index + ":" + SPAN + "_template", templates.span());
EnsureIndexTemplate.apply(
http(), index + ":" + DEPENDENCY + "_template", templates.dependency());
EnsureIndexTemplate.apply(
http(), index + ":" + AUTOCOMPLETE + "_template", templates.tag());
return templates;
} catch (IOException e) {
throw Platform.get().uncheckedIOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ static Builder newBuilder() {

abstract String dependency();

abstract String tag();

@AutoValue.Builder
interface Builder {
Builder version(float version);
Expand All @@ -35,6 +37,8 @@ interface Builder {

Builder dependency(String dependency);

Builder tag(String tag);

IndexTemplates build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import static zipkin2.elasticsearch.ElasticsearchSpanStore.DEPENDENCY;
import static zipkin2.elasticsearch.ElasticsearchSpanStore.SPAN;
import static zipkin2.elasticsearch.ElasticsearchAutocompleteTags.AUTOCOMPLETE;
import static zipkin2.elasticsearch.internal.JsonReaders.enterPath;

/** Returns a version-specific span and dependency index template */
Expand All @@ -31,6 +32,7 @@ final class VersionSpecificTemplates {
final boolean searchEnabled;
final String spanIndexTemplate;
final String dependencyIndexTemplate;
final String tagIndexTemplate;

VersionSpecificTemplates(ElasticsearchStorage es) {
this.searchEnabled = es.searchEnabled();
Expand All @@ -49,6 +51,10 @@ final class VersionSpecificTemplates {
.replace("${__INDEX__}", es.indexNameFormatter().index())
.replace("${__NUMBER_OF_SHARDS__}", String.valueOf(es.indexShards()))
.replace("${__NUMBER_OF_REPLICAS__}", String.valueOf(es.indexReplicas()));
this.tagIndexTemplate = TAG_INDEX_TEMPLATE
.replace("${__INDEX__}", es.indexNameFormatter().index())
.replace("${__NUMBER_OF_SHARDS__}", String.valueOf(es.indexShards()))
.replace("${__NUMBER_OF_REPLICAS__}", String.valueOf(es.indexReplicas()));
}

/** Templatized due to version differences. Only fields used in search are declared */
Expand Down Expand Up @@ -161,12 +167,34 @@ String spanIndexTemplate() {
+ "\": { \"enabled\": false }}\n"
+ "}";

// The key filed of a autocompleteKeys is intentionally names as tagkey since it clashes with the
// BodyConverters KEY
static final String TAG_INDEX_TEMPLATE =
"{\n"
+ " \"TEMPLATE\": \"${__INDEX__}:"
+ AUTOCOMPLETE
+ "-*\",\n"
+ " \"settings\": {\n"
+ " \"index.number_of_shards\": ${__NUMBER_OF_SHARDS__},\n"
+ " \"index.number_of_replicas\": ${__NUMBER_OF_REPLICAS__},\n"
+ " \"index.requests.cache.enable\": true,\n"
+ " \"index.mapper.dynamic\": true\n"
+ " },\n"
+ " \"mappings\": {\""
+ AUTOCOMPLETE
+ "\": { \"enabled\": true,\n"
+ " \t\"properties\": {\n"
+ " \"tagkey\": { KEYWORD },\n"
+ " \"tagvalue\": { KEYWORD }\n"
+ " }}}\n"
+ "}";
IndexTemplates get(HttpCall.Factory callFactory) throws IOException {
float version = getVersion(callFactory);
return IndexTemplates.newBuilder()
.version(version)
.span(versionSpecificSpanIndexTemplate(version))
.dependency(versionSpecificDependencyLinkIndexTemplate(version))
.tag(versionSpecificTagIndexTemplate(version))
.build();
}

Expand Down Expand Up @@ -224,4 +252,20 @@ private String versionSpecificDependencyLinkIndexTemplate(float version) {
return dependencyIndexTemplate.replace(
"TEMPLATE", version >= 6 ? "index_patterns" : "template");
}
private String versionSpecificTagIndexTemplate(float version) {
if (version >= 2 && version < 3) {
return tagIndexTemplate
.replace("TEMPLATE", "template")
.replace("KEYWORD", "\"type\": \"string\", \"norms\": {\"enabled\": false }, \"index\": "
+ "\"not_analyzed\"");
} else if (version >= 5) {
return tagIndexTemplate
.replace("TEMPLATE", version >= 6 ? "index_patterns" : "template")
.replace("KEYWORD", "\"type\": \"text\",\"fielddata\": true\n");
}else {
throw new IllegalStateException(
"Elasticsearch 2.x, 5.x and 6.x are supported, was: " + version);
}
}
}

Loading

0 comments on commit 6cc5a05

Please sign in to comment.