Skip to content

Commit

Permalink
All in except backport script
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrian Cole committed Apr 13, 2017
1 parent ab2ab63 commit a669071
Show file tree
Hide file tree
Showing 13 changed files with 471 additions and 267 deletions.
30 changes: 30 additions & 0 deletions zipkin-storage/elasticsearch-http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,36 @@ inputs are downcased.
Span and service name queries default to look back 24hrs (2 index days).
This can be controlled by `ElasticsearchHttpStorage.Builder.namesLookback`

#### Index format
Starting with Zipkin 1.23, service and span names are written to the
same daily indexes as spans and dependency links as the document type
"servicespan". This was added for performance reasons as formerly search
was using relatively expensive nested queries.

The documents themselves represent service and span name pairs. Only one
document is present per daily index. This is to keep the documents from
repeating at a multiplier of span count, which also simplifies query.
This deduplication is enforced at write time by using an ID convention
of the service and span name. Ex. `id = MyServiceName|MySpanName`

The document is a simple structure, like:
```json
{
"serviceName": "MyServiceName",
"spanName": "MySpanName",
}
```

The document does replicate data in the ID, but it is needed as you
cannot query based on an ID expression.

#### Notes for data written prior to Zipkin 1.23
Before Zipkin 1.23, service and span names were nested queries against
the span type. This was an expensive operation, which resulted in high
latency particularly when the UI loads. When the "servicespan" type is
missing from an index, or there's no results returned, a fallback nested
query is invoked.

## Customizing the ingest pipeline

