diff --git a/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaDependencyStoreTest.java b/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaDependencyStoreTest.java index e79c9d79ac5..0e7c99f147f 100755 --- a/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaDependencyStoreTest.java +++ b/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaDependencyStoreTest.java @@ -24,6 +24,7 @@ import zipkin.SpanStore; import zipkin.interop.ScalaDependencyStoreAdapter; import zipkin.interop.ScalaSpanStoreAdapter; +import zipkin.spanstore.guava.BlockingGuavaSpanStore; import static zipkin.internal.Util.midnightUTC; @@ -36,7 +37,7 @@ public static void setupDB() { } public DependencyStore store() { - return new ScalaDependencyStoreAdapter(spanStore); + return new ScalaDependencyStoreAdapter(new BlockingGuavaSpanStore(spanStore)); } @Override diff --git a/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaSpanStoreTest.java b/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaSpanStoreTest.java index f4a42ee77ce..246b25ee268 100644 --- a/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaSpanStoreTest.java +++ b/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaSpanStoreTest.java @@ -17,6 +17,7 @@ import com.twitter.zipkin.storage.SpanStoreSpec; import org.junit.BeforeClass; import zipkin.interop.ScalaSpanStoreAdapter; +import zipkin.spanstore.guava.BlockingGuavaSpanStore; public class ElasticsearchScalaSpanStoreTest extends SpanStoreSpec { private static ElasticsearchSpanStore spanStore; @@ -27,7 +28,7 @@ public static void setupDB() { } public SpanStore store() { - return new ScalaSpanStoreAdapter(spanStore); + return new ScalaSpanStoreAdapter(new BlockingGuavaSpanStore(spanStore)); } public void clear() { diff --git a/interop/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java b/interop/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java old mode 100644 new mode 100755 index c331f15fef4..24fdaeedaa3 --- a/interop/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java +++ b/interop/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java @@ -15,6 +15,7 @@ import org.elasticsearch.client.transport.NoNodeAvailableException; import org.junit.AssumptionViolatedException; +import zipkin.spanstore.guava.BlockingGuavaSpanStore; enum ElasticsearchTestGraph { INSTANCE; @@ -23,7 +24,8 @@ enum ElasticsearchTestGraph { static { // Avoid race-conditions in travis by forcing read-your-writes consistency. - ElasticsearchSpanConsumer.BLOCK_ON_FUTURES = true; + BlockingGuavaSpanStore.BLOCK_ON_ACCEPT = true; + ElasticsearchSpanConsumer.FLUSH_ON_WRITES = true; } private AssumptionViolatedException ex; diff --git a/pom.xml b/pom.xml index a8e9a5aa50e..8977bdd9fee 100755 --- a/pom.xml +++ b/pom.xml @@ -51,7 +51,9 @@ 1.3.6 2.2.1 1.7.19 + 19.0 4.12 + 1.10.19 3.3.0 1.15 @@ -154,6 +156,12 @@ ${project.version} + + ${project.groupId} + spanstore-guava + ${project.version} + + ${project.groupId} transport-kafka diff --git a/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java b/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java index 8ebf847c0ed..4eea97b7c82 100644 --- a/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java +++ b/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java @@ -49,6 +49,7 @@ import zipkin.kafka.KafkaConfig; import zipkin.kafka.KafkaTransport; import zipkin.server.brave.TraceWritesSpanStore; +import zipkin.spanstore.guava.BlockingGuavaSpanStore; @Configuration @EnableConfigurationProperties(ZipkinServerProperties.class) @@ -150,7 +151,7 @@ static class ElasticsearchConfiguration { .hosts(elasticsearch.getHosts()) .index(elasticsearch.getIndex()) .build(); - return new ElasticsearchSpanStore(config); + return new BlockingGuavaSpanStore(new ElasticsearchSpanStore(config)); } } diff --git a/zipkin-spanstores/cassandra/pom.xml b/zipkin-spanstores/cassandra/pom.xml index 8841efe8769..4b3021f396b 100644 --- a/zipkin-spanstores/cassandra/pom.xml +++ b/zipkin-spanstores/cassandra/pom.xml @@ -29,7 +29,6 @@ ${project.basedir}/.. 1.38.0 - 1.10.19 diff --git a/zipkin-spanstores/elasticsearch/pom.xml b/zipkin-spanstores/elasticsearch/pom.xml index d9cec33993d..55689db87fe 100755 --- a/zipkin-spanstores/elasticsearch/pom.xml +++ b/zipkin-spanstores/elasticsearch/pom.xml @@ -38,6 +38,10 @@ ${project.groupId} zipkin + + ${project.groupId} + spanstore-guava + org.elasticsearch diff --git a/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticListenableFuture.java b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticListenableFuture.java new file mode 100755 index 00000000000..c014455a2bc --- /dev/null +++ b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticListenableFuture.java @@ -0,0 +1,37 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.elasticsearch; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ListenableActionFuture; + +class ElasticListenableFuture { + + static ListenableFuture of(ListenableActionFuture elasticFuture) { + final SettableFuture future = SettableFuture.create(); + elasticFuture.addListener(new ActionListener() { + @Override public void onResponse(T t) { + future.set(t); + } + + @Override public void onFailure(Throwable e) { + future.setException(e); + } + }); + return future; + } +} diff --git a/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanConsumer.java b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanConsumer.java index 8595be90faf..d85022e9bc8 100755 --- a/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanConsumer.java +++ b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanConsumer.java @@ -13,40 +13,34 @@ */ package zipkin.elasticsearch; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import java.util.List; import java.util.concurrent.TimeUnit; import org.elasticsearch.action.ListenableActionFuture; -import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.client.Client; import zipkin.Codec; import zipkin.Span; -import zipkin.SpanConsumer; import zipkin.internal.ApplyTimestampAndDuration; import zipkin.internal.JsonCodec; +import zipkin.spanstore.guava.GuavaSpanConsumer; // Extracted for readability -final class ElasticsearchSpanConsumer implements SpanConsumer { - - static final JsonCodec JSON_CODEC = new JsonCodec(); +final class ElasticsearchSpanConsumer implements GuavaSpanConsumer { /** - * Internal flag that allows you read-your-writes consistency during tests. - * - *

This is internal as collection endpoints are usually in different threads or not in the same - * process as query ones. Special-casing this allows tests to pass without changing {@link - * SpanConsumer#accept}. - * - *

Why not just change {@link SpanConsumer#accept} now? {@link SpanConsumer#accept} may indeed - * need to change, but when that occurs, we'd want to choose something that is widely supportable, - * and serving a specific use case. That api might not be a future, for example. Future is - * difficult, for example, properly supporting and testing cancel. Further, there are other async - * models such as callbacks that could be more supportable. Regardless, this work is best delayed - * until there's a worthwhile use-case vs up-fronting only due to tests, and prematurely choosing - * Future results. + * Internal flag that allows you read-your-writes consistency during tests. With Elasticsearch, + * it is not sufficient to block on the {@link #accept(List)} future since the index also needs + * to be flushed. */ - static boolean BLOCK_ON_FUTURES; + @VisibleForTesting + static boolean FLUSH_ON_WRITES; + + static final JsonCodec JSON_CODEC = new JsonCodec(); private final Client client; private final IndexNameFormatter indexNameFormatter; @@ -57,16 +51,34 @@ final class ElasticsearchSpanConsumer implements SpanConsumer { } @Override - public void accept(List spans) { + public ListenableFuture accept(List spans) { BulkRequestBuilder request = client.prepareBulk(); for (Span span : spans) { request.add(createSpanIndexRequest(ApplyTimestampAndDuration.apply(span))); } - ListenableActionFuture future = request.execute(); - if (BLOCK_ON_FUTURES) { - future.actionGet(); - client.admin().indices().flush(new FlushRequest()).actionGet(); + ListenableFuture future = toVoidFuture(request.execute()); + if (FLUSH_ON_WRITES) { + future = Futures.transformAsync( + future, + new AsyncFunction() { + @Override public ListenableFuture apply(Void input) throws Exception { + return toVoidFuture(client.admin().indices() + .prepareFlush(indexNameFormatter.catchAll()) + .execute()); + } + }); } + return future; + } + + private static ListenableFuture toVoidFuture(ListenableActionFuture elasticFuture) { + return Futures.transform( + ElasticListenableFuture.of(elasticFuture), + new Function() { + @Override public Void apply(T input) { + return null; + } + }); } private IndexRequestBuilder createSpanIndexRequest(Span span) { diff --git a/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanStore.java b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanStore.java index c4d6e2a26cf..f8508797941 100755 --- a/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanStore.java +++ b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanStore.java @@ -14,11 +14,15 @@ package zipkin.elasticsearch; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.net.HostAndPort; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -62,12 +66,13 @@ import zipkin.DependencyLink; import zipkin.QueryRequest; import zipkin.Span; -import zipkin.SpanStore; import zipkin.internal.CorrectForClockSkew; import zipkin.internal.MergeById; import zipkin.internal.Nullable; import zipkin.internal.Util; +import zipkin.spanstore.guava.GuavaSpanStore; +import static com.google.common.util.concurrent.Futures.immediateFuture; import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.nestedQuery; @@ -75,7 +80,7 @@ import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.index.query.QueryBuilders.termsQuery; -public class ElasticsearchSpanStore implements SpanStore { +public class ElasticsearchSpanStore implements GuavaSpanStore { private static final long ONE_DAY_IN_MILLIS = TimeUnit.DAYS.toMillis(1); @@ -93,11 +98,11 @@ public ElasticsearchSpanStore(ElasticsearchConfig config) { checkForIndexTemplate(); } - @Override public void accept(List spans) { - spanConsumer.accept(spans); + @Override public ListenableFuture accept(List spans) { + return spanConsumer.accept(spans); } - @Override public List> getTraces(QueryRequest request) { + @Override public ListenableFuture>> getTraces(QueryRequest request) { long endMillis = request.endTs; long beginMillis = endMillis - request.lookback; @@ -139,7 +144,7 @@ public ElasticsearchSpanStore(ElasticsearchConfig config) { List strings = computeIndices(beginMillis, endMillis); - String[] indices = strings.toArray(new String[strings.size()]); + final String[] indices = strings.toArray(new String[strings.size()]); // We need to filter to traces that contain at least one span that matches the request, // but we need to order by timestamp of the first span, regardless of if it matched the // filter or not. Normal queries usually will apply a filter first, meaning we wouldn't be @@ -164,14 +169,24 @@ public ElasticsearchSpanStore(ElasticsearchConfig config) { .script(new Script("_count > 0", ScriptType.INLINE, "expression", null))) .order(Order.aggregation("timestamps_agg", false)) .size(request.limit)); - SearchResponse response = elasticRequest.execute().actionGet(); + return Futures.transformAsync( + ElasticListenableFuture.of(elasticRequest.execute()), + new AsyncFunction>>() { + @Override public ListenableFuture>> apply(SearchResponse response) + throws Exception { + return convertTraceAggregationResponse(response, indices); + } + }); + } + private ListenableFuture>> convertTraceAggregationResponse( + SearchResponse response, String[] indices) { if (response.getAggregations() == null) { - return Collections.emptyList(); + return immediateFuture(Collections.>emptyList()); } Terms traceIdsAgg = response.getAggregations().get("traceId_agg"); if (traceIdsAgg == null) { - return Collections.emptyList(); + return immediateFuture(Collections.>emptyList()); } List traceIds = new ArrayList<>(); for (Terms.Bucket bucket : traceIdsAgg.getBuckets()) { @@ -180,24 +195,35 @@ public ElasticsearchSpanStore(ElasticsearchConfig config) { return getTracesByIds(traceIds, indices); } - @Override public List getTrace(long id) { - return Iterables.getFirst(getTracesByIds(ImmutableList.of(id), indexNameFormatter.catchAll()), - null); + @Override public ListenableFuture> getTrace(long id) { + return Futures.transform( + getTracesByIds(ImmutableList.of(id), indexNameFormatter.catchAll()), + new Function>, List>() { + @Override public List apply(List> traces) { + return Iterables.getFirst(traces, null); + } + }); } - @Override public List getRawTrace(long traceId) { + @Override public ListenableFuture> getRawTrace(long traceId) { SearchRequestBuilder elasticRequest = client.prepareSearch(indexNameFormatter.catchAll()) .setTypes(ElasticsearchConstants.SPAN) .setQuery(termQuery("traceId",String.format("%016x", traceId))); - SearchResponse response = elasticRequest.execute().actionGet(); - ImmutableList.Builder trace = ImmutableList.builder(); - for (SearchHit hit : response.getHits()) { - trace.add(Codec.JSON.readSpan(hit.getSourceRef().toBytes())); - } - return trace.build(); + return Futures.transform( + ElasticListenableFuture.of(elasticRequest.execute()), + new Function>() { + @Override public List apply(SearchResponse response) { + ImmutableList.Builder < Span > trace = ImmutableList.builder(); + for (SearchHit hit : response.getHits()) { + trace.add(Codec.JSON.readSpan(hit.getSourceRef().toBytes())); + } + return trace.build(); + } + }); } - private List> getTracesByIds(Collection traceIds, String... indices) { + private ListenableFuture>> getTracesByIds( + final Collection traceIds, String... indices) { List traceIdsStr = new ArrayList<>(traceIds.size()); for (long traceId : traceIds) { traceIdsStr.add(String.format("%016x", traceId)); @@ -213,7 +239,17 @@ private List> getTracesByIds(Collection traceIds, String... ind .addSort(SortBuilders.fieldSort("timestamp") .order(SortOrder.ASC) .unmappedType("long")); - SearchResponse response = elasticRequest.execute().actionGet(); + return Futures.transform( + ElasticListenableFuture.of(elasticRequest.execute()), + new Function>>() { + @Override public List> apply(SearchResponse response) { + return convertTracesResponse(traceIds, response); + } + }); + } + + private List> convertTracesResponse( + Collection traceIds, SearchResponse response) { ArrayListMultimap groupedSpans = ArrayListMultimap.create(); for (SearchHit hit : response.getHits()) { Span span = Codec.JSON.readSpan(hit.getSourceRef().toBytes()); @@ -235,7 +271,7 @@ private List> getTracesByIds(Collection traceIds, String... ind } @Override - public List getServiceNames() { + public ListenableFuture> getServiceNames() { SearchRequestBuilder elasticRequest = client.prepareSearch(indexNameFormatter.catchAll()) .setTypes(ElasticsearchConstants.SPAN) @@ -247,7 +283,16 @@ public List getServiceNames() { .path("binaryAnnotations") .subAggregation(AggregationBuilders.terms("binaryAnnotationsServiceName_agg") .field("binaryAnnotations.endpoint.serviceName"))); - SearchResponse response = elasticRequest.execute().actionGet(); + return Futures.transform( + ElasticListenableFuture.of(elasticRequest.execute()), + new Function>() { + @Override public List apply(SearchResponse response) { + return convertServiceNamesResponse(response); + } + }); + } + + private List convertServiceNamesResponse(SearchResponse response) { if (response.getAggregations() == null) { return Collections.emptyList(); } @@ -276,9 +321,9 @@ public List getServiceNames() { } @Override - public List getSpanNames(String serviceName) { + public ListenableFuture> getSpanNames(String serviceName) { if (Strings.isNullOrEmpty(serviceName)) { - return Collections.emptyList(); + return immediateFuture(Collections.emptyList()); } serviceName = serviceName.toLowerCase(); QueryBuilder filter = boolQuery() @@ -291,7 +336,16 @@ public List getSpanNames(String serviceName) { .addAggregation(AggregationBuilders.terms("name_agg") .order(Order.term(true)) .field("name")); - SearchResponse response = elasticRequest.execute().actionGet(); + return Futures.transform( + ElasticListenableFuture.of(elasticRequest.execute()), + new Function>() { + @Override public List apply(SearchResponse response) { + return convertSpanNameResponse(response); + } + }); + } + + private List convertSpanNameResponse(SearchResponse response) { Terms namesAgg = response.getAggregations().get("name_agg"); if (namesAgg == null) { return Collections.emptyList(); @@ -304,7 +358,8 @@ public List getSpanNames(String serviceName) { } @Override - public List getDependencies(long endMillis, @Nullable Long lookback) { + public ListenableFuture> getDependencies( + long endMillis, @Nullable Long lookback) { long beginMillis = lookback != null ? endMillis - lookback : 0; // We just return all dependencies in the days that fall within endTs and lookback as // dependency links themselves don't have timestamps. @@ -320,7 +375,16 @@ public List getDependencies(long endMillis, @Nullable Long lookb .subAggregation(AggregationBuilders.sum("callCount_agg") .field("callCount"))) .setQuery(matchAllQuery()); - SearchResponse response = elasticRequest.execute().actionGet(); + return Futures.transform( + ElasticListenableFuture.of(elasticRequest.execute()), + new Function>() { + @Override public List apply(SearchResponse response) { + return convertDependenciesResponse(response); + } + }); + } + + private List convertDependenciesResponse(SearchResponse response) { if (response.getAggregations() == null) { return Collections.emptyList(); } diff --git a/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchDependenciesTest.java b/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchDependenciesTest.java index c4bae6b1f76..cee0ebfda98 100755 --- a/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchDependenciesTest.java +++ b/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchDependenciesTest.java @@ -20,18 +20,19 @@ import zipkin.InMemorySpanStore; import zipkin.Span; import zipkin.SpanStore; +import zipkin.spanstore.guava.BlockingGuavaSpanStore; import static zipkin.internal.Util.midnightUTC; -public class ElasticsearchDependenciesTest extends DependenciesTest { +public class ElasticsearchDependenciesTest extends DependenciesTest { public ElasticsearchDependenciesTest() { - this.store = ElasticsearchTestGraph.INSTANCE.spanStore(); + this.store = new BlockingGuavaSpanStore(ElasticsearchTestGraph.INSTANCE.spanStore()); } @Override public void clear() { - store.clear(); + ElasticsearchTestGraph.INSTANCE.spanStore().clear(); } /** diff --git a/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchSpanStoreTest.java b/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchSpanStoreTest.java index c1a13e2bdaa..c44572d2bba 100644 --- a/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchSpanStoreTest.java +++ b/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchSpanStoreTest.java @@ -13,16 +13,18 @@ */ package zipkin.elasticsearch; +import zipkin.SpanStore; import zipkin.SpanStoreTest; +import zipkin.spanstore.guava.BlockingGuavaSpanStore; -public class ElasticsearchSpanStoreTest extends SpanStoreTest { +public class ElasticsearchSpanStoreTest extends SpanStoreTest { public ElasticsearchSpanStoreTest() { - this.store = ElasticsearchTestGraph.INSTANCE.spanStore(); + this.store = new BlockingGuavaSpanStore(ElasticsearchTestGraph.INSTANCE.spanStore()); } @Override public void clear() { - store.clear(); + ElasticsearchTestGraph.INSTANCE.spanStore().clear(); } } diff --git a/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java b/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java old mode 100644 new mode 100755 index 25c8928edf3..342c9dbdeca --- a/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java +++ b/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java @@ -15,6 +15,7 @@ import org.elasticsearch.client.transport.NoNodeAvailableException; import org.junit.AssumptionViolatedException; +import zipkin.spanstore.guava.BlockingGuavaSpanStore; enum ElasticsearchTestGraph { INSTANCE; @@ -22,7 +23,8 @@ enum ElasticsearchTestGraph { static final ElasticsearchConfig CONFIG = new ElasticsearchConfig.Builder().build(); static { - ElasticsearchSpanConsumer.BLOCK_ON_FUTURES = true; + BlockingGuavaSpanStore.BLOCK_ON_ACCEPT = true; + ElasticsearchSpanConsumer.FLUSH_ON_WRITES = true; } private AssumptionViolatedException ex; diff --git a/zipkin-spanstores/guava/pom.xml b/zipkin-spanstores/guava/pom.xml new file mode 100644 index 00000000000..9176f4a67c9 --- /dev/null +++ b/zipkin-spanstores/guava/pom.xml @@ -0,0 +1,62 @@ + + + + + zipkin-spanstores + io.zipkin.java + 0.9.4-SNAPSHOT + + 4.0.0 + + spanstore-guava + SpanStore: Guava support library + + + 1.7 + 1.7 + ${project.basedir}/.. + + + + + ${project.groupId} + zipkin + + + + com.google.guava + guava + ${guava.version} + + + + org.mockito + mockito-core + ${mockito.version} + test + + + + org.hamcrest + hamcrest-core + 1.3 + test + + + diff --git a/zipkin-spanstores/guava/src/main/java/zipkin/spanstore/guava/BlockingGuavaSpanStore.java b/zipkin-spanstores/guava/src/main/java/zipkin/spanstore/guava/BlockingGuavaSpanStore.java new file mode 100755 index 00000000000..b4700f79171 --- /dev/null +++ b/zipkin-spanstores/guava/src/main/java/zipkin/spanstore/guava/BlockingGuavaSpanStore.java @@ -0,0 +1,87 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.spanstore.guava; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ListenableFuture; +import java.util.List; +import zipkin.DependencyLink; +import zipkin.QueryRequest; +import zipkin.Span; +import zipkin.SpanStore; +import zipkin.internal.Nullable; + +import static com.google.common.util.concurrent.Futures.getUnchecked; + +/** + * A {@link SpanStore} implementation that can take a {@link GuavaSpanStore} and call its methods + * with blocking, for use in callers that need a normal {@link SpanStore}. + */ +public class BlockingGuavaSpanStore implements SpanStore { + /** + * Internal flag that allows you read-your-writes consistency during tests. + * + *

This is internal as collection endpoints are usually in different threads or not in the same + * process as query ones. Special-casing this allows tests to pass without changing {@link + * GuavaSpanConsumer#accept}. + * + *

Why not just change {@link GuavaSpanConsumer#accept} now? {@link GuavaSpanConsumer#accept} + * may indeed need to change, but when that occurs, we'd want to choose something that is widely + * supportable, and serving a specific use case. That api might not be a future, for example. + * Future is difficult, for example, properly supporting and testing cancel. Further, there are + * other async models such as callbacks that could be more supportable. Regardless, this work is + * best delayed until there's a worthwhile use-case vs up-fronting only due to tests, and + * prematurely choosing Future results. + */ + @VisibleForTesting + public static boolean BLOCK_ON_ACCEPT; + + private final GuavaSpanStore delegate; + + public BlockingGuavaSpanStore(GuavaSpanStore delegate) { + this.delegate = delegate; + } + + // Only method that does not actually block even in synchronous spanstores. + @Override public void accept(List spans) { + ListenableFuture future = delegate.accept(spans); + if (BLOCK_ON_ACCEPT) { + getUnchecked(future); + } + } + + @Override public List> getTraces(QueryRequest request) { + return getUnchecked(delegate.getTraces(request)); + } + + @Override public List getTrace(long id) { + return getUnchecked(delegate.getTrace(id)); + } + + @Override public List getRawTrace(long traceId) { + return getUnchecked(delegate.getRawTrace(traceId)); + } + + @Override public List getServiceNames() { + return getUnchecked(delegate.getServiceNames()); + } + + @Override public List getSpanNames(String serviceName) { + return getUnchecked(delegate.getSpanNames(serviceName)); + } + + @Override public List getDependencies(long endTs, @Nullable Long lookback) { + return getUnchecked(delegate.getDependencies(endTs, lookback)); + } +} diff --git a/zipkin-spanstores/guava/src/main/java/zipkin/spanstore/guava/GuavaSpanConsumer.java b/zipkin-spanstores/guava/src/main/java/zipkin/spanstore/guava/GuavaSpanConsumer.java new file mode 100755 index 00000000000..23b36b23af3 --- /dev/null +++ b/zipkin-spanstores/guava/src/main/java/zipkin/spanstore/guava/GuavaSpanConsumer.java @@ -0,0 +1,31 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.spanstore.guava; + +import com.google.common.util.concurrent.ListenableFuture; +import java.util.List; +import zipkin.Span; + +/** + * An interface that is equivalent to {@link zipkin.SpanConsumer} but exposes methods as + * {@link ListenableFuture} to allow asynchronous composition. + */ +// @FunctionalInterface +public interface GuavaSpanConsumer { + + /** + * Version of {@link zipkin.SpanConsumer#accept} that returns a {@link ListenableFuture}. + */ + ListenableFuture accept(List spans); +} diff --git a/zipkin-spanstores/guava/src/main/java/zipkin/spanstore/guava/GuavaSpanStore.java b/zipkin-spanstores/guava/src/main/java/zipkin/spanstore/guava/GuavaSpanStore.java new file mode 100755 index 00000000000..b91ff579210 --- /dev/null +++ b/zipkin-spanstores/guava/src/main/java/zipkin/spanstore/guava/GuavaSpanStore.java @@ -0,0 +1,66 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.spanstore.guava; + +import com.google.common.util.concurrent.ListenableFuture; +import java.util.List; +import zipkin.DependencyLink; +import zipkin.QueryRequest; +import zipkin.Span; +import zipkin.internal.Nullable; + +/** + * An interface that is equivalent to {@link zipkin.SpanStore} but exposes methods as + * {@link ListenableFuture} to allow asynchronous composition. + * + * @see zipkin.SpanStore + */ +public interface GuavaSpanStore extends GuavaSpanConsumer { + + /** + * Version of {@link zipkin.SpanStore#accept} that returns {@link ListenableFuture}. + */ + @Override + ListenableFuture accept(List spans); + + /** + * Version of {@link zipkin.SpanStore#getTraces} that returns {@link ListenableFuture}. + */ + ListenableFuture>> getTraces(QueryRequest request); + + /** + * Version of {@link zipkin.SpanStore#getTrace} that returns {@link ListenableFuture}. + */ + ListenableFuture> getTrace(long id); + + /** + * Version of {@link zipkin.SpanStore#getRawTrace} that returns {@link ListenableFuture}. + */ + ListenableFuture> getRawTrace(long traceId); + + /** + * Version of {@link zipkin.SpanStore#getServiceNames} that returns {@link ListenableFuture}. + */ + ListenableFuture> getServiceNames(); + + /** + * Version of {@link zipkin.SpanStore#getSpanNames} that returns {@link ListenableFuture}. + */ + ListenableFuture> getSpanNames(String serviceName); + + /** + * Version of {@link zipkin.SpanStore#getDependencies} that returns {@link ListenableFuture}. + */ + ListenableFuture> getDependencies(long endTs, @Nullable Long lookback); +} diff --git a/zipkin-spanstores/guava/src/test/java/zipkin/spanstore/guava/BlockingGuavaSpanStoreTest.java b/zipkin-spanstores/guava/src/test/java/zipkin/spanstore/guava/BlockingGuavaSpanStoreTest.java new file mode 100755 index 00000000000..80835618ffe --- /dev/null +++ b/zipkin-spanstores/guava/src/test/java/zipkin/spanstore/guava/BlockingGuavaSpanStoreTest.java @@ -0,0 +1,200 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.spanstore.guava; + +import com.google.common.collect.ImmutableList; +import java.util.Arrays; +import java.util.List; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import zipkin.Annotation; +import zipkin.BinaryAnnotation; +import zipkin.DependencyLink; +import zipkin.Endpoint; +import zipkin.QueryRequest; +import zipkin.Span; + +import static com.google.common.util.concurrent.Futures.immediateFuture; +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; + +public class BlockingGuavaSpanStoreTest { + + long spanId = 456; + long today = System.currentTimeMillis(); + Endpoint ep = Endpoint.create("service", 127 << 24 | 1, 8080); + + Annotation ann1 = Annotation.create((today + 1) * 1000, "cs", ep); + Annotation ann2 = Annotation.create((today + 2) * 1000, "sr", null); + Annotation ann3 = Annotation.create((today + 10) * 1000, "custom", ep); + Annotation ann4 = Annotation.create((today + 20) * 1000, "custom", ep); + Annotation ann5 = Annotation.create((today + 5) * 1000, "custom", ep); + Annotation ann6 = Annotation.create((today + 6) * 1000, "custom", ep); + Annotation ann7 = Annotation.create((today + 7) * 1000, "custom", ep); + Annotation ann8 = Annotation.create((today + 8) * 1000, "custom", ep); + + Span span1 = new Span.Builder() + .traceId(123) + .name("methodcall") + .id(spanId) + .timestamp(ann1.timestamp).duration(9000L) + .annotations(asList(ann1, ann3)) + .addBinaryAnnotation(BinaryAnnotation.create("BAH", "BEH", ep)).build(); + + Span span2 = new Span.Builder() + .traceId(456) + .name("methodcall") + .id(spanId) + .timestamp(ann2.timestamp) + .addAnnotation(ann2) + .addBinaryAnnotation(BinaryAnnotation.create("BAH2", "BEH2", ep)).build(); + + Span span3 = new Span.Builder() + .traceId(789) + .name("methodcall") + .id(spanId) + .timestamp(ann2.timestamp).duration(18000L) + .annotations(asList(ann2, ann3, ann4)) + .addBinaryAnnotation(BinaryAnnotation.create("BAH2", "BEH2", ep)).build(); + + Span span4 = new Span.Builder() + .traceId(999) + .name("methodcall") + .id(spanId) + .timestamp(ann6.timestamp).duration(1000L) + .annotations(asList(ann6, ann7)).build(); + + Span span5 = new Span.Builder() + .traceId(999) + .name("methodcall") + .id(spanId) + .timestamp(ann5.timestamp).duration(3000L) + .annotations(asList(ann5, ann8)) + .addBinaryAnnotation(BinaryAnnotation.create("BAH2", "BEH2", ep)).build(); + + List trace1 = ImmutableList.of(span1, span2, span3); + + List trace2 = ImmutableList.of(span4, span5); + + List> traces = ImmutableList.of(trace1, trace2); + + List deps = ImmutableList.of( + new DependencyLink.Builder().parent("zipkin-web").child("zipkin-query").callCount(1).build(), + new DependencyLink.Builder().parent("zipkin-query").child("zipkin-foo").callCount(10).build() + ); + + @Rule + public MockitoRule mocks = MockitoJUnit.rule(); + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Mock + private GuavaSpanStore delegate; + + private BlockingGuavaSpanStore spanStore; + + @Before + public void setUp() { + spanStore = new BlockingGuavaSpanStore(delegate); + } + + @Test + public void getTraces_success() { + QueryRequest request = new QueryRequest.Builder("service").endTs(1000L).build(); + when(delegate.getTraces(request)).thenReturn(immediateFuture(traces)); + assertThat(spanStore.getTraces(request)).containsExactlyElementsOf(traces); + } + + @Test + public void getTraces_exception() { + QueryRequest request = new QueryRequest.Builder("service").endTs(1000L).build(); + when(delegate.getTraces(request)).thenThrow(new IllegalStateException("failed")); + thrown.expect(IllegalStateException.class);; + spanStore.getTraces(request); + } + + @Test + public void getTrace_success() { + when(delegate.getTrace(1L)).thenReturn(immediateFuture(trace1)); + assertThat(spanStore.getTrace(1L)).containsExactlyElementsOf(trace1); + } + + @Test + public void getTrace_exception() { + when(delegate.getTrace(1L)).thenThrow(new IllegalStateException("failed")); + thrown.expect(IllegalStateException.class);; + spanStore.getTrace(1L); + } + + @Test + public void getRawTrace_success() { + when(delegate.getRawTrace(1L)).thenReturn(immediateFuture(trace1)); + assertThat(spanStore.getRawTrace(1L)).containsExactlyElementsOf(trace1); + } + + @Test + public void getRawTrace_exception() { + when(delegate.getRawTrace(1L)).thenThrow(new IllegalStateException("failed")); + thrown.expect(IllegalStateException.class);; + spanStore.getRawTrace(1L); + } + + @Test + public void getServiceNamees_success() { + when(delegate.getServiceNames()) + .thenReturn(immediateFuture(Arrays.asList("service1", "service2"))); + assertThat(spanStore.getServiceNames()).containsExactly("service1", "service2"); + } + + @Test + public void getServiceNames_exception() { + when(delegate.getServiceNames()).thenThrow(new IllegalStateException("failed")); + thrown.expect(IllegalStateException.class);; + spanStore.getServiceNames(); + } + + @Test + public void getSpanNames_success() { + when(delegate.getSpanNames("service")).thenReturn(immediateFuture( + Arrays.asList("span1", "span2"))); + assertThat(spanStore.getSpanNames("service")).containsExactly("span1", "span2"); + } + + @Test + public void getSpanNames_exception() { + when(delegate.getSpanNames("service")).thenThrow(new IllegalStateException("failed")); + thrown.expect(IllegalStateException.class);; + spanStore.getSpanNames("service"); + } + + @Test + public void getDependencies_success() { + when(delegate.getDependencies(1L, 0L)).thenReturn(immediateFuture(deps)); + assertThat(spanStore.getDependencies(1L, 0L)).containsExactlyElementsOf(deps); + } + + @Test + public void getDependencies_exception() { + when(delegate.getDependencies(1L, 0L)).thenThrow(new IllegalStateException("failed")); + thrown.expect(IllegalStateException.class);; + spanStore.getDependencies(1L, 0L); + } +} diff --git a/zipkin-spanstores/pom.xml b/zipkin-spanstores/pom.xml index 5be14495239..e3e3eae9dcb 100644 --- a/zipkin-spanstores/pom.xml +++ b/zipkin-spanstores/pom.xml @@ -31,5 +31,6 @@ cassandra jdbc elasticsearch + guava