Skip to content

Commit

Permalink
Merge pull request #122 from openzipkin/async-spanstore
Browse files Browse the repository at this point in the history
Adds AsyncSpanStore: an SPI to adapt to async composition libraries
  • Loading branch information
adriancole committed Mar 23, 2016
2 parents 7da5a5c + 20e5033 commit de359e7
Show file tree
Hide file tree
Showing 23 changed files with 1,095 additions and 372 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import zipkin.DependencyLink;
import zipkin.InMemorySpanStore;
import zipkin.SpanStore;
import zipkin.async.BlockingSpanStoreAdapter;
import zipkin.interop.ScalaDependencyStoreAdapter;
import zipkin.interop.ScalaSpanStoreAdapter;
import zipkin.spanstore.guava.BlockingGuavaSpanStore;

import static zipkin.internal.Util.midnightUTC;

Expand All @@ -37,7 +37,7 @@ public static void setupDB() {
}

public DependencyStore store() {
return new ScalaDependencyStoreAdapter(new BlockingGuavaSpanStore(spanStore));
return new ScalaDependencyStoreAdapter(new BlockingSpanStoreAdapter(spanStore));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import com.twitter.zipkin.storage.SpanStore;
import com.twitter.zipkin.storage.SpanStoreSpec;
import org.junit.BeforeClass;
import zipkin.async.BlockingSpanStoreAdapter;
import zipkin.interop.ScalaSpanStoreAdapter;
import zipkin.spanstore.guava.BlockingGuavaSpanStore;

public class ElasticsearchScalaSpanStoreTest extends SpanStoreSpec {
private static ElasticsearchSpanStore spanStore;
Expand All @@ -28,7 +28,7 @@ public static void setupDB() {
}

public SpanStore store() {
return new ScalaSpanStoreAdapter(new BlockingGuavaSpanStore(spanStore));
return new ScalaSpanStoreAdapter(new BlockingSpanStoreAdapter(spanStore));
}

public void clear() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.junit.AssumptionViolatedException;
import zipkin.spanstore.guava.BlockingGuavaSpanStore;
import zipkin.async.BlockingSpanStoreAdapter;

enum ElasticsearchTestGraph {
INSTANCE;
Expand All @@ -24,7 +24,7 @@ enum ElasticsearchTestGraph {

static {
// Avoid race-conditions in travis by forcing read-your-writes consistency.
BlockingGuavaSpanStore.BLOCK_ON_ACCEPT = true;
BlockingSpanStoreAdapter.BLOCK_ON_ACCEPT = true;
ElasticsearchSpanConsumer.FLUSH_ON_WRITES = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import zipkin.InMemorySpanStore;
import zipkin.Sampler;
import zipkin.SpanStore;
import zipkin.async.BlockingSpanStoreAdapter;
import zipkin.cassandra.CassandraConfig;
import zipkin.cassandra.CassandraSpanStore;
import zipkin.elasticsearch.ElasticsearchConfig;
Expand All @@ -49,7 +50,6 @@
import zipkin.kafka.KafkaConfig;
import zipkin.kafka.KafkaTransport;
import zipkin.server.brave.TraceWritesSpanStore;
import zipkin.spanstore.guava.BlockingGuavaSpanStore;

@Configuration
@EnableConfigurationProperties(ZipkinServerProperties.class)
Expand Down Expand Up @@ -151,7 +151,7 @@ static class ElasticsearchConfiguration {
.hosts(elasticsearch.getHosts())
.index(elasticsearch.getIndex())
.build();
return new BlockingGuavaSpanStore(new ElasticsearchSpanStore(config));
return new BlockingSpanStoreAdapter(new ElasticsearchSpanStore(config));
}
}

Expand Down
34 changes: 28 additions & 6 deletions zipkin-spanstores/elasticsearch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
<name>SpanStore: Elasticsearch</name>

<properties>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<main.basedir>${project.basedir}/..</main.basedir>
</properties>

Expand All @@ -38,10 +36,6 @@
<groupId>${project.groupId}</groupId>
<artifactId>zipkin</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>spanstore-guava</artifactId>
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
Expand Down Expand Up @@ -69,4 +63,32 @@
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<inherited>true</inherited>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<!-- Ensure main source tree compiles to Java 7 bytecode. -->
<execution>
<id>default-compile</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</execution>
</executions>
</plugin>
<!-- Make sure Java 8 types and methods aren't used -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>animal-sniffer-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package zipkin.elasticsearch;

import com.google.common.base.Function;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ListenableActionFuture;
import zipkin.async.Callback;

final class ElasticFutures {

static <T> ListenableFuture<T> toGuava(ListenableActionFuture<T> elasticFuture) {
final SettableFuture<T> future = SettableFuture.create();
elasticFuture.addListener(new ActionListener<T>() {
@Override public void onResponse(T t) {
future.set(t);
}

@Override public void onFailure(Throwable e) {
future.setException(e);
}
});
return future;
}

static <A, C> void onComplete(
ListenableActionFuture<A> future,
final Callback<C> callback,
final Function<A, C> successTransformationFn
) {
future.addListener(new ActionListener<A>() {

@Override public void onResponse(A a) {
try {
callback.onSuccess(successTransformationFn.apply(a));
} catch (Error | RuntimeException e) {
callback.onError(e);
}
}

@Override public void onFailure(Throwable e) {
callback.onError(e);
}
});
}

static <A, C> void onComplete(
ListenableFuture<A> future,
final Callback<C> callback,
final Function<A, C> successTransformationFn
) {
Futures.addCallback(future, new FutureCallback<A>() {

@Override public void onSuccess(A result) {
try {
callback.onSuccess(successTransformationFn.apply(result));
} catch (Error | RuntimeException e) {
callback.onError(e);
}
}

@Override public void onFailure(Throwable t) {
callback.onError(t);
}
});
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,34 @@
package zipkin.elasticsearch;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import zipkin.Codec;
import zipkin.Span;
import zipkin.async.AsyncSpanConsumer;
import zipkin.async.Callback;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.JsonCodec;
import zipkin.spanstore.guava.GuavaSpanConsumer;

import static zipkin.elasticsearch.ElasticFutures.onComplete;
import static zipkin.elasticsearch.ElasticFutures.toGuava;

// Extracted for readability
final class ElasticsearchSpanConsumer implements GuavaSpanConsumer {
final class ElasticsearchSpanConsumer implements AsyncSpanConsumer {
/**
* Internal flag that allows you read-your-writes consistency during tests. With Elasticsearch,
* it is not sufficient to block on the {@link #accept(List)} future since the index also needs
* to be flushed.
* Internal flag that allows you read-your-writes consistency during tests. With Elasticsearch, it
* is not sufficient to block on the {@link #createSpanIndexRequest} future since the index also
* needs to be flushed.
*/
@VisibleForTesting
static boolean FLUSH_ON_WRITES;

static final JsonCodec JSON_CODEC = new JsonCodec();

private final Client client;
private final IndexNameFormatter indexNameFormatter;

Expand All @@ -51,34 +51,23 @@ final class ElasticsearchSpanConsumer implements GuavaSpanConsumer {
}

@Override
public ListenableFuture<Void> accept(List<Span> spans) {
public void accept(List<Span> spans, final Callback<Void> callback) {
BulkRequestBuilder request = client.prepareBulk();
for (Span span : spans) {
request.add(createSpanIndexRequest(ApplyTimestampAndDuration.apply(span)));
}
ListenableFuture<Void> future = toVoidFuture(request.execute());

ListenableFuture future = toGuava(request.execute());
if (FLUSH_ON_WRITES) {
future = Futures.transformAsync(
future,
new AsyncFunction<Void, Void>() {
@Override public ListenableFuture<Void> apply(Void input) throws Exception {
return toVoidFuture(client.admin().indices()
.prepareFlush(indexNameFormatter.catchAll())
.execute());
}
});
future = Futures.transform(future, new AsyncFunction() {
@Override public ListenableFuture<?> apply(Object input) {
return toGuava(client.admin().indices()
.prepareFlush(indexNameFormatter.catchAll())
.execute());
}
});
}
return future;
}

private static <T> ListenableFuture<Void> toVoidFuture(ListenableActionFuture<T> elasticFuture) {
return Futures.transform(
ElasticListenableFuture.of(elasticFuture),
new Function<T, Void>() {
@Override public Void apply(T input) {
return null;
}
});
onComplete(future, callback, Functions.<Void>constant(null));
}

private IndexRequestBuilder createSpanIndexRequest(Span span) {
Expand Down
Loading

0 comments on commit de359e7

Please sign in to comment.