When using Elasticsearch 5.x, you can setup an [ingest pipeline](https://www.elastic.co/guide/en/elasticsearch/reference/master/pipeline.html)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,27 @@
*/
package zipkin.storage.elasticsearch.http;

import com.squareup.moshi.JsonWriter;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import okio.Buffer;
import zipkin.Codec;
import zipkin.Span;
import zipkin.internal.Pair;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.Callback;

import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp;
import static zipkin.internal.Util.UTF_8;
import static zipkin.internal.Util.propagateIfFatal;
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SERVICE_SPAN;

final class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer {
class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final for testing

final ElasticsearchHttpStorage es;
final IndexNameFormatter indexNameFormatter;
Expand All @@ -39,27 +49,90 @@ final class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer {
return;
}
try {
indexSpans(new HttpBulkSpanIndexer(es), spans).execute(callback);
HttpBulkIndexer indexer = new HttpBulkIndexer("index-span", es);
Map<String, Set<Pair<String>>> indexToServiceSpans = indexSpans(indexer, spans);
if (!indexToServiceSpans.isEmpty()) {
indexNames(indexer, indexToServiceSpans);
}
indexer.execute(callback);
} catch (Throwable t) {
propagateIfFatal(t);
callback.onError(t);
}
}

HttpBulkSpanIndexer indexSpans(HttpBulkSpanIndexer indexer, List<Span> spans) throws IOException {
/** Indexes spans and returns a mapping of indexes that may need a names update */
Map<String, Set<Pair<String>>> indexSpans(HttpBulkIndexer indexer, List<Span> spans) {
Map<String, Set<Pair<String>>> indexToServiceSpans = new LinkedHashMap<>();
for (Span span : spans) {
Long timestamp = guessTimestamp(span);
Long timestampMillis;
String index; // which index to store this span into
if (timestamp != null) {
timestampMillis = TimeUnit.MICROSECONDS.toMillis(timestamp);
index = indexNameFormatter.indexNameForTimestamp(timestampMillis);
if (!span.name.isEmpty()) putServiceSpans(indexToServiceSpans, index, span);
} else {
timestampMillis = null;
index = indexNameFormatter.indexNameForTimestamp(System.currentTimeMillis());
}
indexer.add(index, span, timestampMillis);
byte[] document = Codec.JSON.writeSpan(span);
if (timestampMillis != null) document = prefixWithTimestampMillis(document, timestampMillis);
indexer.add(index, ElasticsearchHttpSpanStore.SPAN, document, null /* Allow ES to choose an ID */);
}
return indexToServiceSpans;
}

void putServiceSpans(Map<String, Set<Pair<String>>> indexToServiceSpans, String index, Span s) {
Set<Pair<String>> serviceSpans = indexToServiceSpans.get(index);
if (serviceSpans == null) indexToServiceSpans.put(index, serviceSpans = new LinkedHashSet<>());
for (String serviceName : s.serviceNames()) {
serviceSpans.add(Pair.create(serviceName, s.name));
}
}

/**
* Adds service and span names to the pending batch. The id is "serviceName|spanName" to prevent
* a large order of duplicates ending up in the daily index. This also means queries do not need
* to deduplicate.
*/
void indexNames(HttpBulkIndexer indexer, Map<String, Set<Pair<String>>> indexToServiceSpans)
throws IOException {
Buffer buffer = new Buffer();
for (Map.Entry<String, Set<Pair<String>>> entry : indexToServiceSpans.entrySet()) {
String index = entry.getKey();
for (Pair<String> serviceSpan : entry.getValue()) {
JsonWriter writer = JsonWriter.of(buffer);
writer.beginObject();
writer.name("serviceName").value(serviceSpan._1);
writer.name("spanName").value(serviceSpan._2);
writer.endObject();
byte[] document = buffer.readByteArray();
indexer.add(index, SERVICE_SPAN, document, serviceSpan._1 + "|" + serviceSpan._2);
}
}
}

private static final byte[] TIMESTAMP_MILLIS_PREFIX = "{\"timestamp_millis\":".getBytes(UTF_8);

/**
* In order to allow systems like Kibana to search by timestamp, we add a field "timestamp_millis"
* when storing. The cheapest way to do this without changing the codec is prefixing it to the
* json. For example. {"traceId":"... becomes {"timestamp_millis":12345,"traceId":"...
*/
static byte[] prefixWithTimestampMillis(byte[] input, long timestampMillis) {
String dateAsString = Long.toString(timestampMillis);
byte[] newSpanBytes =
new byte[TIMESTAMP_MILLIS_PREFIX.length + dateAsString.length() + input.length];
int pos = 0;
System.arraycopy(TIMESTAMP_MILLIS_PREFIX, 0, newSpanBytes, pos, TIMESTAMP_MILLIS_PREFIX.length);
pos += TIMESTAMP_MILLIS_PREFIX.length;
for (int i = 0, length = dateAsString.length(); i < length; i++) {
newSpanBytes[pos++] = (byte) dateAsString.charAt(i);
}
return indexer;
newSpanBytes[pos++] = ',';
// starting at position 1 discards the old head of '{'
System.arraycopy(input, 1, newSpanBytes, pos, input.length - 1);
return newSpanBytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ final class ElasticsearchHttpSpanStore implements AsyncSpanStore {

static final String SPAN = "span";
static final String DEPENDENCY_LINK = "dependencylink";
static final String SERVICE_SPAN = "servicespan";

final SearchCallFactory search;
final String[] allIndices;
Expand Down Expand Up @@ -188,14 +189,28 @@ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>>
long beginMillis = endMillis - namesLookback;

List<String> indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis);
SearchRequest.Filters filters = new SearchRequest.Filters();
filters.addRange("timestamp_millis", beginMillis, endMillis);
SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN)
.filters(filters)
.addAggregation(Aggregation.nestedTerms("annotations.endpoint.serviceName"))
.addAggregation(Aggregation.nestedTerms("binaryAnnotations.endpoint.serviceName"));
SearchRequest request = SearchRequest.forIndicesAndType(indices, SERVICE_SPAN)
.addAggregation(Aggregation.terms("serviceName", Integer.MAX_VALUE));

search.newCall(request, BodyConverters.SORTED_KEYS).submit(new Callback<List<String>>() {
@Override public void onSuccess(List<String> value) {
if (!value.isEmpty()) callback.onSuccess(value);

// Special cased code until sites update their collectors. What this does is do a more
// expensive nested query to get service names when the servicespan type returns nothing.
SearchRequest.Filters filters = new SearchRequest.Filters();
filters.addRange("timestamp_millis", beginMillis, endMillis);
SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN)
.filters(filters)
.addAggregation(Aggregation.nestedTerms("annotations.endpoint.serviceName"))
.addAggregation(Aggregation.nestedTerms("binaryAnnotations.endpoint.serviceName"));
search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
}

search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
@Override public void onError(Throwable t) {
callback.onError(t);
}
});
}

