Skip to content

Commit

Permalink
Supports Elasticsearch 7.x (#2398)
Browse files Browse the repository at this point in the history
Fixes #2219
  • Loading branch information
adriancole authored May 3, 2019
1 parent f2d9cc8 commit 762a795
Show file tree
Hide file tree
Showing 16 changed files with 411 additions and 218 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,15 @@ public class ZipkinElasticsearchStorageAutoConfigurationTest {

@Rule public ExpectedException thrown = ExpectedException.none();

AnnotationConfigApplicationContext context;
final AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();

@After
public void close() {
if (context != null) {
context.close();
}
context.close();
}

@Test
public void doesntProvideStorageComponent_whenStorageTypeNotElasticsearch() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of("zipkin.storage.type:cassandra").applyTo(context);
Access.registerElasticsearchHttp(context);
context.refresh();
Expand All @@ -60,7 +57,6 @@ public void doesntProvideStorageComponent_whenStorageTypeNotElasticsearch() {

@Test
public void providesStorageComponent_whenStorageTypeElasticsearchAndHostsAreUrls() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.elasticsearch.hosts:http://host1:9200")
Expand All @@ -73,7 +69,6 @@ public void providesStorageComponent_whenStorageTypeElasticsearchAndHostsAreUrls

@Test
public void canOverridesProperty_hostsWithList() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.elasticsearch.hosts:http://host1:9200,http://host2:9200")
Expand All @@ -87,7 +82,6 @@ public void canOverridesProperty_hostsWithList() {

@Test
public void configuresPipeline() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.elasticsearch.hosts:http://host1:9200",
Expand All @@ -101,7 +95,6 @@ public void configuresPipeline() {

@Test
public void configuresMaxRequests() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.elasticsearch.hosts:http://host1:9200",
Expand All @@ -116,7 +109,6 @@ public void configuresMaxRequests() {
/** This helps ensure old setups don't break (provided they have http port 9200 open) */
@Test
public void coersesPort9300To9200() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.elasticsearch.hosts:host1:9300")
Expand All @@ -129,7 +121,6 @@ public void coersesPort9300To9200() {

@Test
public void httpPrefixOptional() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.elasticsearch.hosts:host1:9200")
Expand All @@ -142,7 +133,6 @@ public void httpPrefixOptional() {

@Test
public void defaultsToPort9200() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.elasticsearch.hosts:host1")
Expand Down Expand Up @@ -175,7 +165,6 @@ Interceptor two() {
/** Ensures we can wire up network interceptors, such as for logging or authentication */
@Test
public void usesInterceptorsQualifiedWith_zipkinElasticsearchHttp() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.elasticsearch.hosts:host1:9200")
Expand All @@ -190,7 +179,6 @@ public void usesInterceptorsQualifiedWith_zipkinElasticsearchHttp() {

@Test
public void timeout_defaultsTo10Seconds() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.elasticsearch.hosts:host1:9200")
Expand All @@ -206,7 +194,6 @@ public void timeout_defaultsTo10Seconds() {

@Test
public void timeout_override() {
context = new AnnotationConfigApplicationContext();
int timeout = 30_000;
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
Expand All @@ -224,7 +211,6 @@ public void timeout_override() {

@Test
public void strictTraceId_defaultsToTrue() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.elasticsearch.hosts:http://host1:9200")
Expand All @@ -236,7 +222,6 @@ public void strictTraceId_defaultsToTrue() {

@Test
public void strictTraceId_canSetToFalse() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.elasticsearch.hosts:http://host1:9200",
Expand All @@ -250,7 +235,6 @@ public void strictTraceId_canSetToFalse() {

@Test
public void dailyIndexFormat() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.elasticsearch.hosts:http://host1:9200")
Expand All @@ -259,12 +243,11 @@ public void dailyIndexFormat() {
context.refresh();

assertThat(es().indexNameFormatter().formatTypeAndTimestamp("span", 0))
.isEqualTo("zipkin:span-1970-01-01");
.isEqualTo("zipkin*span-1970-01-01");
}

@Test
public void dailyIndexFormat_overridingPrefix() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.elasticsearch.hosts:http://host1:9200",
Expand All @@ -274,12 +257,11 @@ public void dailyIndexFormat_overridingPrefix() {
context.refresh();

assertThat(es().indexNameFormatter().formatTypeAndTimestamp("span", 0))
.isEqualTo("zipkin_prod:span-1970-01-01");
.isEqualTo("zipkin_prod*span-1970-01-01");
}

@Test
public void dailyIndexFormat_overridingDateSeparator() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.elasticsearch.hosts:http://host1:9200",
Expand All @@ -289,12 +271,11 @@ public void dailyIndexFormat_overridingDateSeparator() {
context.refresh();

assertThat(es().indexNameFormatter().formatTypeAndTimestamp("span", 0))
.isEqualTo("zipkin:span-1970.01.01");
.isEqualTo("zipkin*span-1970.01.01");
}

@Test
public void dailyIndexFormat_overridingDateSeparator_empty() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.elasticsearch.hosts:http://host1:9200",
Expand All @@ -304,12 +285,11 @@ public void dailyIndexFormat_overridingDateSeparator_empty() {
context.refresh();

assertThat(es().indexNameFormatter().formatTypeAndTimestamp("span", 0))
.isEqualTo("zipkin:span-19700101");
.isEqualTo("zipkin*span-19700101");
}

@Test
public void dailyIndexFormat_overridingDateSeparator_invalidToBeMultiChar() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.elasticsearch.hosts:http://host1:9200",
Expand All @@ -323,7 +303,6 @@ public void dailyIndexFormat_overridingDateSeparator_invalidToBeMultiChar() {

@Test
public void namesLookbackAssignedFromQueryLookback() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.elasticsearch.hosts:http://host1:9200",
Expand All @@ -337,7 +316,6 @@ public void namesLookbackAssignedFromQueryLookback() {

@Test
public void doesntProvideBasicAuthInterceptor_whenBasicAuthUserNameandPasswordNotConfigured() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.elasticsearch.hosts:http://host1:9200")
Expand All @@ -351,7 +329,6 @@ public void doesntProvideBasicAuthInterceptor_whenBasicAuthUserNameandPasswordNo

@Test
public void providesBasicAuthInterceptor_whenBasicAuthUserNameAndPasswordConfigured() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.elasticsearch.hosts:http://host1:9200",
Expand All @@ -368,7 +345,6 @@ public void providesBasicAuthInterceptor_whenBasicAuthUserNameAndPasswordConfigu

@Test
public void searchEnabled_false() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.search-enabled:false")
Expand All @@ -381,7 +357,6 @@ public void searchEnabled_false() {

@Test
public void autocompleteKeys_list() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.autocomplete-keys:environment")
Expand All @@ -395,7 +370,6 @@ public void autocompleteKeys_list() {

@Test
public void autocompleteTtl() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.autocomplete-ttl:60000")
Expand All @@ -409,7 +383,6 @@ public void autocompleteTtl() {

@Test
public void autocompleteCardinality() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.autocomplete-cardinality:5000")
Expand Down
15 changes: 8 additions & 7 deletions zipkin-storage/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
This is is a plugin to the Elasticsearch storage component, which uses
HTTP by way of [OkHttp 3](https://github.com/square/okttp) and
[Moshi](https://github.com/square/moshi). This currently supports 2.x,
5.x and 6.x version families.
5.x, 6.x and 7.x version families.

## Multiple hosts
Most users will supply a DNS name that's mapped to multiple A or AAAA
Expand Down Expand Up @@ -33,7 +33,8 @@ spans. This is mapped to the Elasticsearch date type, so can be used to any date

## Indexes
Spans are stored into daily indices, for example spans with a timestamp
falling on 2016/03/19 will be stored in the index named 'zipkin:span-2016-03-19'.
falling on 2016/03/19 will be stored in the index named 'zipkin:span-2016-03-19'
or 'zipkin-span-2016-03-19' if using Elasticsearch version 7 or higher.
There is no support for TTL through this SpanStore. It is recommended
instead to use [Elastic Curator](https://www.elastic.co/guide/en/elasticsearch/client/curator/current/about.html)
to remove indices older than the point you are interested in.
Expand All @@ -45,9 +46,9 @@ the date separator from '-' to something else.
`ElasticsearchStorage.Builder.index` and `ElasticsearchStorage.Builder.dateSeparator`
control the daily index format.

For example, spans with a timestamp falling on 2016/03/19 end up in the
index 'zipkin:span-2016-03-19'. When the date separator is '.', the index
would be 'zipkin:span-2016.03.19'.
For example, using Elasticsearch 7+, spans with a timestamp falling on
2016/03/19 end up in the index 'zipkin-span-2016-03-19'. When the date
separator is '.', the index would be 'zipkin-span-2016.03.19'.

### String Mapping
The Zipkin api implies aggregation and exact match (keyword) on string
Expand All @@ -63,7 +64,7 @@ The values in `q` are limited to 256 characters and searched as keywords.

You can check these manually like so:
```bash
$ curl -s localhost:9200/zipkin:span-2017-08-11/_search?q=_q:error=500
$ curl -s 'localhost:9200/zipkin*span-2017-08-11/_search?q=_q:error=500'
```

The reason for special casing is around dotted name constraints. Tags
Expand Down Expand Up @@ -103,7 +104,7 @@ your indexes:

```bash
# the output below shows which tokens will match on the trace id supplied.
$ curl -s localhost:9200/zipkin:span-2017-08-22/_analyze -d '{
$ curl -s 'localhost:9200/zipkin*span-2017-08-22/_analyze' -d '{
"text": "48485a3953bb61246b221d5bc9e6496c",
"analyzer": "traceId_analyzer"
}'|jq '.tokens|.[]|.token'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import zipkin2.elasticsearch.internal.HttpBulkIndexer;
import zipkin2.elasticsearch.internal.IndexNameFormatter;
import zipkin2.internal.DelayLimiter;
import zipkin2.internal.Nullable;
import zipkin2.storage.SpanConsumer;

import static zipkin2.elasticsearch.ElasticsearchAutocompleteTags.AUTOCOMPLETE;
Expand All @@ -50,19 +51,26 @@ class ElasticsearchSpanConsumer implements SpanConsumer { // not final for testi
final ElasticsearchStorage es;
final Set<String> autocompleteKeys;
final IndexNameFormatter indexNameFormatter;
final char indexTypeDelimiter;
final boolean searchEnabled;
final DelayLimiter<AutocompleteContext> delayLimiter;

ElasticsearchSpanConsumer(ElasticsearchStorage es) {
this.es = es;
this.autocompleteKeys = new LinkedHashSet<>(es.autocompleteKeys());
this.indexNameFormatter = es.indexNameFormatter();
this.indexTypeDelimiter = es.indexTypeDelimiter();
this.searchEnabled = es.searchEnabled();
this.delayLimiter = DelayLimiter.newBuilder()
.ttl(es.autocompleteTtl())
.cardinality(es.autocompleteCardinality()).build();
}

String formatTypeAndTimestampForInsert(String type, long timestampMillis) {
return indexNameFormatter.formatTypeAndTimestampForInsert(type, indexTypeDelimiter,
timestampMillis);
}

@Override public Call<Void> accept(List<Span> spans) {
if (spans.isEmpty()) return Call.create(null);
BulkSpanIndexer indexer = new BulkSpanIndexer(this);
Expand Down Expand Up @@ -104,16 +112,15 @@ static final class BulkSpanIndexer {
}

void add(long indexTimestamp, Span span, long timestampMillis) {
String index = consumer.indexNameFormatter
.formatTypeAndTimestamp(SPAN, indexTimestamp);
String index = consumer.formatTypeAndTimestampForInsert(SPAN, indexTimestamp);
byte[] document = consumer.searchEnabled
? prefixWithTimestampMillisAndQuery(span, timestampMillis)
: SpanBytesEncoder.JSON_V2.encode(span);
indexer.add(index, SPAN, document, null /* Allow ES to choose an ID */);
}

void addAutocompleteValues(long indexTimestamp, Span span) {
String idx = consumer.indexNameFormatter.formatTypeAndTimestamp(AUTOCOMPLETE, indexTimestamp);
String idx = consumer.formatTypeAndTimestampForInsert(AUTOCOMPLETE, indexTimestamp);
for (Map.Entry<String, String> tag : span.tags().entrySet()) {
int length = tag.getKey().length() + tag.getValue().length() + 1;
if (length > INDEX_CHARS_LIMIT) continue;
Expand Down
Loading

0 comments on commit 762a795

Please sign in to comment.