Skip to content

Commit

Permalink
Indexes service/span names
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrian Cole committed Apr 13, 2017
1 parent ab2ab63 commit 79f8a9b
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 238 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,25 @@
*/
package zipkin.storage.elasticsearch.http;

import com.squareup.moshi.JsonWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import okio.Buffer;
import zipkin.Codec;
import zipkin.Span;
import zipkin.internal.Pair;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.Callback;

import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp;
import static zipkin.internal.Util.UTF_8;
import static zipkin.internal.Util.propagateIfFatal;
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SERVICE_SPAN;

final class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer {

Expand All @@ -39,27 +49,85 @@ final class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer {
return;
}
try {
indexSpans(new HttpBulkSpanIndexer(es), spans).execute(callback);
HttpBulkIndexer indexer = new HttpBulkIndexer("index-span", es);
Map<String, Set<Pair<String>>> indexToServiceSpans = indexSpans(indexer, spans);
if (!indexToServiceSpans.isEmpty()) {
indexServiceSpans(indexer, indexToServiceSpans);
}
indexer.execute(callback);
} catch (Throwable t) {
propagateIfFatal(t);
callback.onError(t);
}
}

HttpBulkSpanIndexer indexSpans(HttpBulkSpanIndexer indexer, List<Span> spans) throws IOException {
Map<String, Set<Pair<String>>> indexSpans(HttpBulkIndexer indexer, List<Span> spans)
throws IOException {
Map<String, Set<Pair<String>>> indexToServiceSpans = new LinkedHashMap<>();
for (Span span : spans) {
Long timestamp = guessTimestamp(span);
Long timestampMillis;
String index; // which index to store this span into
if (timestamp != null) {
timestampMillis = TimeUnit.MICROSECONDS.toMillis(timestamp);
index = indexNameFormatter.indexNameForTimestamp(timestampMillis);
for (String serviceName : span.serviceNames()) {
if (!span.name.isEmpty()) {
Set<Pair<String>> serviceSpans = indexToServiceSpans.get(index);
if (serviceSpans == null) {
indexToServiceSpans.put(index, serviceSpans = new LinkedHashSet<>());
}
serviceSpans.add(Pair.create(serviceName, span.name));
}
}
} else {
timestampMillis = null;
index = indexNameFormatter.indexNameForTimestamp(System.currentTimeMillis());
}
indexer.add(index, span, timestampMillis);
byte[] document = Codec.JSON.writeSpan(span);
if (timestampMillis != null) document = prefixWithTimestampMillis(document, timestampMillis);
indexer.add(index, ElasticsearchHttpSpanStore.SPAN, document, null /* Allow ES to choose an ID */);
}
return indexToServiceSpans;
}

/**
 * Writes one "servicespan" document per distinct service/span name pair, using the pair itself
 * as the document ID so repeated writes are idempotent.
 */
void indexServiceSpans(HttpBulkIndexer indexer,
    Map<String, Set<Pair<String>>> indexToServiceSpans) throws IOException {
  Buffer buffer = new Buffer();
  for (Map.Entry<String, Set<Pair<String>>> indexEntry : indexToServiceSpans.entrySet()) {
    for (Pair<String> serviceSpan : indexEntry.getValue()) {
      String serviceName = serviceSpan._1, spanName = serviceSpan._2;
      JsonWriter writer = JsonWriter.of(buffer);
      writer.beginObject();
      writer.name("serviceName").value(serviceName);
      writer.name("spanName").value(spanName);
      writer.endObject();
      // readByteArray drains the buffer, so it can be reused for the next document
      indexer.add(indexEntry.getKey(), SERVICE_SPAN, buffer.readByteArray(),
          serviceName + "|" + spanName);
    }
  }
}

private static final byte[] TIMESTAMP_MILLIS_PREFIX =
    "{\"timestamp_millis\":".getBytes(StandardCharsets.UTF_8);

/**
 * In order to allow systems like Kibana to search by timestamp, we add a field "timestamp_millis"
 * when storing. The cheapest way to do this without changing the codec is prefixing it to the
 * json. For example, {@code {"traceId":"...} becomes {@code {"timestamp_millis":12345,"traceId":"...}.
 *
 * @param input a JSON object encoded as UTF-8 bytes; must start with '{'
 * @param timestampMillis epoch millis embedded as the first field
 * @return a new array; {@code input} is not modified
 */
static byte[] prefixWithTimestampMillis(byte[] input, long timestampMillis) {
  String dateAsString = Long.toString(timestampMillis);
  // Result size: prefix + digits + comma + (input minus its leading '{') = prefix + digits + input
  byte[] newSpanBytes =
      new byte[TIMESTAMP_MILLIS_PREFIX.length + dateAsString.length() + input.length];
  int pos = 0;
  System.arraycopy(TIMESTAMP_MILLIS_PREFIX, 0, newSpanBytes, pos, TIMESTAMP_MILLIS_PREFIX.length);
  pos += TIMESTAMP_MILLIS_PREFIX.length;
  for (int i = 0, length = dateAsString.length(); i < length; i++) {
    newSpanBytes[pos++] = (byte) dateAsString.charAt(i); // decimal digits are single-byte ASCII
  }
  newSpanBytes[pos++] = ',';
  // starting at position 1 discards the old head of '{'
  System.arraycopy(input, 1, newSpanBytes, pos, input.length - 1);
  return newSpanBytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ final class ElasticsearchHttpSpanStore implements AsyncSpanStore {

static final String SPAN = "span";
static final String DEPENDENCY_LINK = "dependencylink";
static final String SERVICE_SPAN = "servicespan";

final SearchCallFactory search;
final String[] allIndices;
Expand Down Expand Up @@ -188,12 +189,8 @@ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>>
long beginMillis = endMillis - namesLookback;

List<String> indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis);
SearchRequest.Filters filters = new SearchRequest.Filters();
filters.addRange("timestamp_millis", beginMillis, endMillis);
SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN)
.filters(filters)
.addAggregation(Aggregation.nestedTerms("annotations.endpoint.serviceName"))
.addAggregation(Aggregation.nestedTerms("binaryAnnotations.endpoint.serviceName"));
SearchRequest request = SearchRequest.forIndicesAndType(indices, SERVICE_SPAN)
.addAggregation(Aggregation.terms("serviceName", Integer.MAX_VALUE));

search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
}
Expand All @@ -208,15 +205,10 @@ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>>
long beginMillis = endMillis - namesLookback;

