diff --git a/zipkin-autoconfigure/storage-elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ZipkinElasticsearchHttpStorageAutoConfigurationTest.java b/zipkin-autoconfigure/storage-elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ZipkinElasticsearchHttpStorageAutoConfigurationTest.java
index d7e6fb55eb5..112c731e86d 100644
--- a/zipkin-autoconfigure/storage-elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ZipkinElasticsearchHttpStorageAutoConfigurationTest.java
+++ b/zipkin-autoconfigure/storage-elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ZipkinElasticsearchHttpStorageAutoConfigurationTest.java
@@ -34,6 +34,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.boot.test.util.EnvironmentTestUtils.addEnvironment;
+import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN;
public class ZipkinElasticsearchHttpStorageAutoConfigurationTest {
@@ -246,8 +247,8 @@ public void dailyIndexFormat() {
ZipkinElasticsearchHttpStorageAutoConfiguration.class);
context.refresh();
- assertThat(es().indexNameFormatter().indexNameForTimestamp(0))
- .isEqualTo("zipkin-1970-01-01");
+ assertThat(es().indexNameFormatter().formatTypeAndTimestamp(SPAN, 0))
+ .isEqualTo("zipkin:span-1970-01-01");
}
@Test
@@ -262,8 +263,8 @@ public void dailyIndexFormat_overridingPrefix() {
ZipkinElasticsearchHttpStorageAutoConfiguration.class);
context.refresh();
- assertThat(es().indexNameFormatter().indexNameForTimestamp(0))
- .isEqualTo("zipkin_prod-1970-01-01");
+ assertThat(es().indexNameFormatter().formatTypeAndTimestamp(SPAN, 0))
+ .isEqualTo("zipkin_prod:span-1970-01-01");
}
@Test
@@ -278,8 +279,8 @@ public void dailyIndexFormat_overridingDateSeparator() {
ZipkinElasticsearchHttpStorageAutoConfiguration.class);
context.refresh();
- assertThat(es().indexNameFormatter().indexNameForTimestamp(0))
- .isEqualTo("zipkin-1970.01.01");
+ assertThat(es().indexNameFormatter().formatTypeAndTimestamp(SPAN, 0))
+ .isEqualTo("zipkin:span-1970.01.01");
}
@Test
diff --git a/zipkin-storage/cassandra3/src/test/java/zipkin/storage/cassandra3/integration/CassandraTest.java b/zipkin-storage/cassandra3/src/test/java/zipkin/storage/cassandra3/integration/CassandraTest.java
index c731e97befd..0339aaaa059 100644
--- a/zipkin-storage/cassandra3/src/test/java/zipkin/storage/cassandra3/integration/CassandraTest.java
+++ b/zipkin-storage/cassandra3/src/test/java/zipkin/storage/cassandra3/integration/CassandraTest.java
@@ -30,7 +30,7 @@ public class CassandraTest {
@ClassRule
public static LazyCassandra3Storage storage =
- new LazyCassandra3Storage("cassandra:3.10", "test_zipkin3");
+ new LazyCassandra3Storage("openzipkin/zipkin-cassandra:1.29.1", "test_zipkin3");
public static class DependenciesTest extends CassandraDependenciesTest {
@Override protected Cassandra3Storage storage() {
diff --git a/zipkin-storage/elasticsearch-http/README.md b/zipkin-storage/elasticsearch-http/README.md
index 3815f4b1490..58ebe8e51ce 100644
--- a/zipkin-storage/elasticsearch-http/README.md
+++ b/zipkin-storage/elasticsearch-http/README.md
@@ -2,8 +2,8 @@
This is a plugin to the Elasticsearch storage component, which uses
HTTP by way of [OkHttp 3](https://github.com/square/okttp) and
-[Moshi](https://github.com/square/moshi). This currently supports both
-2.x and 5.x version families.
+[Moshi](https://github.com/square/moshi). This currently supports 2.x
+through 6.x version families.
## Multiple hosts
Most users will supply a DNS name that's mapped to multiple A or AAAA
@@ -23,7 +23,7 @@ Here are some examples:
## Indexes
Spans are stored into daily indices, for example spans with a timestamp
-falling on 2016/03/19 will be stored in the index named 'zipkin-2016-03-19'.
+falling on 2016/03/19 will be stored in the index named 'zipkin:span-2016-03-19'.
There is no support for TTL through this SpanStore. It is recommended
instead to use [Elastic Curator](https://www.elastic.co/guide/en/elasticsearch/client/curator/current/about.html)
to remove indices older than the point you are interested in.
@@ -36,8 +36,8 @@ the date separator from '-' to something else.
control the daily index format.
For example, spans with a timestamp falling on 2016/03/19 end up in the
-index 'zipkin-2016-03-19'. When the date separator is '.', the index
-would be 'zipkin-2016.03.19'.
+index 'zipkin:span-2016-03-19'. When the date separator is '.', the index
+would be 'zipkin:span-2016.03.19'.
### String Mapping
The Zipkin api implies aggregation and exact match (keyword) on string
@@ -82,7 +82,7 @@ your indexes:
```bash
# the output below shows which tokens will match on the trace id supplied.
-$ curl -s localhost:9200/test_zipkin_http-2016-10-26/_analyze -d '{
+$ curl -s localhost:9200/test_zipkin_http:span-2016-10-26/_analyze -d '{
"text": "48485a3953bb61246b221d5bc9e6496c",
"analyzer": "traceId_analyzer"
}'|jq '.tokens|.[]|.token'
@@ -90,46 +90,6 @@ $ curl -s localhost:9200/test_zipkin_http-2016-10-26/_analyze -d '{
"6b221d5bc9e6496c"
```
-### Span and service Names
-Zipkin defines span and service names as lowercase. At write time, any
-mixed case span or service names are downcased. If writing a custom
-collector in a different language, make sure you write span and service
-names in lowercase. Also, if there are any custom query tools, ensure
-inputs are downcased.
-
-Span and service name queries default to look back 24hrs (2 index days).
-This can be controlled by `ElasticsearchHttpStorage.Builder.namesLookback`
-
-#### Index format
-Starting with Zipkin 1.23, service and span names are written to the
-same daily indexes as spans and dependency links as the document type
-"servicespan". This was added for performance reasons as formerly search
-was using relatively expensive nested queries.
-
-The documents themselves represent service and span name pairs. Only one
-document is present per daily index. This is to keep the documents from
-repeating at a multiplier of span count, which also simplifies query.
-This deduplication is enforced at write time by using an ID convention
-of the service and span name. Ex. `id = MyServiceName|MySpanName`
-
-The document is a simple structure, like:
-```json
-{
- "serviceName": "MyServiceName",
- "spanName": "MySpanName",
-}
-```
-
-The document does replicate data in the ID, but it is needed as you
-cannot query based on an ID expression.
-
-#### Notes for data written prior to Zipkin 1.23
-Before Zipkin 1.23, service and span names were nested queries against
-the span type. This was an expensive operation, which resulted in high
-latency particularly when the UI loads. When the "servicespan" type is
-missing from an index, or there's no results returned, a fallback nested
-query is invoked.
-
## Customizing the ingest pipeline
When using Elasticsearch 5.x, you can setup an [ingest pipeline](https://www.elastic.co/guide/en/elasticsearch/reference/master/pipeline.html)
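For reference, the daily index name described above can be reproduced with a few lines of plain Java. This is an illustrative sketch, not the storage component's own code; the class name is made up, and the prefix, type, and separator values are simply the defaults this README mentions.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class IndexNameExample {
  public static void main(String[] args) {
    String index = "zipkin";   // index prefix (the "index" setting)
    String type = "span";      // document type embedded in the index name
    char dateSeparator = '-';  // configurable date separator

    SimpleDateFormat dateFormat =
        new SimpleDateFormat("yyyy" + dateSeparator + "MM" + dateSeparator + "dd");
    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); // daily indices roll over at UTC midnight

    long timestampMillis = 1458345600000L; // 2016-03-19T00:00:00Z
    System.out.println(index + ":" + type + "-" + dateFormat.format(new Date(timestampMillis)));
    // prints: zipkin:span-2016-03-19
  }
}
```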
diff --git a/zipkin-storage/elasticsearch-http/pom.xml b/zipkin-storage/elasticsearch-http/pom.xml
index 90a626ff0c0..cbe5bf1b2c7 100644
--- a/zipkin-storage/elasticsearch-http/pom.xml
+++ b/zipkin-storage/elasticsearch-http/pom.xml
@@ -92,5 +92,19 @@
      <artifactId>testcontainers</artifactId>
      <scope>test</scope>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>${mockito.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-core</artifactId>
+      <version>${hamcrest.version}</version>
+      <scope>test</scope>
+    </dependency>
diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java
index 301a8fe6fcf..4649dc1e9da 100644
--- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java
+++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java
@@ -13,25 +13,21 @@
*/
package zipkin.storage.elasticsearch.http;
-import com.squareup.moshi.JsonWriter;
import java.io.IOException;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
-import okio.Buffer;
-import zipkin.Codec;
import zipkin.Span;
-import zipkin.internal.Pair;
+import zipkin.internal.Nullable;
+import zipkin.internal.Span2;
+import zipkin.internal.Span2Codec;
+import zipkin.internal.Span2Converter;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.Callback;
import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp;
import static zipkin.internal.Util.UTF_8;
import static zipkin.internal.Util.propagateIfFatal;
-import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SERVICE_SPAN;
+import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN;
class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final for testing
@@ -49,11 +45,8 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final
return;
}
try {
- HttpBulkIndexer indexer = new HttpBulkIndexer("index-span", es);
- Map<String, Set<Pair<String>>> indexToServiceSpans = indexSpans(indexer, spans);
- if (!indexToServiceSpans.isEmpty()) {
- indexNames(indexer, indexToServiceSpans);
- }
+ BulkSpanIndexer indexer = newBulkSpanIndexer(es);
+ indexSpans(indexer, spans);
indexer.execute(callback);
} catch (Throwable t) {
propagateIfFatal(t);
@@ -61,64 +54,55 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final
}
}
- /** Indexes spans and returns a mapping of indexes that may need a names update */
- Map<String, Set<Pair<String>>> indexSpans(HttpBulkIndexer indexer, List<Span> spans) {
- Map<String, Set<Pair<String>>> indexToServiceSpans = new LinkedHashMap<>();
+ void indexSpans(BulkSpanIndexer indexer, List<Span> spans) throws IOException {
for (Span span : spans) {
Long timestamp = guessTimestamp(span);
- Long timestampMillis;
- String index; // which index to store this span into
+ long indexTimestamp = 0L; // which index to store this span into
+ Long spanTimestamp;
if (timestamp != null) {
- timestampMillis = TimeUnit.MICROSECONDS.toMillis(timestamp);
- index = indexNameFormatter.indexNameForTimestamp(timestampMillis);
+ indexTimestamp = spanTimestamp = TimeUnit.MICROSECONDS.toMillis(timestamp);
} else {
- timestampMillis = null;
+ spanTimestamp = null;
// guessTimestamp is made for determining the span's authoritative timestamp. When choosing
// the index bucket, any annotation is better than using current time.
- Long indexTimestamp = null;
for (int i = 0, length = span.annotations.size(); i < length; i++) {
indexTimestamp = span.annotations.get(i).timestamp / 1000;
break;
}
- if (indexTimestamp == null) indexTimestamp = System.currentTimeMillis();
- index = indexNameFormatter.indexNameForTimestamp(indexTimestamp);
+ if (indexTimestamp == 0L) indexTimestamp = System.currentTimeMillis();
}
- if (!span.name.isEmpty()) putServiceSpans(indexToServiceSpans, index, span);
- byte[] document = Codec.JSON.writeSpan(span);
- if (timestampMillis != null) document = prefixWithTimestampMillis(document, timestampMillis);
- indexer.add(index, ElasticsearchHttpSpanStore.SPAN, document, null /* Allow ES to choose an ID */);
+ indexer.add(indexTimestamp, span, spanTimestamp);
}
- return indexToServiceSpans;
}
- void putServiceSpans(Map<String, Set<Pair<String>>> indexToServiceSpans, String index, Span s) {
- Set<Pair<String>> serviceSpans = indexToServiceSpans.get(index);
- if (serviceSpans == null) indexToServiceSpans.put(index, serviceSpans = new LinkedHashSet<>());
- for (String serviceName : s.serviceNames()) {
- serviceSpans.add(Pair.create(serviceName, s.name));
- }
+
+ BulkSpanIndexer newBulkSpanIndexer(ElasticsearchHttpStorage es) {
+ return new BulkSpanIndexer(es);
}
- /**
- * Adds service and span names to the pending batch. The id is "serviceName|spanName" to prevent
- * a large order of duplicates ending up in the daily index. This also means queries do not need
- * to deduplicate.
- */
- void indexNames(HttpBulkIndexer indexer, Map<String, Set<Pair<String>>> indexToServiceSpans)
- throws IOException {
- Buffer buffer = new Buffer();
- for (Map.Entry<String, Set<Pair<String>>> entry : indexToServiceSpans.entrySet()) {
- String index = entry.getKey();
- for (Pair<String> serviceSpan : entry.getValue()) {
- JsonWriter writer = JsonWriter.of(buffer);
- writer.beginObject();
- writer.name("serviceName").value(serviceSpan._1);
- writer.name("spanName").value(serviceSpan._2);
- writer.endObject();
- byte[] document = buffer.readByteArray();
- indexer.add(index, SERVICE_SPAN, document, serviceSpan._1 + "|" + serviceSpan._2);
+ static class BulkSpanIndexer {
+ final HttpBulkIndexer indexer;
+ final IndexNameFormatter indexNameFormatter;
+
+ BulkSpanIndexer(ElasticsearchHttpStorage es) {
+ this.indexer = new HttpBulkIndexer("index-span", es);
+ this.indexNameFormatter = es.indexNameFormatter();
+ }
+
+ void add(long indexTimestamp, Span span, @Nullable Long timestampMillis) {
+ String index = indexNameFormatter.formatTypeAndTimestamp(SPAN, indexTimestamp);
+ for (Span2 span2 : Span2Converter.fromSpan(span)) {
+ byte[] document = Span2Codec.JSON.writeSpan(span2);
+ if (timestampMillis != null) {
+ document = prefixWithTimestampMillis(document, timestampMillis);
+ }
+ indexer.add(index, SPAN, document, null /* Allow ES to choose an ID */);
}
}
+
+ void execute(Callback<Void> callback) throws IOException {
+ indexer.execute(callback);
+ }
}
private static final byte[] TIMESTAMP_MILLIS_PREFIX = "{\"timestamp_millis\":".getBytes(UTF_8);
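For context, prefixWithTimestampMillis (whose prefix constant appears above) rewrites the span's JSON so a query-friendly timestamp_millis field comes first. Below is a minimal sketch of the idea, not the actual implementation; it assumes the document is a non-empty JSON object starting with '{'.

```java
// Illustrative sketch only: produces {"timestamp_millis":<millis>,<original fields...>}
static byte[] prefixWithTimestampMillisSketch(byte[] document, long timestampMillis) {
  byte[] prefix = ("{\"timestamp_millis\":" + timestampMillis + ",")
      .getBytes(java.nio.charset.StandardCharsets.UTF_8);
  byte[] result = new byte[prefix.length + document.length - 1];
  System.arraycopy(prefix, 0, result, 0, prefix.length);
  // keep everything after the original opening brace unchanged
  System.arraycopy(document, 1, result, prefix.length, document.length - 1);
  return result;
}
```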
diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStore.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStore.java
index 13e097e276d..c69946a7d2f 100644
--- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStore.java
+++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStore.java
@@ -15,7 +15,6 @@
import java.util.Collections;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -39,18 +38,17 @@
final class ElasticsearchHttpSpanStore implements AsyncSpanStore {
static final String SPAN = "span";
- static final String DEPENDENCY_LINK = "dependencylink";
- static final String SERVICE_SPAN = "servicespan";
+ static final String DEPENDENCY = "dependency";
final SearchCallFactory search;
- final String[] allIndices;
+ final String[] allSpanIndices;
final IndexNameFormatter indexNameFormatter;
final boolean strictTraceId;
final int namesLookback;
ElasticsearchHttpSpanStore(ElasticsearchHttpStorage es) {
this.search = new SearchCallFactory(es.http());
- this.allIndices = new String[] {es.indexNameFormatter().allIndices()};
+ this.allSpanIndices = new String[] {es.indexNameFormatter().formatType(SPAN)};
this.indexNameFormatter = es.indexNameFormatter();
this.strictTraceId = es.strictTraceId();
this.namesLookback = es.namesLookback();
@@ -63,10 +61,7 @@ final class ElasticsearchHttpSpanStore implements AsyncSpanStore {
SearchRequest.Filters filters = new SearchRequest.Filters();
filters.addRange("timestamp_millis", beginMillis, endMillis);
if (request.serviceName != null) {
- filters.addNestedTerms(asList(
- "annotations.endpoint.serviceName",
- "binaryAnnotations.endpoint.serviceName"
- ), request.serviceName);
+ filters.addTerm("localEndpoint.serviceName", request.serviceName);
}
if (request.spanName != null) {
@@ -74,28 +69,11 @@ final class ElasticsearchHttpSpanStore implements AsyncSpanStore {
}
for (String annotation : request.annotations) {
- Map<String, String> annotationValues = new LinkedHashMap<>();
- annotationValues.put("annotations.value", annotation);
- Map<String, String> binaryAnnotationKeys = new LinkedHashMap<>();
- binaryAnnotationKeys.put("binaryAnnotations.key", annotation);
- if (request.serviceName != null) {
- annotationValues.put("annotations.endpoint.serviceName", request.serviceName);
- binaryAnnotationKeys.put("binaryAnnotations.endpoint.serviceName", request.serviceName);
- }
- filters.addNestedTerms(annotationValues, binaryAnnotationKeys);
+ filters.should().addTerm("annotations.value", annotation).addExists("tags." + annotation);
}
for (Map.Entry<String, String> kv : request.binaryAnnotations.entrySet()) {
- // In our index template, we make sure the binaryAnnotation value is indexed as string,
- // meaning non-string values won't even be indexed at all. This means that we can only
- // match string values here, which happens to be exactly what we want.
- Map<String, String> nestedTerms = new LinkedHashMap<>();
- nestedTerms.put("binaryAnnotations.key", kv.getKey());
- nestedTerms.put("binaryAnnotations.value", kv.getValue());
- if (request.serviceName != null) {
- nestedTerms.put("binaryAnnotations.endpoint.serviceName", request.serviceName);
- }
- filters.addNestedTerms(nestedTerms);
+ filters.addTerm("tags." + kv.getKey(), kv.getValue());
}
if (request.minDuration != null) {
@@ -113,8 +91,8 @@ final class ElasticsearchHttpSpanStore implements AsyncSpanStore {
.addSubAggregation(Aggregation.min("timestamp_millis"))
.orderBy("timestamp_millis", "desc");
- List<String> indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis);
- SearchRequest esRequest = SearchRequest.forIndicesAndType(indices, SPAN)
+ List<String> indices = indexNameFormatter.formatTypeAndRange(SPAN, beginMillis, endMillis);
+ SearchRequest esRequest = SearchRequest.create(indices)
.filters(filters).addAggregation(traceIdTimestamp);
HttpCall<List<String>> traceIdsCall = search.newCall(esRequest, BodyConverters.SORTED_KEYS);
@@ -146,8 +124,7 @@ final class ElasticsearchHttpSpanStore implements AsyncSpanStore {
callback.onSuccess(Collections.emptyList());
return;
}
- SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN)
- .terms("traceId", traceIds);
+ SearchRequest request = SearchRequest.create(indices).terms("traceId", traceIds);
search.newCall(request, BodyConverters.SPANS).submit(successCallback);
}
@@ -182,39 +159,26 @@ final class ElasticsearchHttpSpanStore implements AsyncSpanStore {
public void getRawTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>> callback) {
String traceIdHex = Util.toLowerHex(strictTraceId ? traceIdHigh : 0L, traceIdLow);
- SearchRequest request = SearchRequest.forIndicesAndType(asList(allIndices), SPAN)
+ SearchRequest request = SearchRequest.create(asList(allSpanIndices))
.term("traceId", traceIdHex);
search.newCall(request, BodyConverters.NULLABLE_SPANS).submit(callback);
}
@Override public void getServiceNames(Callback<List<String>> callback) {
- long endMillis = System.currentTimeMillis();
- long beginMillis = endMillis - namesLookback;
-
- List<String> indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis);
- SearchRequest request = SearchRequest.forIndicesAndType(indices, SERVICE_SPAN)
- .addAggregation(Aggregation.terms("serviceName", Integer.MAX_VALUE));
-
- search.newCall(request, BodyConverters.SORTED_KEYS).submit(new Callback<List<String>>() {
- @Override public void onSuccess(List<String> value) {
- if (!value.isEmpty()) callback.onSuccess(value);
-
- // Special cased code until sites update their collectors. What this does is do a more
- // expensive nested query to get service names when the servicespan type returns nothing.
- SearchRequest.Filters filters = new SearchRequest.Filters();
- filters.addRange("timestamp_millis", beginMillis, endMillis);
- SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN)
- .filters(filters)
- .addAggregation(Aggregation.nestedTerms("annotations.endpoint.serviceName"))
- .addAggregation(Aggregation.nestedTerms("binaryAnnotations.endpoint.serviceName"));
- search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
- }
+ long endMillis = System.currentTimeMillis();
+ long beginMillis = endMillis - namesLookback;
- @Override public void onError(Throwable t) {
- callback.onError(t);
- }
- });
+ List<String> indices = indexNameFormatter.formatTypeAndRange(SPAN, beginMillis, endMillis);
+ // Service name queries include both local and remote endpoints. This is different than
+ // Span name, as a span name can only be on a local endpoint.
+ SearchRequest.Filters filters = new SearchRequest.Filters();
+ filters.addRange("timestamp_millis", beginMillis, endMillis);
+ SearchRequest request = SearchRequest.create(indices)
+ .filters(filters)
+ .addAggregation(Aggregation.terms("localEndpoint.serviceName", Integer.MAX_VALUE))
+ .addAggregation(Aggregation.terms("remoteEndpoint.serviceName", Integer.MAX_VALUE));
+ search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
}
@Override public void getSpanNames(String serviceName, Callback<List<String>> callback) {
@@ -223,37 +187,20 @@ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>>
return;
}
- long endMillis = System.currentTimeMillis();
- long beginMillis = endMillis - namesLookback;
-
- List<String> indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis);
-
- SearchRequest request = SearchRequest.forIndicesAndType(indices, SERVICE_SPAN)
- .term("serviceName", serviceName.toLowerCase(Locale.ROOT))
- .addAggregation(Aggregation.terms("spanName", Integer.MAX_VALUE));
-
- search.newCall(request, BodyConverters.SORTED_KEYS).submit(new Callback<List<String>>() {
- @Override public void onSuccess(List<String> value) {
- if (!value.isEmpty()) callback.onSuccess(value);
-
- // Special cased code until sites update their collectors. What this does is do a more
- // expensive nested query to get span names when the servicespan type returns nothing.
- SearchRequest.Filters filters = new SearchRequest.Filters();
- filters.addRange("timestamp_millis", beginMillis, endMillis);
- filters.addNestedTerms(asList(
- "annotations.endpoint.serviceName",
- "binaryAnnotations.endpoint.serviceName"
- ), serviceName.toLowerCase(Locale.ROOT));
- SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN)
- .filters(filters)
- .addAggregation(Aggregation.terms("name", Integer.MAX_VALUE));
- search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
- }
+ long endMillis = System.currentTimeMillis();
+ long beginMillis = endMillis - namesLookback;
- @Override public void onError(Throwable t) {
- callback.onError(t);
- }
- });
+ List<String> indices = indexNameFormatter.formatTypeAndRange(SPAN, beginMillis, endMillis);
+
+ // A span name is only valid on a local endpoint, as a span name is defined locally
+ SearchRequest.Filters filters = new SearchRequest.Filters()
+ .addRange("timestamp_millis", beginMillis, endMillis)
+ .addTerm("localEndpoint.serviceName", serviceName.toLowerCase(Locale.ROOT));
+
+ SearchRequest request = SearchRequest.create(indices)
+ .filters(filters)
+ .addAggregation(Aggregation.terms("name", Integer.MAX_VALUE));
+ search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
}
@Override public void getDependencies(long endTs, @Nullable Long lookback,
@@ -262,13 +209,7 @@ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>>
long beginMillis = lookback != null ? endTs - lookback : 0;
// We just return all dependencies in the days that fall within endTs and lookback as
// dependency links themselves don't have timestamps.
- List<String> indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endTs);
- getDependencies(indices, callback);
- }
-
- void getDependencies(List<String> indices, Callback<List<DependencyLink>> callback) {
- SearchRequest request = SearchRequest.forIndicesAndType(indices, DEPENDENCY_LINK);
-
- search.newCall(request, BodyConverters.DEPENDENCY_LINKS).submit(callback);
+ List<String> indices = indexNameFormatter.formatTypeAndRange(DEPENDENCY, beginMillis, endTs);
+ search.newCall(SearchRequest.create(indices), BodyConverters.DEPENDENCY_LINKS).submit(callback);
}
}
diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpStorage.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpStorage.java
index bea932014e1..30e5c3fbcee 100644
--- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpStorage.java
+++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpStorage.java
@@ -18,7 +18,9 @@
import com.squareup.moshi.JsonReader;
import java.io.IOException;
import java.util.Collections;
+import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Set;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
@@ -31,10 +33,14 @@
import zipkin.storage.SpanStore;
import zipkin.storage.StorageAdapters;
import zipkin.storage.StorageComponent;
+import zipkin.storage.elasticsearch.http.internal.LenientDoubleCallbackAsyncSpanStore;
import zipkin.storage.elasticsearch.http.internal.client.HttpCall;
+import static zipkin.internal.Util.checkArgument;
import static zipkin.internal.Util.checkNotNull;
import static zipkin.moshi.JsonReaders.enterPath;
+import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.DEPENDENCY;
+import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN;
@AutoValue
public abstract class ElasticsearchHttpStorage implements StorageComponent {
@@ -61,7 +67,10 @@ public static Builder builder(OkHttpClient client) {
.indexReplicas(1)
.namesLookback(86400000)
.shutdownClientOnClose(false)
- .flushOnWrites(false);
+ .flushOnWrites(false)
+ .singleTypeIndexingEnabled(
+ Boolean.valueOf(System.getenv("ES_EXPERIMENTAL_SPAN2"))
+ );
}
public static Builder builder() {
@@ -133,7 +142,8 @@ public final Builder index(String index) {
* The date separator to use when generating daily index names. Defaults to '-'.
*
* <p> By default, spans with a timestamp falling on 2016/03/19 end up in the index
- * 'zipkin-2016-03-19'. When the date separator is '.', the index would be 'zipkin-2016.03.19'.
+ * 'zipkin:span-2016-03-19'. When the date separator is '.', the index would be
+ * 'zipkin:span-2016.03.19'.
*/
public final Builder dateSeparator(char dateSeparator) {
indexNameFormatterBuilder().dateSeparator(dateSeparator);
@@ -161,6 +171,9 @@ public final Builder dateSeparator(char dateSeparator) {
*/
public abstract Builder indexReplicas(int indexReplicas);
+ /** intentionally hidden for now */
+ abstract Builder singleTypeIndexingEnabled(boolean singleTypeIndexingEnabled);
+
@Override public abstract Builder strictTraceId(boolean strictTraceId);
@Override public abstract ElasticsearchHttpStorage build();
@@ -193,25 +206,45 @@ public final Builder dateSeparator(char dateSeparator) {
abstract int namesLookback();
+ abstract boolean singleTypeIndexingEnabled();
+
@Override public SpanStore spanStore() {
return StorageAdapters.asyncToBlocking(asyncSpanStore());
}
- @Override
- public AsyncSpanStore asyncSpanStore() {
- ensureIndexTemplate();
- return new ElasticsearchHttpSpanStore(this);
+ @Override public AsyncSpanStore asyncSpanStore() {
+ float version = ensureIndexTemplates().version();
+ if (version >= 6) { // then multi-type (legacy) index isn't possible
+ return new ElasticsearchHttpSpanStore(this);
+ } else if (version < 2.4 || !singleTypeIndexingEnabled()) { // don't fan out queries unnecessarily
+ return new LegacyElasticsearchHttpSpanStore(this);
+ } else { // fan out queries as we don't know if old legacy collectors are in use
+ return new LenientDoubleCallbackAsyncSpanStore(
+ new ElasticsearchHttpSpanStore(this),
+ new LegacyElasticsearchHttpSpanStore(this)
+ );
+ }
}
- @Override
- public AsyncSpanConsumer asyncSpanConsumer() {
- ensureIndexTemplate();
- return new ElasticsearchHttpSpanConsumer(this);
+ @Override public AsyncSpanConsumer asyncSpanConsumer() {
+ // We only write once, so we detect which approach we should take
+ if (shouldUseSingleTypeIndexing(ensureIndexTemplates())) {
+ return new ElasticsearchHttpSpanConsumer(this);
+ } else {
+ return new LegacyElasticsearchHttpSpanConsumer(this);
+ }
}
/** This is a blocking call, only used in tests. */
void clear() throws IOException {
- clear(indexNameFormatter().allIndices());
+ Set<String> toClear = new LinkedHashSet<>();
+ if (shouldUseSingleTypeIndexing(ensureIndexTemplates())) {
+ toClear.add(indexNameFormatter().formatType(SPAN));
+ toClear.add(indexNameFormatter().formatType(DEPENDENCY));
+ } else {
+ toClear.add(indexNameFormatter().formatType(null));
+ }
+ for (String index : toClear) clear(index);
}
void clear(String index) throws IOException {
@@ -236,7 +269,7 @@ static void flush(HttpCall.Factory factory, String index) throws IOException {
/** This is blocking so that we can determine if the cluster is healthy or not */
@Override public CheckResult check() {
- return ensureClusterReady(indexNameFormatter().allIndices());
+ return ensureClusterReady(indexNameFormatter().formatType(SPAN));
}
CheckResult ensureClusterReady(String index) {
@@ -261,15 +294,26 @@ CheckResult ensureClusterReady(String index) {
}
}
- @Memoized // since there's a network call required to get the version
- String indexTemplate() {
- return new VersionSpecificTemplate(this).get(http());
+ @Memoized // since we don't want overlapping calls to apply the index templates
+ IndexTemplates ensureIndexTemplates() {
+ String index = indexNameFormatter().index();
+ IndexTemplates templates = new VersionSpecificTemplates(this).get(http());
+ if (shouldUseSingleTypeIndexing(templates)) {
+ EnsureIndexTemplate.apply(http(), index + ":" + SPAN + "_template", templates.span());
+ EnsureIndexTemplate.apply(http(), index + ":" + DEPENDENCY + "_template",
+ templates.dependency());
+ } else { // TODO: remove when we stop writing span1 format
+ checkArgument(templates.legacy() != null,
+ "multiple type template is null: version=%s, singleTypeIndexingEnabled=%s",
+ templates.version(), singleTypeIndexingEnabled());
+ EnsureIndexTemplate.apply(http(), index + "_template", templates.legacy());
+ }
+ return templates;
}
- @Memoized // since we don't want overlapping calls to apply the index template
- boolean ensureIndexTemplate() {
- EnsureIndexTemplate.apply(http(), indexNameFormatter().index() + "_template", indexTemplate());
- return true; // as Memoized cannot return void
+ private boolean shouldUseSingleTypeIndexing(IndexTemplates templates) {
+ return (templates.span() != null && singleTypeIndexingEnabled())
+ || templates.version() >= 6;
}
@Memoized // hosts resolution might imply a network call, and we might make a new okhttp instance
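One detail worth noting about the ES_EXPERIMENTAL_SPAN2 toggle introduced above: it is parsed with Boolean.valueOf, which is null-safe, so leaving the variable unset keeps single-type indexing disabled. A small sketch of the parsing behavior; the class name is hypothetical.

```java
public class Es2FlagCheck {
  public static void main(String[] args) {
    // Boolean.valueOf(String) returns true only for "true" (case-insensitive) and never throws,
    // so an unset or malformed ES_EXPERIMENTAL_SPAN2 leaves the experimental path off.
    boolean enabled = Boolean.valueOf(System.getenv("ES_EXPERIMENTAL_SPAN2"));
    System.out.println("single-type indexing enabled: " + enabled);
  }
}
```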
diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpBulkIndexer.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpBulkIndexer.java
index 969166aa412..90d7810cef2 100644
--- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpBulkIndexer.java
+++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpBulkIndexer.java
@@ -55,6 +55,7 @@ void add(String index, String typeName, byte[] document, @Nullable String id) {
void writeIndexMetadata(String index, String typeName, @Nullable String id) {
if (flushOnWrites) indices.add(index);
body.writeUtf8("{\"index\":{\"_index\":\"").writeUtf8(index).writeByte('"');
+ // the _type parameter is needed for Elasticsearch <6.x
body.writeUtf8(",\"_type\":\"").writeUtf8(typeName).writeByte('"');
if (id != null) {
body.writeUtf8(",\"_id\":\"").writeUtf8(JsonCodec.escape(id)).writeByte('"');
@@ -77,6 +78,10 @@ void execute(Callback<Void> callback) {
.post(RequestBody.create(APPLICATION_JSON, body.readByteString())).build();
http.newCall(request, b -> {
+ String content = b.readUtf8();
+ if (content.indexOf("\"errors\":true") != -1) {
+ throw new IllegalStateException(content);
+ }
if (indices.isEmpty()) return null;
ElasticsearchHttpStorage.flush(http, join(indices));
return null;
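The substring check added above surfaces partial bulk failures that Elasticsearch otherwise reports under an HTTP 200. A sketch of the kind of response body that would now raise an exception; the class name and the abbreviated item layout are illustrative only and vary by Elasticsearch version.

```java
public class BulkErrorCheckExample {
  public static void main(String[] args) {
    // Abbreviated example of a bulk response reporting a rejected document.
    String content = "{\"took\":3,\"errors\":true,\"items\":[{\"index\":{\"status\":400,"
        + "\"error\":{\"type\":\"mapper_parsing_exception\"}}}]}";
    if (content.indexOf("\"errors\":true") != -1) { // same check as in execute() above
      throw new IllegalStateException(content);
    }
  }
}
```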
diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/IndexNameFormatter.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/IndexNameFormatter.java
index 4ae9bb25396..a6a3197b413 100644
--- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/IndexNameFormatter.java
+++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/IndexNameFormatter.java
@@ -23,6 +23,7 @@
import java.util.GregorianCalendar;
import java.util.List;
import java.util.TimeZone;
+import zipkin.internal.Nullable;
import zipkin.internal.Util;
@AutoValue
@@ -72,13 +73,14 @@ final IndexNameFormatter build() {
* <p> For example, if {@code beginMillis} is 2016-11-30 and {@code endMillis} is 2017-01-02, the
* result will be 2016-11-30, 2016-12-*, 2017-01-01 and 2017-01-02.
*/
- List<String> indexNamePatternsForRange(long beginMillis, long endMillis) {
+ List<String> formatTypeAndRange(@Nullable String type, long beginMillis, long endMillis) {
GregorianCalendar current = midnightUTC(beginMillis);
GregorianCalendar end = midnightUTC(endMillis);
if (current.equals(end)) {
- return Collections.singletonList(indexNameForTimestamp(current.getTimeInMillis()));
+ return Collections.singletonList(formatTypeAndTimestamp(type, current.getTimeInMillis()));
}
+ String prefix = prefix(type);
List<String> indices = new ArrayList<>();
while (current.compareTo(end) <= 0) {
if (current.get(Calendar.MONTH) == 0 && current.get(Calendar.DATE) == 1) {
@@ -86,7 +88,7 @@ List<String> indexNamePatternsForRange(long beginMillis, long endMillis) {
current.set(Calendar.DAY_OF_YEAR, current.getActualMaximum(Calendar.DAY_OF_YEAR));
if (current.compareTo(end) <= 0) {
indices.add(
- String.format("%s-%s%c*", index(), current.get(Calendar.YEAR), dateSeparator()));
+ String.format("%s-%s%c*", prefix, current.get(Calendar.YEAR), dateSeparator()));
current.add(Calendar.DATE, 1); // rollover to next year
continue;
} else {
@@ -96,7 +98,7 @@ List<String> indexNamePatternsForRange(long beginMillis, long endMillis) {
// attempt to compress a month
current.set(Calendar.DATE, current.getActualMaximum(Calendar.DATE));
if (current.compareTo(end) <= 0) {
- indices.add(String.format("%s-%s%c%02d%c*", index(),
+ indices.add(String.format("%s-%s%c%02d%c*", prefix,
current.get(Calendar.YEAR), dateSeparator(),
current.get(Calendar.MONTH) + 1, dateSeparator()
));
@@ -106,7 +108,7 @@ List<String> indexNamePatternsForRange(long beginMillis, long endMillis) {
current.set(Calendar.DATE, 1); // rollback to first of the month
}
}
- indices.add(indexNameForTimestamp(current.getTimeInMillis()));
+ indices.add(formatTypeAndTimestamp(type, current.getTimeInMillis()));
current.add(Calendar.DATE, 1);
}
return indices;
@@ -118,8 +120,12 @@ static GregorianCalendar midnightUTC(long epochMillis) {
return result;
}
- String indexNameForTimestamp(long timestampMillis) {
- return index() + "-" + dateFormat().get().format(new Date(timestampMillis));
+ String formatTypeAndTimestamp(@Nullable String type, long timestampMillis) {
+ return prefix(type) + "-" + dateFormat().get().format(new Date(timestampMillis));
+ }
+
+ private String prefix(@Nullable String type) {
+ return type != null ? index() + ":" + type : index();
}
// for testing
@@ -131,7 +137,7 @@ long parseDate(String timestamp) {
}
}
- String allIndices() {
- return index() + "-*";
+ String formatType(@Nullable String type) {
+ return prefix(type) + "-*";
}
}
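A quick usage sketch of the renamed formatter methods, assuming the AutoValue builder shown above exposes index(...) and dateSeparator(...) setters and is configured with index "zipkin" and the default '-' separator; the expected strings mirror the autoconfiguration tests earlier in this diff.

```java
// Hypothetical helper in the same package (the formatter is package-private).
static void printIndexNameExamples() {
  IndexNameFormatter formatter =
      IndexNameFormatter.builder().index("zipkin").dateSeparator('-').build();

  System.out.println(formatter.formatTypeAndTimestamp("span", 0L)); // zipkin:span-1970-01-01
  System.out.println(formatter.formatTypeAndTimestamp(null, 0L));   // zipkin-1970-01-01 (legacy, type-less)
  System.out.println(formatter.formatType("span"));                 // zipkin:span-* (all daily span indices)
  System.out.println(formatter.formatType(null));                   // zipkin-* (legacy multi-type indices)
}
```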
diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/IndexTemplates.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/IndexTemplates.java
new file mode 100644
index 00000000000..dede06feac7
--- /dev/null
+++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/IndexTemplates.java
@@ -0,0 +1,47 @@
+/**
+ * Copyright 2015-2017 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin.storage.elasticsearch.http;
+
+import com.google.auto.value.AutoValue;
+import zipkin.internal.Nullable;
+
+@AutoValue
+abstract class IndexTemplates {
+ static Builder builder() {
+ return new AutoValue_IndexTemplates.Builder();
+ }
+
+ abstract float version();
+
+ /** null when multi-type indexes are not supported */
+ @Nullable abstract String legacy();
+
+ /** null when dots in field names are not supported */
+ @Nullable abstract String span();
+
+ /** null when dots in field names are not supported */
+ @Nullable abstract String dependency();
+
+ @AutoValue.Builder interface Builder {
+ Builder version(float version);
+
+ Builder legacy(@Nullable String legacy);
+
+ Builder span(String span);
+
+ Builder dependency(String dependency);
+
+ IndexTemplates build();
+ }
+}
diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/JsonAdapters.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/JsonAdapters.java
index ecf77867aee..4a46d1df02f 100644
--- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/JsonAdapters.java
+++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/JsonAdapters.java
@@ -25,6 +25,8 @@
import zipkin.DependencyLink;
import zipkin.Endpoint;
import zipkin.Span;
+import zipkin.internal.Span2;
+import zipkin.internal.Span2Converter;
import zipkin.internal.Util;
import static zipkin.internal.Util.UTF_8;
@@ -38,7 +40,7 @@ final class JsonAdapters {
static final JsonAdapter<Span> SPAN_ADAPTER = new JsonAdapter<Span>() {
@Override
public Span fromJson(JsonReader reader) throws IOException {
- Span.Builder result = Span.builder();
+ Span2.Builder result = Span2.builder();
reader.beginObject();
while (reader.hasNext()) {
String nextName = reader.nextName();
@@ -48,20 +50,19 @@ public Span fromJson(JsonReader reader) throws IOException {
}
switch (nextName) {
case "traceId":
- String traceId = reader.nextString();
- if (traceId.length() == 32) {
- result.traceIdHigh(lowerHexToUnsignedLong(traceId, 0));
- }
- result.traceId(lowerHexToUnsignedLong(traceId));
+ result.traceId(reader.nextString());
break;
- case "name":
- result.name(reader.nextString());
+ case "parentId":
+ result.parentId(reader.nextString());
break;
case "id":
- result.id(Util.lowerHexToUnsignedLong(reader.nextString()));
+ result.id(reader.nextString());
break;
- case "parentId":
- result.parentId(Util.lowerHexToUnsignedLong(reader.nextString()));
+ case "kind":
+ result.kind(Span2.Kind.valueOf(reader.nextString()));
+ break;
+ case "name":
+ result.name(reader.nextString());
break;
case "timestamp":
result.timestamp(reader.nextLong());
@@ -69,29 +70,39 @@ public Span fromJson(JsonReader reader) throws IOException {
case "duration":
result.duration(reader.nextLong());
break;
+ case "localEndpoint":
+ result.localEndpoint(ENDPOINT_ADAPTER.fromJson(reader));
+ break;
+ case "remoteEndpoint":
+ result.remoteEndpoint(ENDPOINT_ADAPTER.fromJson(reader));
+ break;
case "annotations":
reader.beginArray();
while (reader.hasNext()) {
- result.addAnnotation(ANNOTATION_ADAPTER.fromJson(reader));
+ Annotation a = ANNOTATION_ADAPTER.fromJson(reader);
+ result.addAnnotation(a.timestamp, a.value);
}
reader.endArray();
break;
- case "binaryAnnotations":
- reader.beginArray();
+ case "tags":
+ reader.beginObject();
while (reader.hasNext()) {
- result.addBinaryAnnotation(BINARY_ANNOTATION_ADAPTER.fromJson(reader));
+ result.putTag(reader.nextName(), reader.nextString());
}
- reader.endArray();
+ reader.endObject();
break;
case "debug":
result.debug(reader.nextBoolean());
break;
+ case "shared":
+ result.shared(reader.nextBoolean());
+ break;
default:
reader.skipValue();
}
}
reader.endObject();
- return result.build();
+ return Span2Converter.toSpan(result.build());
}
@Override
diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/LegacyElasticsearchHttpSpanConsumer.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/LegacyElasticsearchHttpSpanConsumer.java
new file mode 100644
index 00000000000..126d66361e3
--- /dev/null
+++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/LegacyElasticsearchHttpSpanConsumer.java
@@ -0,0 +1,100 @@
+/**
+ * Copyright 2015-2017 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin.storage.elasticsearch.http;
+
+import com.squareup.moshi.JsonWriter;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import okio.Buffer;
+import zipkin.Codec;
+import zipkin.Span;
+import zipkin.internal.Nullable;
+import zipkin.internal.Pair;
+import zipkin.storage.Callback;
+
+import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN;
+
+/**
+ * This is the legacy implementation of our span consumer, which notably uses multi-type indexing
+ * and the span v1 model. Multi-type indexing isn't supported on Elasticsearch 6.x. Moreover, the
+ * span v1 model needs nested queries to access service names. This is expensive and requires a
+ * separate type "servicespan" to make performant.
+ */
+// TODO: remove when we stop writing span1 format
+class LegacyElasticsearchHttpSpanConsumer extends ElasticsearchHttpSpanConsumer {
+
+ LegacyElasticsearchHttpSpanConsumer(ElasticsearchHttpStorage es) {
+ super(es);
+ }
+
+ MultiTypeBulkSpanIndexer newBulkSpanIndexer(ElasticsearchHttpStorage es) {
+ return new MultiTypeBulkSpanIndexer(es);
+ }
+
+ static class MultiTypeBulkSpanIndexer extends BulkSpanIndexer {
+ Map<String, Set<Pair<String>>> indexToServiceSpans = new LinkedHashMap<>();
+
+ MultiTypeBulkSpanIndexer(ElasticsearchHttpStorage es) {
+ super(es);
+ }
+
+ @Override void add(long indexTimestamp, Span span, @Nullable Long spanTimestamp) {
+ String type = null; // multi-type index: span isn't a parameter to the index name
+ String index = indexNameFormatter.formatTypeAndTimestamp(type, indexTimestamp);
+ if (!span.name.isEmpty()) putServiceSpans(indexToServiceSpans, index, span);
+ byte[] document = Codec.JSON.writeSpan(span);
+ if (spanTimestamp != null) document = prefixWithTimestampMillis(document, spanTimestamp);
+ indexer.add(index, SPAN, document, null /* Allow ES to choose an ID */);
+ }
+
+ void putServiceSpans(Map<String, Set<Pair<String>>> indexToServiceSpans, String index, Span s) {
+ Set<Pair<String>> serviceSpans = indexToServiceSpans.get(index);
+ if (serviceSpans == null) {
+ indexToServiceSpans.put(index, serviceSpans = new LinkedHashSet<>());
+ }
+ for (String serviceName : s.serviceNames()) {
+ serviceSpans.add(Pair.create(serviceName, s.name));
+ }
+ }
+
+ /**
+ * Adds service and span names to the pending batch. The id is "serviceName|spanName" to prevent
+ * a large order of duplicates ending up in the daily index. This also means queries do not need
+ * to deduplicate.
+ */
+ @Override void execute(Callback<Void> callback) throws IOException {
+ if (indexToServiceSpans.isEmpty()) {
+ indexer.execute(callback);
+ return;
+ }
+ Buffer buffer = new Buffer();
+ for (Map.Entry<String, Set<Pair<String>>> entry : indexToServiceSpans.entrySet()) {
+ String index = entry.getKey();
+ for (Pair<String> serviceSpan : entry.getValue()) {
+ JsonWriter writer = JsonWriter.of(buffer);
+ writer.beginObject();
+ writer.name("serviceName").value(serviceSpan._1);
+ writer.name("spanName").value(serviceSpan._2);
+ writer.endObject();
+ byte[] document = buffer.readByteArray();
+ indexer.add(index, "servicespan", document, serviceSpan._1 + "|" + serviceSpan._2);
+ }
+ }
+ indexer.execute(callback);
+ }
+ }
+}
diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/LegacyElasticsearchHttpSpanStore.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/LegacyElasticsearchHttpSpanStore.java
new file mode 100644
index 00000000000..daa36d13767
--- /dev/null
+++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/LegacyElasticsearchHttpSpanStore.java
@@ -0,0 +1,276 @@
+/**
+ * Copyright 2015-2017 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin.storage.elasticsearch.http;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import zipkin.DependencyLink;
+import zipkin.Span;
+import zipkin.internal.CorrectForClockSkew;
+import zipkin.internal.GroupByTraceId;
+import zipkin.internal.MergeById;
+import zipkin.internal.Nullable;
+import zipkin.internal.Util;
+import zipkin.storage.AsyncSpanStore;
+import zipkin.storage.Callback;
+import zipkin.storage.QueryRequest;
+import zipkin.storage.elasticsearch.http.internal.client.Aggregation;
+import zipkin.storage.elasticsearch.http.internal.client.HttpCall;
+import zipkin.storage.elasticsearch.http.internal.client.HttpCall.BodyConverter;
+import zipkin.storage.elasticsearch.http.internal.client.SearchCallFactory;
+import zipkin.storage.elasticsearch.http.internal.client.SearchRequest;
+import zipkin.storage.elasticsearch.http.internal.client.SearchResultConverter;
+
+import static java.util.Arrays.asList;
+
+
+final class LegacyElasticsearchHttpSpanStore implements AsyncSpanStore {
+ static final String SPAN = "span";
+ static final String DEPENDENCY_LINK = "dependencylink";
+ static final String SERVICE_SPAN = "servicespan";
+ static final BodyConverter<List<Span>> SPANS =
+ SearchResultConverter.create(LegacyJsonAdapters.SPAN_ADAPTER);
+ static final BodyConverter<List<Span>> NULLABLE_SPANS =
+ SearchResultConverter.create(LegacyJsonAdapters.SPAN_ADAPTER).defaultToNull();
+
+ final SearchCallFactory search;
+ final String[] allIndices;
+ final IndexNameFormatter indexNameFormatter;
+ final boolean strictTraceId;
+ final int namesLookback;
+
+ LegacyElasticsearchHttpSpanStore(ElasticsearchHttpStorage es) {
+ this.search = new SearchCallFactory(es.http());
+ this.allIndices = new String[] {es.indexNameFormatter().formatType(null)};
+ this.indexNameFormatter = es.indexNameFormatter();
+ this.strictTraceId = es.strictTraceId();
+ this.namesLookback = es.namesLookback();
+ }
+
+ @Override public void getTraces(QueryRequest request, Callback<List<List<Span>>> callback) {
+ long beginMillis = request.endTs - request.lookback;
+ long endMillis = request.endTs;
+
+ SearchRequest.Filters filters = new SearchRequest.Filters();
+ filters.addRange("timestamp_millis", beginMillis, endMillis);
+ if (request.serviceName != null) {
+ filters.addNestedTerms(asList(
+ "annotations.endpoint.serviceName",
+ "binaryAnnotations.endpoint.serviceName"
+ ), request.serviceName);
+ }
+
+ if (request.spanName != null) {
+ filters.addTerm("name", request.spanName);
+ }
+
+ for (String annotation : request.annotations) {
+ Map<String, String> annotationValues = new LinkedHashMap<>();
+ annotationValues.put("annotations.value", annotation);
+ Map<String, String> binaryAnnotationKeys = new LinkedHashMap<>();
+ binaryAnnotationKeys.put("binaryAnnotations.key", annotation);
+ if (request.serviceName != null) {
+ annotationValues.put("annotations.endpoint.serviceName", request.serviceName);
+ binaryAnnotationKeys.put("binaryAnnotations.endpoint.serviceName", request.serviceName);
+ }
+ filters.addNestedTerms(annotationValues, binaryAnnotationKeys);
+ }
+
+ for (Map.Entry<String, String> kv : request.binaryAnnotations.entrySet()) {
+ // In our index template, we make sure the binaryAnnotation value is indexed as string,
+ // meaning non-string values won't even be indexed at all. This means that we can only
+ // match string values here, which happens to be exactly what we want.
+ Map<String, String> nestedTerms = new LinkedHashMap<>();
+ nestedTerms.put("binaryAnnotations.key", kv.getKey());
+ nestedTerms.put("binaryAnnotations.value", kv.getValue());
+ if (request.serviceName != null) {
+ nestedTerms.put("binaryAnnotations.endpoint.serviceName", request.serviceName);
+ }
+ filters.addNestedTerms(nestedTerms);
+ }
+
+ if (request.minDuration != null) {
+ filters.addRange("duration", request.minDuration, request.maxDuration);
+ }
+
+ // We need to filter to traces that contain at least one span that matches the request,
+ // but the zipkin API is supposed to order traces by first span, regardless of if it was
+ // filtered or not. This is not possible without either multiple, heavyweight queries
+ // or complex multiple indexing, defeating much of the elegance of using elasticsearch for this.
+ // So we fudge and order on the first span among the filtered spans - in practice, there should
+ // be no significant difference in user experience since span start times are usually very
+ // close to each other in human time.
+ Aggregation traceIdTimestamp = Aggregation.terms("traceId", request.limit)
+ .addSubAggregation(Aggregation.min("timestamp_millis"))
+ .orderBy("timestamp_millis", "desc");
+
+ List<String> indices = indexNameFormatter.formatTypeAndRange(null, beginMillis, endMillis);
+ SearchRequest esRequest = SearchRequest.create(indices, SPAN)
+ .filters(filters).addAggregation(traceIdTimestamp);
+
+ HttpCall<List<String>> traceIdsCall = search.newCall(esRequest, BodyConverters.SORTED_KEYS);
+
+ // When we receive span results, we need to group them by trace ID
+ Callback<List<Span>> successCallback = new Callback<List<Span>>() {
+ @Override public void onSuccess(List<Span> input) {
+ List<List<Span>> traces = GroupByTraceId.apply(input, strictTraceId, true);
+
+ // Due to tokenization of the trace ID, our matches are imprecise on Span.traceIdHigh
+ for (Iterator<List<Span>> trace = traces.iterator(); trace.hasNext(); ) {
+ List<Span> next = trace.next();
+ if (next.get(0).traceIdHigh != 0 && !request.test(next)) {
+ trace.remove();
+ }
+ }
+ callback.onSuccess(traces);
+ }
+
+ @Override public void onError(Throwable t) {
+ callback.onError(t);
+ }
+ };
+
+ // Fire off the query to get spans once we have trace ids
+ traceIdsCall.submit(new Callback<List<String>>() {
+ @Override public void onSuccess(@Nullable List<String> traceIds) {
+ if (traceIds == null || traceIds.isEmpty()) {
+ callback.onSuccess(Collections.emptyList());
+ return;
+ }
+ SearchRequest request = SearchRequest.create(indices, SPAN).terms("traceId", traceIds);
+ search.newCall(request, SPANS).submit(successCallback);
+ }
+
+ @Override public void onError(Throwable t) {
+ callback.onError(t);
+ }
+ });
+ }
+
+ @Override public void getTrace(long id, Callback<List<Span>> callback) {
+ getTrace(0L, id, callback);
+ }
+
+ @Override public void getTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>> callback) {
+ getRawTrace(traceIdHigh, traceIdLow, new Callback<List<Span>>() {
+ @Override public void onSuccess(@Nullable List<Span> value) {
+ List<Span> result = CorrectForClockSkew.apply(MergeById.apply(value));
+ callback.onSuccess(result.isEmpty() ? null : result);
+ }
+
+ @Override public void onError(Throwable t) {
+ callback.onError(t);
+ }
+ });
+ }
+
+ @Override public void getRawTrace(long traceId, Callback<List<Span>> callback) {
+ getRawTrace(0L, traceId, callback);
+ }
+
+ @Override
+ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>> callback) {
+ String traceIdHex = Util.toLowerHex(strictTraceId ? traceIdHigh : 0L, traceIdLow);
+
+ SearchRequest request = SearchRequest.create(asList(allIndices), SPAN)
+ .term("traceId", traceIdHex);
+
+ search.newCall(request, NULLABLE_SPANS).submit(callback);
+ }
+
+ @Override public void getServiceNames(Callback<List<String>> callback) {
+ long endMillis = System.currentTimeMillis();
+ long beginMillis = endMillis - namesLookback;
+
+ List<String> indices = indexNameFormatter.formatTypeAndRange(null, beginMillis, endMillis);
+ SearchRequest request = SearchRequest.create(indices, SERVICE_SPAN)
+ .addAggregation(Aggregation.terms("serviceName", Integer.MAX_VALUE));
+
+ search.newCall(request, BodyConverters.SORTED_KEYS).submit(new Callback<List<String>>() {
+ @Override public void onSuccess(List<String> value) {
+ if (!value.isEmpty()) callback.onSuccess(value);
+
+ // Special cased code until sites update their collectors. What this does is do a more
+ // expensive nested query to get service names when the servicespan type returns nothing.
+ SearchRequest.Filters filters = new SearchRequest.Filters();
+ filters.addRange("timestamp_millis", beginMillis, endMillis);
+ SearchRequest request = SearchRequest.create(indices, SPAN).filters(filters)
+ .addAggregation(Aggregation.nestedTerms("annotations.endpoint.serviceName"))
+ .addAggregation(Aggregation.nestedTerms("binaryAnnotations.endpoint.serviceName"));
+ search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
+ }
+
+ @Override public void onError(Throwable t) {
+ callback.onError(t);
+ }
+ });
+ }
+
+ @Override public void getSpanNames(String serviceName, Callback<List<String>> callback) {
+ if (serviceName == null || "".equals(serviceName)) {
+ callback.onSuccess(Collections.emptyList());
+ return;
+ }
+
+ long endMillis = System.currentTimeMillis();
+ long beginMillis = endMillis - namesLookback;
+
+ List<String> indices = indexNameFormatter.formatTypeAndRange(null, beginMillis, endMillis);
+ SearchRequest request = SearchRequest.create(indices, SERVICE_SPAN)
+ .term("serviceName", serviceName.toLowerCase(Locale.ROOT))
+ .addAggregation(Aggregation.terms("spanName", Integer.MAX_VALUE));
+
+ search.newCall(request, BodyConverters.SORTED_KEYS).submit(new Callback<List<String>>() {
+ @Override public void onSuccess(List<String> value) {
+ if (!value.isEmpty()) callback.onSuccess(value);
+
+ // Special cased code until sites update their collectors. What this does is do a more
+ // expensive nested query to get span names when the servicespan type returns nothing.
+ SearchRequest.Filters filters = new SearchRequest.Filters();
+ filters.addRange("timestamp_millis", beginMillis, endMillis);
+ filters.addNestedTerms(asList(
+ "annotations.endpoint.serviceName",
+ "binaryAnnotations.endpoint.serviceName"
+ ), serviceName.toLowerCase(Locale.ROOT));
+ SearchRequest request = SearchRequest.create(indices, SPAN).filters(filters)
+ .addAggregation(Aggregation.terms("name", Integer.MAX_VALUE));
+ search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
+ }
+
+ @Override public void onError(Throwable t) {
+ callback.onError(t);
+ }
+ });
+ }
+
+ @Override public void getDependencies(long endTs, @Nullable Long lookback,
+ Callback<List<DependencyLink>> callback) {
+
+ long beginMillis = lookback != null ? endTs - lookback : 0;
+ // We just return all dependencies in the days that fall within endTs and lookback as
+ // dependency links themselves don't have timestamps.
+ List<String> indices = indexNameFormatter.formatTypeAndRange(null, beginMillis, endTs);
+ getDependencies(indices, callback);
+ }
+
+ void getDependencies(List<String> indices, Callback<List<DependencyLink>> callback) {
+ SearchRequest request = SearchRequest.create(indices, DEPENDENCY_LINK);
+
+ search.newCall(request, BodyConverters.DEPENDENCY_LINKS).submit(callback);
+ }
+}
diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/LegacyJsonAdapters.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/LegacyJsonAdapters.java
new file mode 100644
index 00000000000..c3168e82b44
--- /dev/null
+++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/LegacyJsonAdapters.java
@@ -0,0 +1,101 @@
+/**
+ * Copyright 2015-2017 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin.storage.elasticsearch.http;
+
+import com.squareup.moshi.JsonAdapter;
+import com.squareup.moshi.JsonDataException;
+import com.squareup.moshi.JsonReader;
+import com.squareup.moshi.JsonWriter;
+import java.io.IOException;
+import okio.Buffer;
+import okio.ByteString;
+import zipkin.Annotation;
+import zipkin.BinaryAnnotation;
+import zipkin.DependencyLink;
+import zipkin.Endpoint;
+import zipkin.Span;
+import zipkin.internal.Span2;
+import zipkin.internal.Span2Converter;
+import zipkin.internal.Util;
+
+import static zipkin.internal.Util.UTF_8;
+import static zipkin.internal.Util.lowerHexToUnsignedLong;
+
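+/** Json adapter that reads the original (v1) span fields: traceId, name, annotations, binaryAnnotations, etc. */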
+final class LegacyJsonAdapters {
+ static final JsonAdapter<Span> SPAN_ADAPTER = new JsonAdapter<Span>() {
+ @Override
+ public Span fromJson(JsonReader reader) throws IOException {
+ Span.Builder result = Span.builder();
+ reader.beginObject();
+ while (reader.hasNext()) {
+ String nextName = reader.nextName();
+ if (reader.peek() == JsonReader.Token.NULL) {
+ reader.skipValue();
+ continue;
+ }
+ switch (nextName) {
+ case "traceId":
+ String traceId = reader.nextString();
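+ // a 32-character trace ID encodes the high 64 bits in its first 16 hex characters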
+ if (traceId.length() == 32) {
+ result.traceIdHigh(lowerHexToUnsignedLong(traceId, 0));
+ }
+ result.traceId(lowerHexToUnsignedLong(traceId));
+ break;
+ case "name":
+ result.name(reader.nextString());
+ break;
+ case "id":
+ result.id(Util.lowerHexToUnsignedLong(reader.nextString()));
+ break;
+ case "parentId":
+ result.parentId(Util.lowerHexToUnsignedLong(reader.nextString()));
+ break;
+ case "timestamp":
+ result.timestamp(reader.nextLong());
+ break;
+ case "duration":
+ result.duration(reader.nextLong());
+ break;
+ case "annotations":
+ reader.beginArray();
+ while (reader.hasNext()) {
+ result.addAnnotation(JsonAdapters.ANNOTATION_ADAPTER.fromJson(reader));
+ }
+ reader.endArray();
+ break;
+ case "binaryAnnotations":
+ reader.beginArray();
+ while (reader.hasNext()) {
+ result.addBinaryAnnotation(JsonAdapters.BINARY_ANNOTATION_ADAPTER.fromJson(reader));
+ }
+ reader.endArray();
+ break;
+ case "debug":
+ result.debug(reader.nextBoolean());
+ break;
+ default:
+ reader.skipValue();
+ }
+ }
+ reader.endObject();
+ return result.build();
+ }
+
+ @Override
+ public void toJson(JsonWriter writer, Span value) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ };
+}
+
diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/VersionSpecificTemplate.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/VersionSpecificTemplate.java
deleted file mode 100644
index 7c9c893a810..00000000000
--- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/VersionSpecificTemplate.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Copyright 2015-2017 The OpenZipkin Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package zipkin.storage.elasticsearch.http;
-
-import com.squareup.moshi.JsonReader;
-import okhttp3.Request;
-import zipkin.storage.elasticsearch.http.internal.client.HttpCall;
-
-import static zipkin.moshi.JsonReaders.enterPath;
-
-/** Ensures the index template exists and saves off the version */
-final class VersionSpecificTemplate {
- final String indexTemplate;
-
- VersionSpecificTemplate(ElasticsearchHttpStorage es) {
- this.indexTemplate = INDEX_TEMPLATE
- .replace("${__INDEX__}", es.indexNameFormatter().index())
- .replace("${__NUMBER_OF_SHARDS__}", String.valueOf(es.indexShards()))
- .replace("${__NUMBER_OF_REPLICAS__}", String.valueOf(es.indexReplicas()))
- .replace("${__TRACE_ID_MAPPING__}", es.strictTraceId()
- ? "{ KEYWORD }" : "{ \"type\": \"string\", \"analyzer\": \"traceId_analyzer\" }");
- }
-
- /** Templatized due to version differences. Only fields used in search are declared */
- static final String INDEX_TEMPLATE = "{\n"
- + " \"template\": \"${__INDEX__}-*\",\n"
- + " \"settings\": {\n"
- + " \"index.number_of_shards\": ${__NUMBER_OF_SHARDS__},\n"
- + " \"index.number_of_replicas\": ${__NUMBER_OF_REPLICAS__},\n"
- + " \"index.requests.cache.enable\": true,\n"
- + " \"index.mapper.dynamic\": false,\n"
- + " \"analysis\": {\n"
- + " \"analyzer\": {\n"
- + " \"traceId_analyzer\": {\n"
- + " \"type\": \"custom\",\n"
- + " \"tokenizer\": \"keyword\",\n"
- + " \"filter\": \"traceId_filter\"\n"
- + " }\n"
- + " },\n"
- + " \"filter\": {\n"
- + " \"traceId_filter\": {\n"
- + " \"type\": \"pattern_capture\",\n"
- + " \"patterns\": [\"([0-9a-f]{1,16})$\"],\n"
- + " \"preserve_original\": true\n"
- + " }\n"
- + " }\n"
- + " }\n"
- + " },\n"
- + " \"mappings\": {\n"
- + " \"_default_\": {\n"
- + " \"_all\": {\n"
- + " \"enabled\": false\n"
- + " }\n"
- + " },\n"
- + " \"" + ElasticsearchHttpSpanStore.SPAN + "\": {\n"
- + " \"properties\": {\n"
- + " \"traceId\": ${__TRACE_ID_MAPPING__},\n"
- + " \"name\": { KEYWORD },\n"
- + " \"timestamp_millis\": {\n"
- + " \"type\": \"date\",\n"
- + " \"format\": \"epoch_millis\"\n"
- + " },\n"
- + " \"duration\": { \"type\": \"long\" },\n"
- + " \"annotations\": {\n"
- + " \"type\": \"nested\",\n"
- + " \"dynamic\": false,\n"
- + " \"properties\": {\n"
- + " \"value\": { KEYWORD },\n"
- + " \"endpoint\": {\n"
- + " \"type\": \"object\",\n"
- + " \"dynamic\": false,\n"
- + " \"properties\": { \"serviceName\": { KEYWORD } }\n"
- + " }\n"
- + " }\n"
- + " },\n"
- + " \"binaryAnnotations\": {\n"
- + " \"type\": \"nested\",\n"
- + " \"dynamic\": false,\n"
- + " \"properties\": {\n"
- + " \"key\": { KEYWORD },\n"
- + " \"value\": { KEYWORD },\n"
- + " \"endpoint\": {\n"
- + " \"type\": \"object\",\n"
- + " \"dynamic\": false,\n"
- + " \"properties\": { \"serviceName\": { KEYWORD } }\n"
- + " }\n"
- + " }\n"
- + " }\n"
- + " }\n"
- + " },\n"
- + " \"" + ElasticsearchHttpSpanStore.DEPENDENCY_LINK + "\": { \"enabled\": false },\n"
- + " \"" + ElasticsearchHttpSpanStore.SERVICE_SPAN + "\": {\n"
- + " \"properties\": {\n"
- + " \"serviceName\": { KEYWORD },\n"
- + " \"spanName\": { KEYWORD }\n"
- + " }\n"
- + " }\n"
- + " }\n"
- + "}";
-
- /** Returns a version-specific index template */
- String get(HttpCall.Factory callFactory) {
- String version = getVersion(callFactory);
- return versionSpecificTemplate(version);
- }
-
- static String getVersion(HttpCall.Factory callFactory) {
- Request getNode = new Request.Builder().url(callFactory.baseUrl).tag("get-node").build();
-
- return callFactory.execute(getNode, b -> {
- JsonReader version = enterPath(JsonReader.of(b), "version", "number");
- if (version == null) throw new IllegalStateException(".version.number not in response");
- return version.nextString();
- });
- }
-
- private String versionSpecificTemplate(String version) {
- if (version.startsWith("2")) {
- return indexTemplate
- .replace("KEYWORD",
- "\"type\": \"string\", \"ignore_above\": 256, \"norms\": {\"enabled\": false }, \"index\": \"not_analyzed\"");
- } else if (version.startsWith("5")) {
- return indexTemplate
- .replace("KEYWORD",
- "\"type\": \"keyword\", \"ignore_above\": 256, \"norms\": false")
- .replace("\"analyzer\": \"traceId_analyzer\" }",
- "\"fielddata\": \"true\", \"analyzer\": \"traceId_analyzer\" }");
- } else {
- throw new IllegalStateException("Elasticsearch 2.x and 5.x are supported, was: " + version);
- }
- }
-}
diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/VersionSpecificTemplates.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/VersionSpecificTemplates.java
new file mode 100644
index 00000000000..06e2ef21cfc
--- /dev/null
+++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/VersionSpecificTemplates.java
@@ -0,0 +1,285 @@
+/**
+ * Copyright 2015-2017 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin.storage.elasticsearch.http;
+
+import com.squareup.moshi.JsonReader;
+import java.util.logging.Logger;
+import okhttp3.Request;
+import zipkin.storage.elasticsearch.http.internal.client.HttpCall;
+
+import static zipkin.moshi.JsonReaders.enterPath;
+import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.DEPENDENCY;
+import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN;
+
+/** Returns version-specific index templates for spans, dependency links, and the legacy multi-type index */
+final class VersionSpecificTemplates {
+ static final Logger LOG = Logger.getLogger(VersionSpecificTemplates.class.getName());
+
+ // TODO: remove when we stop writing span1 format
+ final String legacyIndexTemplate;
+ final String spanIndexTemplate;
+ final String dependencyIndexTemplate;
+
+ VersionSpecificTemplates(ElasticsearchHttpStorage es) {
+ this.legacyIndexTemplate = LEGACY_INDEX_TEMPLATE
+ .replace("${__INDEX__}", es.indexNameFormatter().index())
+ .replace("${__NUMBER_OF_SHARDS__}", String.valueOf(es.indexShards()))
+ .replace("${__NUMBER_OF_REPLICAS__}", String.valueOf(es.indexReplicas()))
+ .replace("${__TRACE_ID_MAPPING__}", es.strictTraceId()
+ ? "{ KEYWORD }" : "{ \"type\": \"STRING\", \"analyzer\": \"traceId_analyzer\" }");
+ this.spanIndexTemplate = SPAN_INDEX_TEMPLATE
+ .replace("${__INDEX__}", es.indexNameFormatter().index())
+ .replace("${__NUMBER_OF_SHARDS__}", String.valueOf(es.indexShards()))
+ .replace("${__NUMBER_OF_REPLICAS__}", String.valueOf(es.indexReplicas()))
+ .replace("${__TRACE_ID_MAPPING__}", es.strictTraceId()
+ ? "{ KEYWORD }" : "{ \"type\": \"STRING\", \"analyzer\": \"traceId_analyzer\" }");
+ this.dependencyIndexTemplate = DEPENDENCY_INDEX_TEMPLATE
+ .replace("${__INDEX__}", es.indexNameFormatter().index())
+ .replace("${__NUMBER_OF_SHARDS__}", String.valueOf(es.indexShards()))
+ .replace("${__NUMBER_OF_REPLICAS__}", String.valueOf(es.indexReplicas()));
+ }
+
+ /** Templatized due to version differences. Only fields used in search are declared */
+ static final String LEGACY_INDEX_TEMPLATE = "{\n"
+ + " \"template\": \"${__INDEX__}-*\",\n"
+ + " \"settings\": {\n"
+ + " \"index.number_of_shards\": ${__NUMBER_OF_SHARDS__},\n"
+ + " \"index.number_of_replicas\": ${__NUMBER_OF_REPLICAS__},\n"
+ + " \"index.requests.cache.enable\": true,\n"
+ + " \"index.mapper.dynamic\": false,\n"
+ + " \"analysis\": {\n"
+ + " \"analyzer\": {\n"
+ + " \"traceId_analyzer\": {\n"
+ + " \"type\": \"custom\",\n"
+ + " \"tokenizer\": \"keyword\",\n"
+ + " \"filter\": \"traceId_filter\"\n"
+ + " }\n"
+ + " },\n"
+ + " \"filter\": {\n"
+ + " \"traceId_filter\": {\n"
+ + " \"type\": \"pattern_capture\",\n"
+ + " \"patterns\": [\"([0-9a-f]{1,16})$\"],\n"
+ + " \"preserve_original\": true\n"
+ + " }\n"
+ + " }\n"
+ + " }\n"
+ + " },\n"
+ + " \"mappings\": {\n"
+ + " \"_default_\": {\n"
+ + " \"_all\": {\n"
+ + " \"enabled\": false\n"
+ + " }\n"
+ + " },\n"
+ + " \"span\": {\n"
+ + " \"properties\": {\n"
+ + " \"traceId\": ${__TRACE_ID_MAPPING__},\n"
+ + " \"name\": { KEYWORD },\n"
+ + " \"timestamp_millis\": {\n"
+ + " \"type\": \"date\",\n"
+ + " \"format\": \"epoch_millis\"\n"
+ + " },\n"
+ + " \"duration\": { \"type\": \"long\" },\n"
+ + " \"annotations\": {\n"
+ + " \"type\": \"nested\",\n"
+ + " \"dynamic\": false,\n"
+ + " \"properties\": {\n"
+ + " \"value\": { KEYWORD },\n"
+ + " \"endpoint\": {\n"
+ + " \"type\": \"object\",\n"
+ + " \"dynamic\": false,\n"
+ + " \"properties\": { \"serviceName\": { KEYWORD } }\n"
+ + " }\n"
+ + " }\n"
+ + " },\n"
+ + " \"binaryAnnotations\": {\n"
+ + " \"type\": \"nested\",\n"
+ + " \"dynamic\": false,\n"
+ + " \"properties\": {\n"
+ + " \"key\": { KEYWORD },\n"
+ + " \"value\": { KEYWORD },\n"
+ + " \"endpoint\": {\n"
+ + " \"type\": \"object\",\n"
+ + " \"dynamic\": false,\n"
+ + " \"properties\": { \"serviceName\": { KEYWORD } }\n"
+ + " }\n"
+ + " }\n"
+ + " }\n"
+ + " }\n"
+ + " },\n"
+ + " \"dependencylink\": { \"enabled\": false },\n"
+ + " \"servicespan\": {\n"
+ + " \"properties\": {\n"
+ + " \"serviceName\": { KEYWORD },\n"
+ + " \"spanName\": { KEYWORD }\n"
+ + " }\n"
+ + " }\n"
+ + " }\n"
+ + "}";
+
+ /** Templatized due to version differences. Only fields used in search are declared */
+ static final String SPAN_INDEX_TEMPLATE = "{\n"
+ + " \"TEMPLATE\": \"${__INDEX__}:" + SPAN + "-*\",\n"
+ + " \"settings\": {\n"
+ + " \"index.number_of_shards\": ${__NUMBER_OF_SHARDS__},\n"
+ + " \"index.number_of_replicas\": ${__NUMBER_OF_REPLICAS__},\n"
+ + " \"index.requests.cache.enable\": true,\n"
+ + " \"index.mapper.dynamic\": false,\n"
+ + " \"analysis\": {\n"
+ + " \"analyzer\": {\n"
+ + " \"traceId_analyzer\": {\n"
+ + " \"type\": \"custom\",\n"
+ + " \"tokenizer\": \"keyword\",\n"
+ + " \"filter\": \"traceId_filter\"\n"
+ + " }\n"
+ + " },\n"
+ + " \"filter\": {\n"
+ + " \"traceId_filter\": {\n"
+ + " \"type\": \"pattern_capture\",\n"
+ + " \"patterns\": [\"([0-9a-f]{1,16})$\"],\n"
+ + " \"preserve_original\": true\n"
+ + " }\n"
+ + " }\n"
+ + " }\n"
+ + " },\n"
+ + " \"mappings\": {\n"
+ + " \"_default_\": {\n"
+ + " \"dynamic_templates\": [\n"
+ + " {\n"
+ + " \"strings\": {\n"
+ + " \"mapping\": {\n"
+ + " KEYWORD,\n"
+ + " \"ignore_above\": 256\n"
+ + " },\n"
+ + " \"match_mapping_type\": \"string\",\n"
+ + " \"match\": \"*\"\n"
+ + " }\n"
+ + " }\n"
+ + " ]\n"
+ + " },\n"
+ + " \"" + SPAN + "\": {\n"
+ + " \"properties\": {\n"
+ + " \"traceId\": ${__TRACE_ID_MAPPING__},\n"
+ + " \"name\": { KEYWORD },\n"
+ + " \"localEndpoint\": {\n"
+ + " \"type\": \"object\",\n"
+ + " \"dynamic\": false,\n"
+ + " \"properties\": { \"serviceName\": { KEYWORD } }\n"
+ + " },\n"
+ + " \"remoteEndpoint\": {\n"
+ + " \"type\": \"object\",\n"
+ + " \"dynamic\": false,\n"
+ + " \"properties\": { \"serviceName\": { KEYWORD } }\n"
+ + " },\n"
+ + " \"timestamp_millis\": {\n"
+ + " \"type\": \"date\",\n"
+ + " \"format\": \"epoch_millis\"\n"
+ + " },\n"
+ + " \"duration\": { \"type\": \"long\" },\n"
+ + " \"annotations\": {\n"
+ + " \"type\": \"object\",\n"
+ + " \"dynamic\": false,\n"
+ + " \"properties\": {\n"
+ + " \"value\": { KEYWORD }\n"
+ + " }\n"
+ + " },\n"
+ + " \"tags\": {\n"
+ + " \"type\": \"object\",\n"
+ + " \"dynamic\": true\n"
+ + " }\n"
+ + " }\n"
+ + " }\n"
+ + " }\n"
+ + "}";
+
+ /** Templatized due to version differences. Only fields used in search are declared */
+ static final String DEPENDENCY_INDEX_TEMPLATE = "{\n"
+ + " \"TEMPLATE\": \"${__INDEX__}:" + DEPENDENCY + "-*\",\n"
+ + " \"settings\": {\n"
+ + " \"index.number_of_shards\": ${__NUMBER_OF_SHARDS__},\n"
+ + " \"index.number_of_replicas\": ${__NUMBER_OF_REPLICAS__},\n"
+ + " \"index.requests.cache.enable\": true,\n"
+ + " \"index.mapper.dynamic\": false\n"
+ + " },\n"
+ + " \"mappings\": {\"" + DEPENDENCY + "\": { \"enabled\": false }}\n"
+ + "}";
+
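+ /** The legacy multi-type template is only installed below 6.x; the single-type templates need 2.4+. */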
+ IndexTemplates get(HttpCall.Factory callFactory) {
+ float version = getVersion(callFactory);
+ return IndexTemplates.builder()
+ .version(version)
+ .legacy(version < 6 ? versionSpecificLegacyTemplate(version) : null)
+ .span(version > 2.4 ? versionSpecificSpanIndexTemplate(version) : null)
+ .dependency(version > 2.4 ? versionSpecificDependencyLinkIndexTemplate(version) : null)
+ .build();
+ }
+
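+ /** Reads ".version.number" from a GET of the base URL, keeping only major.minor (ex "5.6.3" parses as 5.6). */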
+ static float getVersion(HttpCall.Factory callFactory) {
+ Request getNode = new Request.Builder().url(callFactory.baseUrl).tag("get-node").build();
+ return callFactory.execute(getNode, b -> {
+ JsonReader version = enterPath(JsonReader.of(b), "version", "number");
+ if (version == null) throw new IllegalStateException(".version.number not in response");
+ String versionString = version.nextString();
+ float result = Float.valueOf(versionString.substring(0, 3));
+ if (result < 2.4) {
+ LOG.warning("Please upgrade to Elasticsearch 2.4 or later. version=" + versionString);
+ }
+ return result;
+ });
+ }
+
+ private String versionSpecificLegacyTemplate(float version) {
+ if (version >= 2 && version < 3) {
+ return legacyIndexTemplate
+ .replace("STRING", "string")
+ .replace("KEYWORD",
+ "\"type\": \"string\", \"ignore_above\": 256, \"norms\": {\"enabled\": false }, \"index\": \"not_analyzed\"");
+ } else if (version >= 5 && version < 6) {
+ return legacyIndexTemplate
+ .replace("STRING", "text")
+ .replace("KEYWORD",
+ "\"type\": \"keyword\", \"ignore_above\": 256, \"norms\": false")
+ .replace("\"analyzer\": \"traceId_analyzer\" }",
+ "\"fielddata\": \"true\", \"analyzer\": \"traceId_analyzer\" }");
+ } else {
+ throw new IllegalStateException(
+ "Elasticsearch 2.x and 5.x support multi-type indexes, was: " + version);
+ }
+ }
+
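+ // The TEMPLATE token becomes "index_patterns" on 6.x and "template" on earlier versions.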
+ private String versionSpecificSpanIndexTemplate(float version) {
+ if (version >= 2.4 && version < 3) {
+ return spanIndexTemplate
+ .replace("TEMPLATE", "template")
+ .replace("STRING", "string")
+ .replace("KEYWORD",
+ "\"type\": \"string\", \"norms\": {\"enabled\": false }, \"index\": \"not_analyzed\"");
+ } else if (version >= 5) {
+ return spanIndexTemplate
+ .replace("TEMPLATE", version >= 6 ? "index_patterns" : "template")
+ .replace("STRING", "text")
+ .replace("KEYWORD",
+ "\"type\": \"keyword\", \"norms\": false")
+ .replace("\"analyzer\": \"traceId_analyzer\" }",
+ "\"fielddata\": \"true\", \"analyzer\": \"traceId_analyzer\" }");
+ } else {
+ throw new IllegalStateException(
+ "Elasticsearch 2.4+, 5.x and 6.x allow dots in field names, was: " + version);
+ }
+ }
+
+ private String versionSpecificDependencyLinkIndexTemplate(float version) {
+ return dependencyIndexTemplate.replace("TEMPLATE",
+ version >= 6 ? "index_patterns" : "template");
+ }
+}
diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/LenientDoubleCallback.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/LenientDoubleCallback.java
new file mode 100644
index 00000000000..d07dcd118a0
--- /dev/null
+++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/LenientDoubleCallback.java
@@ -0,0 +1,55 @@
+/**
+ * Copyright 2015-2017 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin.storage.elasticsearch.http.internal;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import zipkin.storage.Callback;
+
+/** Callback that succeeds if at least one of its two inputs does. The first error is only logged. */
+abstract class LenientDoubleCallback<V> implements Callback<V> {
+ final Logger log;
+ final Callback<V> delegate;
+
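+ // the first outcome is buffered here until the second arrives; both methods synchronize on this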
+ V v;
+ Throwable t;
+
+ LenientDoubleCallback(Logger log, Callback<V> delegate) {
+ this.log = log;
+ this.delegate = delegate;
+ }
+
+ abstract V merge(V v1, V v2);
+
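+ // outcomes: success+success -> merged success, success+error (either order) -> the success, error+error -> the second error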
+ @Override synchronized final public void onSuccess(V value) {
+ if (t != null) {
+ delegate.onSuccess(value);
+ } else if (v == null) {
+ v = value;
+ } else {
+ delegate.onSuccess(merge(v, value));
+ }
+ }
+
+ @Override synchronized final public void onError(Throwable throwable) {
+ if (v != null) {
+ delegate.onSuccess(v);
+ } else if (t == null) {
+ log.log(Level.INFO, "first error", throwable);
+ t = throwable;
+ } else {
+ delegate.onError(throwable);
+ }
+ }
+}
diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/LenientDoubleCallbackAsyncSpanStore.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/LenientDoubleCallbackAsyncSpanStore.java
new file mode 100644
index 00000000000..cbb6c76a195
--- /dev/null
+++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/LenientDoubleCallbackAsyncSpanStore.java
@@ -0,0 +1,163 @@
+/**
+ * Copyright 2015-2017 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin.storage.elasticsearch.http.internal;
+
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Logger;
+import zipkin.DependencyLink;
+import zipkin.Span;
+import zipkin.internal.DependencyLinker;
+import zipkin.internal.MergeById;
+import zipkin.storage.AsyncSpanStore;
+import zipkin.storage.Callback;
+import zipkin.storage.QueryRequest;
+
+/**
+ * Issues each read against both delegates, concatenating results when both answer, or accepting
+ * one answer when the other fails.
+ */
+public final class LenientDoubleCallbackAsyncSpanStore implements AsyncSpanStore {
+ final AsyncSpanStore left;
+ final AsyncSpanStore right;
+
+ public LenientDoubleCallbackAsyncSpanStore(AsyncSpanStore left, AsyncSpanStore right) {
+ this.left = left;
+ this.right = right;
+ }
+
+ @Override public void getTraces(QueryRequest request, Callback<List<List<Span>>> callback) {
+ GetTracesDoubleCallback doubleCallback = new GetTracesDoubleCallback(callback);
+ left.getTraces(request, doubleCallback);
+ right.getTraces(request, doubleCallback);
+ }
+
+ static final class GetTracesDoubleCallback extends LenientDoubleCallback<List<List<Span>>> {
+ static final Logger LOG = Logger.getLogger(GetTracesDoubleCallback.class.getName());
+
+ GetTracesDoubleCallback(Callback<List<List<Span>>> delegate) {
+ super(LOG, delegate);
+ }
+
+ // For simplicity, assumes a trace isn't split across storage
+ @Override List<List<Span>> merge(List<List<Span>> v1, List<List<Span>> v2) {
+ List<List<Span>> result = new ArrayList<>(v1);
+ result.addAll(v2);
+ return result;
+ }
+ }
+
+ @Override @Deprecated public void getTrace(long id, Callback<List<Span>> callback) {
+ getTrace(0L, id, callback);
+ }
+
+ @Override public void getTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>> callback) {
+ GetTraceDoubleCallback doubleCallback = new GetTraceDoubleCallback(callback);
+ left.getTrace(traceIdHigh, traceIdLow, doubleCallback);
+ right.getTrace(traceIdHigh, traceIdLow, doubleCallback);
+ }
+
+ static final class GetTraceDoubleCallback extends LenientDoubleCallback<List<Span>> {
+ static final Logger LOG = Logger.getLogger(GetTraceDoubleCallback.class.getName());
+
+ GetTraceDoubleCallback(Callback<List<Span>> delegate) {
+ super(LOG, delegate);
+ }
+
+ @Override List<Span> merge(List<Span> v1, List<Span> v2) {
+ List<Span> result = new ArrayList<>(v1);
+ result.addAll(v2);
+ return MergeById.apply(result);
+ }
+ }
+
+ @Override @Deprecated public void getRawTrace(long traceId, Callback<List<Span>> callback) {
+ getRawTrace(0L, traceId, callback);
+ }
+
+ @Override
+ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>> callback) {
+ GetRawTraceDoubleCallback doubleCallback = new GetRawTraceDoubleCallback(callback);
+ left.getRawTrace(traceIdHigh, traceIdLow, doubleCallback);
+ right.getRawTrace(traceIdHigh, traceIdLow, doubleCallback);
+ }
+
+ static final class GetRawTraceDoubleCallback extends LenientDoubleCallback<List<Span>> {
+ static final Logger LOG = Logger.getLogger(GetRawTraceDoubleCallback.class.getName());
+
+ GetRawTraceDoubleCallback(Callback<List<Span>> delegate) {
+ super(LOG, delegate);
+ }
+
+ @Override List<Span> merge(List<Span> v1, List<Span> v2) {
+ List<Span> result = new ArrayList<>(v1);
+ result.addAll(v2);
+ return result; // don't merge as this is raw
+ }
+ }
+
+ @Override public void getServiceNames(Callback<List<String>> callback) {
+ StringsDoubleCallback doubleCallback = new StringsDoubleCallback(callback);
+ left.getServiceNames(doubleCallback);
+ right.getServiceNames(doubleCallback);
+ }
+
+ static final class StringsDoubleCallback extends LenientDoubleCallback<List<String>> {
+ static final Logger LOG = Logger.getLogger(StringsDoubleCallback.class.getName());
+
+ StringsDoubleCallback(Callback<List<String>> delegate) {
+ super(LOG, delegate);
+ }
+
+ @Override List<String> merge(List<String> v1, List<String> v2) {
+ Set<String> result = new LinkedHashSet<>(v1); // retain order
+ result.addAll(v2);
+ return new ArrayList<>(result);
+ }
+ }
+
+ @Override public void getSpanNames(String serviceName, Callback<List<String>> callback) {
+ StringsDoubleCallback doubleCallback = new StringsDoubleCallback(callback);
+ left.getSpanNames(serviceName, doubleCallback);
+ right.getSpanNames(serviceName, doubleCallback);
+ }
+
+ @Override
+ public void getDependencies(long endTs, Long lookback, Callback<List<DependencyLink>> callback) {
+ GetDependenciesDoubleCallback doubleCallback = new GetDependenciesDoubleCallback(callback);
+ left.getDependencies(endTs, lookback, doubleCallback);
+ right.getDependencies(endTs, lookback, doubleCallback);
+ }
+
+ static final class GetDependenciesDoubleCallback
+ extends LenientDoubleCallback<List<DependencyLink>> {
+ static final Logger LOG = Logger.getLogger(GetDependenciesDoubleCallback.class.getName());
+
+ GetDependenciesDoubleCallback(Callback<List<DependencyLink>> delegate) {
+ super(LOG, delegate);
+ }
+
+ @Override List<DependencyLink> merge(List<DependencyLink> v1, List<DependencyLink> v2) {
+ List<DependencyLink> concat = new ArrayList<>(v1);
+ concat.addAll(v2);
+ return DependencyLinker.merge(concat);
+ }
+ }
+
+ @Override public String toString() {
+ return "LenientDoubleCallbackAsyncSpanStore(" + left + "," + right + ")";
+ }
+}
diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/client/HttpCall.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/client/HttpCall.java
index a1a546cc78e..72c4e50353e 100644
--- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/client/HttpCall.java
+++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/client/HttpCall.java
@@ -105,7 +105,8 @@ static class CallbackAdapter<V> implements okhttp3.Callback {
if (response.isSuccessful()) {
delegate.onSuccess(bodyConverter.convert(content));
} else {
- delegate.onError(new IllegalStateException("response failed: " + content.readUtf8()));
+ delegate.onError(new IllegalStateException(
+ "response for " + response.request().tag() + " failed: " + content.readUtf8()));
}
} catch (Throwable t) {
propagateIfFatal(t);
diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/client/SearchCallFactory.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/client/SearchCallFactory.java
index d3eb8c4d620..34efac8e675 100644
--- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/client/SearchCallFactory.java
+++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/client/SearchCallFactory.java
@@ -20,6 +20,7 @@
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.RequestBody;
+import zipkin.internal.Nullable;
public class SearchCallFactory {
static final MediaType APPLICATION_JSON = MediaType.parse("application/json");
@@ -41,15 +42,14 @@ public <V> HttpCall<V> newCall(SearchRequest request, HttpCall.BodyConverter<V>
}
/** Matches the behavior of {@code IndicesOptions#lenientExpandOpen()} */
- public HttpUrl lenientSearch(List<String> indices, String type) {
- return http.baseUrl.newBuilder()
- .addPathSegment(join(indices))
- .addPathSegment(type)
- .addPathSegment("_search")
- // keep these in alphabetical order as it simplifies amazon signatures!
- .addQueryParameter("allow_no_indices", "true")
- .addQueryParameter("expand_wildcards", "open")
- .addQueryParameter("ignore_unavailable", "true").build();
+ HttpUrl lenientSearch(List<String> indices, @Nullable String type) {
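+ // a null type omits the type path segment, so the search spans every mapping type in the indices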
+ HttpUrl.Builder builder = http.baseUrl.newBuilder().addPathSegment(join(indices));
+ if (type != null) builder.addPathSegment(type);
+ return builder.addPathSegment("_search")
+ // keep these in alphabetical order as it simplifies amazon signatures!
+ .addQueryParameter("allow_no_indices", "true")
+ .addQueryParameter("expand_wildcards", "open")
+ .addQueryParameter("ignore_unavailable", "true").build();
}
static String join(List<String> parts) {
diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/client/SearchRequest.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/client/SearchRequest.java
index 1a238fef1b0..48443073fb4 100644
--- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/client/SearchRequest.java
+++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/internal/client/SearchRequest.java
@@ -20,8 +20,18 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import zipkin.internal.Nullable;
public final class SearchRequest {
+
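+ /** Searches across all mapping types in the supplied indices (no type segment in the URL). */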
+ public static SearchRequest create(List<String> indices) {
+ return new SearchRequest(indices, null);
+ }
+
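+ /** Restricts the search to the named mapping type within the supplied indices. */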
+ public static SearchRequest create(List<String> indices, String type) {
+ return new SearchRequest(indices, type);
+ }
+
/**
* The maximum results returned in a query. This only affects non-aggregation requests.
*
@@ -32,7 +42,7 @@ public final class SearchRequest {
static final int MAX_RESULT_WINDOW = 10000; // the default elasticsearch allowed limit
transient final List<String> indices;
- transient final String type;
+ @Nullable transient final String type;
Integer size = MAX_RESULT_WINDOW;
Boolean _source;
@@ -44,6 +54,18 @@ public final class SearchRequest {
this.type = type;
}
+ public static class Should extends LinkedList