Skip to content

Commit

Permalink
Adds GuavaSpanStore which exposes span store methods as ListenableFuture
Browse files Browse the repository at this point in the history
GuavaSpanStore (and its blocking adapter) enables experimentation with
asynchronous composition without adding new dependencies to existing
span stores.

This adapts the elasticsearch as a first step.

See #111
  • Loading branch information
Anuraag Agrawal authored and Adrian Cole committed Mar 23, 2016
1 parent 36a3e53 commit 818ea2f
Show file tree
Hide file tree
Showing 19 changed files with 646 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import zipkin.SpanStore;
import zipkin.interop.ScalaDependencyStoreAdapter;
import zipkin.interop.ScalaSpanStoreAdapter;
import zipkin.spanstore.guava.BlockingGuavaSpanStore;

import static zipkin.internal.Util.midnightUTC;

Expand All @@ -36,7 +37,7 @@ public static void setupDB() {
}

public DependencyStore store() {
return new ScalaDependencyStoreAdapter(spanStore);
return new ScalaDependencyStoreAdapter(new BlockingGuavaSpanStore(spanStore));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.twitter.zipkin.storage.SpanStoreSpec;
import org.junit.BeforeClass;
import zipkin.interop.ScalaSpanStoreAdapter;
import zipkin.spanstore.guava.BlockingGuavaSpanStore;

public class ElasticsearchScalaSpanStoreTest extends SpanStoreSpec {
private static ElasticsearchSpanStore spanStore;
Expand All @@ -27,7 +28,7 @@ public static void setupDB() {
}

public SpanStore store() {
return new ScalaSpanStoreAdapter(spanStore);
return new ScalaSpanStoreAdapter(new BlockingGuavaSpanStore(spanStore));
}

public void clear() {
Expand Down
4 changes: 3 additions & 1 deletion interop/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.junit.AssumptionViolatedException;
import zipkin.spanstore.guava.BlockingGuavaSpanStore;

enum ElasticsearchTestGraph {
INSTANCE;
Expand All @@ -23,7 +24,8 @@ enum ElasticsearchTestGraph {

static {
// Avoid race-conditions in travis by forcing read-your-writes consistency.
ElasticsearchSpanConsumer.BLOCK_ON_FUTURES = true;
BlockingGuavaSpanStore.BLOCK_ON_ACCEPT = true;
ElasticsearchSpanConsumer.FLUSH_ON_WRITES = true;
}

private AssumptionViolatedException ex;
Expand Down
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@
<mariadb-java-client.version>1.3.6</mariadb-java-client.version>
<elasticsearch.version>2.2.1</elasticsearch.version>
<slf4j.version>1.7.19</slf4j.version>
<guava.version>19.0</guava.version>
<junit.version>4.12</junit.version>
<mockito.version>1.10.19</mockito.version>
<assertj.version>3.3.0</assertj.version>

<animal-sniffer-maven-plugin.version>1.15</animal-sniffer-maven-plugin.version>
Expand Down Expand Up @@ -154,6 +156,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>spanstore-guava</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>transport-kafka</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import zipkin.kafka.KafkaConfig;
import zipkin.kafka.KafkaTransport;
import zipkin.server.brave.TraceWritesSpanStore;
import zipkin.spanstore.guava.BlockingGuavaSpanStore;

@Configuration
@EnableConfigurationProperties(ZipkinServerProperties.class)
Expand Down Expand Up @@ -150,7 +151,7 @@ static class ElasticsearchConfiguration {
.hosts(elasticsearch.getHosts())
.index(elasticsearch.getIndex())
.build();
return new ElasticsearchSpanStore(config);
return new BlockingGuavaSpanStore(new ElasticsearchSpanStore(config));
}
}

Expand Down
1 change: 0 additions & 1 deletion zipkin-spanstores/cassandra/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
<properties>
<main.basedir>${project.basedir}/..</main.basedir>
<zipkin-cassandra-core.version>1.38.0</zipkin-cassandra-core.version>
<mockito.version>1.10.19</mockito.version>
</properties>

<dependencies>
Expand Down
4 changes: 4 additions & 0 deletions zipkin-spanstores/elasticsearch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
<groupId>${project.groupId}</groupId>
<artifactId>zipkin</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>spanstore-guava</artifactId>
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package zipkin.elasticsearch;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ListenableActionFuture;

class ElasticListenableFuture {

static <T> ListenableFuture<T> of(ListenableActionFuture<T> elasticFuture) {
final SettableFuture<T> future = SettableFuture.create();
elasticFuture.addListener(new ActionListener<T>() {
@Override public void onResponse(T t) {
future.set(t);
}

@Override public void onFailure(Throwable e) {
future.setException(e);
}
});
return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,40 +13,34 @@
*/
package zipkin.elasticsearch;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import zipkin.Codec;
import zipkin.Span;
import zipkin.SpanConsumer;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.JsonCodec;
import zipkin.spanstore.guava.GuavaSpanConsumer;

// Extracted for readability
final class ElasticsearchSpanConsumer implements SpanConsumer {

static final JsonCodec JSON_CODEC = new JsonCodec();
final class ElasticsearchSpanConsumer implements GuavaSpanConsumer {
/**
* Internal flag that allows you read-your-writes consistency during tests.
*
* <p>This is internal as collection endpoints are usually in different threads or not in the same
* process as query ones. Special-casing this allows tests to pass without changing {@link
* SpanConsumer#accept}.
*
* <p>Why not just change {@link SpanConsumer#accept} now? {@link SpanConsumer#accept} may indeed
* need to change, but when that occurs, we'd want to choose something that is widely supportable,
* and serving a specific use case. That api might not be a future, for example. Future is
* difficult, for example, properly supporting and testing cancel. Further, there are other async
* models such as callbacks that could be more supportable. Regardless, this work is best delayed
* until there's a worthwhile use-case vs up-fronting only due to tests, and prematurely choosing
* Future results.
* Internal flag that allows you read-your-writes consistency during tests. With Elasticsearch,
* it is not sufficient to block on the {@link #accept(List)} future since the index also needs
* to be flushed.
*/
static boolean BLOCK_ON_FUTURES;
@VisibleForTesting
static boolean FLUSH_ON_WRITES;

static final JsonCodec JSON_CODEC = new JsonCodec();

private final Client client;
private final IndexNameFormatter indexNameFormatter;
Expand All @@ -57,16 +51,34 @@ final class ElasticsearchSpanConsumer implements SpanConsumer {
}

@Override
public void accept(List<Span> spans) {
public ListenableFuture<Void> accept(List<Span> spans) {
BulkRequestBuilder request = client.prepareBulk();
for (Span span : spans) {
request.add(createSpanIndexRequest(ApplyTimestampAndDuration.apply(span)));
}
ListenableActionFuture<BulkResponse> future = request.execute();
if (BLOCK_ON_FUTURES) {
future.actionGet();
client.admin().indices().flush(new FlushRequest()).actionGet();
ListenableFuture<Void> future = toVoidFuture(request.execute());
if (FLUSH_ON_WRITES) {
future = Futures.transformAsync(
future,
new AsyncFunction<Void, Void>() {
@Override public ListenableFuture<Void> apply(Void input) throws Exception {
return toVoidFuture(client.admin().indices()
.prepareFlush(indexNameFormatter.catchAll())
.execute());
}
});
}
return future;
}

private static <T> ListenableFuture<Void> toVoidFuture(ListenableActionFuture<T> elasticFuture) {
return Futures.transform(
ElasticListenableFuture.of(elasticFuture),
new Function<T, Void>() {
@Override public Void apply(T input) {
return null;
}
});
}

private IndexRequestBuilder createSpanIndexRequest(Span span) {
Expand Down
Loading

0 comments on commit 818ea2f

Please sign in to comment.