List<String> indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis);
SearchRequest.Filters filters = new SearchRequest.Filters();
filters.addRange("timestamp_millis", beginMillis, endMillis);
filters.addNestedTerms(asList(
"annotations.endpoint.serviceName",
"binaryAnnotations.endpoint.serviceName"
), serviceName.toLowerCase(Locale.ROOT));
SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN)
.filters(filters)
.addAggregation(Aggregation.terms("name", Integer.MAX_VALUE));

SearchRequest request = SearchRequest.forIndicesAndType(indices, SERVICE_SPAN)
.term("serviceName", serviceName.toLowerCase(Locale.ROOT))
.addAggregation(Aggregation.terms("spanName", Integer.MAX_VALUE));

search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@

// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
// exposed to re-use for testing writes of dependency links
abstract class HttpBulkIndexer<T> {
final String typeName;
final class HttpBulkIndexer {
final String tag;
final HttpCall.Factory http;
final String pipeline;
Expand All @@ -40,22 +39,20 @@ abstract class HttpBulkIndexer<T> {
final Buffer body = new Buffer();
final Set<String> indices = new LinkedHashSet<>();

HttpBulkIndexer(String typeName, ElasticsearchHttpStorage es) {
this.typeName = typeName;
tag = "index-" + typeName;
HttpBulkIndexer(String tag, ElasticsearchHttpStorage es) {
this.tag = tag;
http = es.http();
pipeline = es.pipeline();
flushOnWrites = es.flushOnWrites();
}

void add(String index, T object, @Nullable String id) {
writeIndexMetadata(index, id);
writeDocument(object);

if (flushOnWrites) indices.add(index);
void add(String index, String typeName, byte[] document, @Nullable String id) {
writeIndexMetadata(index, typeName, id);
writeDocument(document);
}

void writeIndexMetadata(String index, @Nullable String id) {
void writeIndexMetadata(String index, String typeName, @Nullable String id) {
if (flushOnWrites) indices.add(index);
body.writeUtf8("{\"index\":{\"_index\":\"").writeUtf8(index).writeByte('"');
body.writeUtf8(",\"_type\":\"").writeUtf8(typeName).writeByte('"');
if (id != null) {
Expand All @@ -64,13 +61,11 @@ void writeIndexMetadata(String index, @Nullable String id) {
body.writeUtf8("}}\n");
}

void writeDocument(T object) {
body.write(toJsonBytes(object));
void writeDocument(byte[] document) {
body.write(document);
body.writeByte('\n');
}

abstract byte[] toJsonBytes(T object);

/** Creates a bulk request when there is more than one object to store */
void execute(Callback<Void> callback) {
HttpUrl url = pipeline != null
Expand Down

This file was deleted.

Loading

0 comments on commit 79f8a9b

Please sign in to comment.