Commit e2173ff
early spike
Adrian Cole committed Apr 12, 2017
1 parent 05025b4 commit e2173ff
Showing 3 changed files with 94 additions and 21 deletions.
ElasticsearchHttpSpanConsumer.java
@@ -13,15 +13,23 @@
*/
package zipkin.storage.elasticsearch.http;

import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import okio.Buffer;
import zipkin.Span;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.Callback;

import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp;
import static zipkin.internal.Util.propagateIfFatal;
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SERVICE_SPAN;

final class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer {

@@ -39,27 +47,101 @@ final class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer {
return;
}
try {
-indexSpans(new HttpBulkSpanIndexer(es), spans).execute(callback);
HttpBulkSpanIndexer spanIndexer = new HttpBulkSpanIndexer(es);
Map<String, Set<ServiceSpan>> indexToServiceSpans = indexSpans(spanIndexer, spans);
if (indexToServiceSpans.isEmpty()) {
spanIndexer.execute(callback);
return;
}
HttpBulkIndexer<ServiceSpan> serviceSpanIndexer =
new HttpBulkIndexer<ServiceSpan>(SERVICE_SPAN, es) {
Buffer buffer = new Buffer();

@Override byte[] toJsonBytes(ServiceSpan serviceSpan) {
try {
adapter.toJson(buffer, serviceSpan);
return buffer.readByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
};
for (Map.Entry<String, Set<ServiceSpan>> entry : indexToServiceSpans.entrySet()) {
String index = entry.getKey();
for (ServiceSpan serviceSpan : entry.getValue()) {
serviceSpanIndexer.add(index, serviceSpan,
serviceSpan.serviceName + "|" + serviceSpan.spanName);
}
}
spanIndexer.execute(new Callback<Void>() {
@Override public void onSuccess(Void value) {
serviceSpanIndexer.execute(callback);
}

@Override public void onError(Throwable t) {
callback.onError(t);
}
});
} catch (Throwable t) {
propagateIfFatal(t);
callback.onError(t);
}
}

-HttpBulkSpanIndexer indexSpans(HttpBulkSpanIndexer indexer, List<Span> spans) throws IOException {
static final JsonAdapter<ServiceSpan> adapter =
new Moshi.Builder().build().adapter(ServiceSpan.class);

static final class ServiceSpan {

final String serviceName;
final String spanName;

ServiceSpan(String serviceName, String spanName) {
this.serviceName = serviceName;
this.spanName = spanName;
}

@Override public boolean equals(Object o) {
if (o == this) return true;
if (!(o instanceof ServiceSpan)) return false;
ServiceSpan that = (ServiceSpan) o;
return serviceName.equals(that.serviceName)
&& spanName.equals(that.spanName);
}

@Override public int hashCode() {
int h = 1;
h *= 1000003;
h ^= serviceName.hashCode();
h *= 1000003;
h ^= spanName.hashCode();
return h;
}
}

Map<String, Set<ServiceSpan>> indexSpans(HttpBulkSpanIndexer indexer, List<Span> spans)
throws IOException {
Map<String, Set<ServiceSpan>> indexToServiceSpans = new LinkedHashMap<>();
for (Span span : spans) {
Long timestamp = guessTimestamp(span);
Long timestampMillis;
String index; // which index to store this span into
if (timestamp != null) {
timestampMillis = TimeUnit.MICROSECONDS.toMillis(timestamp);
index = indexNameFormatter.indexNameForTimestamp(timestampMillis);
for (String serviceName : span.serviceNames()) {
if (!span.name.isEmpty()) {
Set<ServiceSpan> serviceSpans = indexToServiceSpans.get(index);
if (serviceSpans == null) {
indexToServiceSpans.put(index, serviceSpans = new LinkedHashSet<>());
}
serviceSpans.add(new ServiceSpan(serviceName, span.name));
}
}
} else {
timestampMillis = null;
index = indexNameFormatter.indexNameForTimestamp(System.currentTimeMillis());
}
indexer.add(index, span, timestampMillis);
}
-return indexer;
return indexToServiceSpans;
}
}
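
For context, below is a minimal, self-contained sketch of what each service/span pair turns into, assuming Moshi's reflective adapter (as the adapter field above uses) and assuming the serviceName + "|" + spanName string handed to the bulk indexer acts as the document ID, so rewriting the same pair overwrites rather than duplicates. The class name and values are illustrative only, not part of the commit.

import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;

// Illustrative stand-in for the ServiceSpan value class added in this commit.
final class ServiceSpanSketch {
  final String serviceName;
  final String spanName;

  ServiceSpanSketch(String serviceName, String spanName) {
    this.serviceName = serviceName;
    this.spanName = spanName;
  }

  public static void main(String[] args) {
    JsonAdapter<ServiceSpanSketch> adapter =
        new Moshi.Builder().build().adapter(ServiceSpanSketch.class);
    ServiceSpanSketch pair = new ServiceSpanSketch("app", "get");

    // JSON body written for the service/span pair:
    System.out.println(adapter.toJson(pair)); // {"serviceName":"app","spanName":"get"}

    // ID string passed to the bulk indexer alongside the document:
    System.out.println(pair.serviceName + "|" + pair.spanName); // app|get
  }
}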
ElasticsearchHttpSpanStore.java
@@ -40,6 +40,7 @@ final class ElasticsearchHttpSpanStore implements AsyncSpanStore {

static final String SPAN = "span";
static final String DEPENDENCY_LINK = "dependencylink";
static final String SERVICE_SPAN = "servicespan";

final SearchCallFactory search;
final String[] allIndices;
@@ -188,12 +189,8 @@ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>>
long beginMillis = endMillis - namesLookback;

List<String> indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis);
-SearchRequest.Filters filters = new SearchRequest.Filters();
-filters.addRange("timestamp_millis", beginMillis, endMillis);
-SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN)
-.filters(filters)
-.addAggregation(Aggregation.nestedTerms("annotations.endpoint.serviceName"))
-.addAggregation(Aggregation.nestedTerms("binaryAnnotations.endpoint.serviceName"));
SearchRequest request = SearchRequest.forIndicesAndType(indices, SERVICE_SPAN)
.addAggregation(Aggregation.terms("serviceName", Integer.MAX_VALUE));

search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
}
@@ -208,15 +205,10 @@ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>>
long beginMillis = endMillis - namesLookback;

List<String> indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis);
-SearchRequest.Filters filters = new SearchRequest.Filters();
-filters.addRange("timestamp_millis", beginMillis, endMillis);
-filters.addNestedTerms(asList(
-"annotations.endpoint.serviceName",
-"binaryAnnotations.endpoint.serviceName"
-), serviceName.toLowerCase(Locale.ROOT));
-SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN)
-.filters(filters)
-.addAggregation(Aggregation.terms("name", Integer.MAX_VALUE));

SearchRequest request = SearchRequest.forIndicesAndType(indices, SERVICE_SPAN)
.term("serviceName", serviceName.toLowerCase(Locale.ROOT))
.addAggregation(Aggregation.terms("spanName", Integer.MAX_VALUE));

search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
}
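
As a rough reference, the reworked getSpanNames corresponds to a search over the servicespan type: a term filter on serviceName plus a terms aggregation over spanName, in place of the nested-terms aggregations over span annotations. The sketch below issues such a search with plain OkHttp rather than the module's SearchRequest/SearchCallFactory helpers; the host, index pattern, service name, and exact JSON shape are assumptions for illustration.

import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

public class ServiceSpanSearchSketch {
  public static void main(String[] args) throws Exception {
    OkHttpClient client = new OkHttpClient();

    // Filter servicespan documents by serviceName and aggregate distinct spanName values.
    // 2147483647 mirrors the Integer.MAX_VALUE size passed to Aggregation.terms above.
    String body = "{\n"
        + "  \"size\": 0,\n"
        + "  \"query\": {\"term\": {\"serviceName\": \"app\"}},\n"
        + "  \"aggs\": {\"spanName\": {\"terms\": {\"field\": \"spanName\", \"size\": 2147483647}}}\n"
        + "}";

    Request request = new Request.Builder()
        .url("http://localhost:9200/zipkin-*/servicespan/_search") // placeholder host and index pattern
        .post(RequestBody.create(MediaType.parse("application/json"), body))
        .build();

    try (Response response = client.newCall(request).execute()) {
      System.out.println(response.body().string());
    }
  }
}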
ElasticsearchHttpSpanStoreTest.java
@@ -27,6 +27,7 @@

import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SERVICE_SPAN;
import static zipkin.storage.elasticsearch.http.TestResponses.SERVICE_NAMES;
import static zipkin.storage.elasticsearch.http.TestResponses.SPAN_NAMES;

@@ -83,8 +84,6 @@ private void requestLimitedTo2DaysOfIndices() throws InterruptedException {

RecordedRequest request = es.takeRequest();
assertThat(request.getPath())
.startsWith("/" + indexesToSearch + "/span/_search");
assertThat(request.getBody().readUtf8())
.contains("{\"range\":{\"timestamp_millis\"");
.startsWith("/" + indexesToSearch + "/" + SERVICE_SPAN + "/_search");
}
}