@Override public void getSpanNames(String serviceName, Callback<List<String>> callback) {
Expand All @@ -208,17 +223,33 @@ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>>
long beginMillis = endMillis - namesLookback;

List<String> indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis);
SearchRequest.Filters filters = new SearchRequest.Filters();
filters.addRange("timestamp_millis", beginMillis, endMillis);
filters.addNestedTerms(asList(
"annotations.endpoint.serviceName",
"binaryAnnotations.endpoint.serviceName"
), serviceName.toLowerCase(Locale.ROOT));
SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN)
.filters(filters)
.addAggregation(Aggregation.terms("name", Integer.MAX_VALUE));

search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);

SearchRequest request = SearchRequest.forIndicesAndType(indices, SERVICE_SPAN)
.term("serviceName", serviceName.toLowerCase(Locale.ROOT))
.addAggregation(Aggregation.terms("spanName", Integer.MAX_VALUE));

search.newCall(request, BodyConverters.SORTED_KEYS).submit(new Callback<List<String>>() {
@Override public void onSuccess(List<String> value) {
if (!value.isEmpty()) callback.onSuccess(value);

// Special cased code until sites update their collectors. What this does is do a more
// expensive nested query to get span names when the servicespan type returns nothing.
SearchRequest.Filters filters = new SearchRequest.Filters();
filters.addRange("timestamp_millis", beginMillis, endMillis);
filters.addNestedTerms(asList(
"annotations.endpoint.serviceName",
"binaryAnnotations.endpoint.serviceName"
), serviceName.toLowerCase(Locale.ROOT));
SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN)
.filters(filters)
.addAggregation(Aggregation.terms("name", Integer.MAX_VALUE));
search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
}

@Override public void onError(Throwable t) {
callback.onError(t);
}
});
}

@Override public void getDependencies(long endTs, @Nullable Long lookback,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@

// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
// exposed to re-use for testing writes of dependency links
abstract class HttpBulkIndexer<T> {
final String typeName;
final class HttpBulkIndexer {
final String tag;
final HttpCall.Factory http;
final String pipeline;
Expand All @@ -40,22 +39,20 @@ abstract class HttpBulkIndexer<T> {
final Buffer body = new Buffer();
final Set<String> indices = new LinkedHashSet<>();

HttpBulkIndexer(String typeName, ElasticsearchHttpStorage es) {
this.typeName = typeName;
tag = "index-" + typeName;
HttpBulkIndexer(String tag, ElasticsearchHttpStorage es) {
this.tag = tag;
http = es.http();
pipeline = es.pipeline();
flushOnWrites = es.flushOnWrites();
}

void add(String index, T object, @Nullable String id) {
writeIndexMetadata(index, id);
writeDocument(object);

if (flushOnWrites) indices.add(index);
void add(String index, String typeName, byte[] document, @Nullable String id) {
writeIndexMetadata(index, typeName, id);
writeDocument(document);
}

void writeIndexMetadata(String index, @Nullable String id) {
void writeIndexMetadata(String index, String typeName, @Nullable String id) {
if (flushOnWrites) indices.add(index);
body.writeUtf8("{\"index\":{\"_index\":\"").writeUtf8(index).writeByte('"');
body.writeUtf8(",\"_type\":\"").writeUtf8(typeName).writeByte('"');
if (id != null) {
Expand All @@ -64,13 +61,11 @@ void writeIndexMetadata(String index, @Nullable String id) {
body.writeUtf8("}}\n");
}

void writeDocument(T object) {
body.write(toJsonBytes(object));
void writeDocument(byte[] document) {
body.write(document);
body.writeByte('\n');
}

abstract byte[] toJsonBytes(T object);

/** Creates a bulk request when there is more than one object to store */
void execute(Callback<Void> callback) {
HttpUrl url = pipeline != null
Expand Down

This file was deleted.

Loading

0 comments on commit a669071

Please sign in to comment.