From 3fbee819c1266be5193df14350e8136e60761443 Mon Sep 17 00:00:00 2001
From: Adrian Cole <acole@pivotal.io>
Date: Wed, 23 Mar 2016 16:08:45 +0800
Subject: [PATCH 1/2] Enforce Java 7, but not in tests

---
 zipkin-spanstores/elasticsearch/pom.xml | 34 ++++++++++++++++++++-----
 zipkin-spanstores/guava/pom.xml         | 30 ++++++++++++++++++++--
 2 files changed, 56 insertions(+), 8 deletions(-)

diff --git a/zipkin-spanstores/elasticsearch/pom.xml b/zipkin-spanstores/elasticsearch/pom.xml
index 55689db87fe..f30e8381d34 100755
--- a/zipkin-spanstores/elasticsearch/pom.xml
+++ b/zipkin-spanstores/elasticsearch/pom.xml
@@ -28,8 +28,6 @@
   <name>SpanStore: Elasticsearch</name>
 
   <properties>
-    <maven.compiler.source>1.7</maven.compiler.source>
-    <maven.compiler.target>1.7</maven.compiler.target>
     <main.basedir>${project.basedir}/..</main.basedir>
   </properties>
 
@@ -38,10 +36,6 @@
       <groupId>${project.groupId}</groupId>
       <artifactId>zipkin</artifactId>
     </dependency>
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>spanstore-guava</artifactId>
-    </dependency>
 
     <dependency>
       <groupId>org.elasticsearch</groupId>
@@ -69,4 +63,32 @@
       <scope>test</scope>
     </dependency>
   </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <inherited>true</inherited>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <executions>
+          <!-- Ensure main source tree compiles to Java 7 bytecode. -->
+          <execution>
+            <id>default-compile</id>
+            <phase>compile</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <configuration>
+              <source>1.7</source>
+              <target>1.7</target>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- Make sure Java 8 types and methods aren't used -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>animal-sniffer-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
 </project>
diff --git a/zipkin-spanstores/guava/pom.xml b/zipkin-spanstores/guava/pom.xml
index 9176f4a67c9..e1eb8c4c01b 100644
--- a/zipkin-spanstores/guava/pom.xml
+++ b/zipkin-spanstores/guava/pom.xml
@@ -28,8 +28,6 @@
   <name>SpanStore: Guava support library</name>
 
   <properties>
-    <maven.compiler.source>1.7</maven.compiler.source>
-    <maven.compiler.target>1.7</maven.compiler.target>
     <main.basedir>${project.basedir}/..</main.basedir>
   </properties>
 
@@ -59,4 +57,32 @@
       <scope>test</scope>
     </dependency>
   </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <inherited>true</inherited>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <executions>
+          <!-- Ensure main source tree compiles to Java 7 bytecode. -->
+          <execution>
+            <id>default-compile</id>
+            <phase>compile</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <configuration>
+              <source>1.7</source>
+              <target>1.7</target>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- Make sure Java 8 types and methods aren't used -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>animal-sniffer-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
 </project>

From 20e50337c463b3d8f8029dcd7c9f00c157864b20 Mon Sep 17 00:00:00 2001
From: Adrian Cole <acole@pivotal.io>
Date: Wed, 23 Mar 2016 16:07:13 +0800
Subject: [PATCH 2/2] Adds AsyncSpanStore: an SPI to adapt to async composition
 libraries

This adds a callback-driven AsyncSpanStore, which can be used to build
futures or other async constructs. This isn't intended to be used
directly by users. Rather, it is an SPI used to intermediate between
libraries that can drift, such as Guava or RxJava.
---
 ...ElasticsearchScalaDependencyStoreTest.java |   4 +-
 .../ElasticsearchScalaSpanStoreTest.java      |   4 +-
 .../elasticsearch/ElasticsearchTestGraph.java |   4 +-
 .../server/ZipkinServerConfiguration.java     |   4 +-
 .../zipkin/elasticsearch/ElasticFutures.java  |  83 +++++
 .../ElasticListenableFuture.java              |  37 ---
 .../ElasticsearchSpanConsumer.java            |  53 ++-
 .../elasticsearch/ElasticsearchSpanStore.java | 314 +++++++++---------
 .../ElasticsearchDependenciesTest.java        |   4 +-
 .../ElasticsearchSpanStoreTest.java           |   4 +-
 .../elasticsearch/ElasticsearchTestGraph.java |   4 +-
 .../guava/AsyncGuavaSpanStoreAdapter.java     |  90 +++++
 .../guava/BlockingGuavaSpanStore.java         |  87 -----
 .../guava/AsyncGuavaSpanStoreAdapterTest.java | 241 ++++++++++++++
 zipkin/pom.xml                                |  15 +
 .../java/zipkin/async/AsyncSpanConsumer.java  |  35 ++
 .../java/zipkin/async/AsyncSpanStore.java     |  68 ++++
 .../async/BlockingSpanStoreAdapter.java       | 135 ++++++++
 .../src/main/java/zipkin/async/Callback.java  |  40 +++
 .../async/BlockingSpanStoreAdapterTest.java   |  95 ++++--
 .../java/zipkin/async/CallbackCaptorTest.java |  82 +++++
 21 files changed, 1039 insertions(+), 364 deletions(-)
 create mode 100755 zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticFutures.java
 delete mode 100755 zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticListenableFuture.java
 create mode 100644 zipkin-spanstores/guava/src/main/java/zipkin/spanstore/guava/AsyncGuavaSpanStoreAdapter.java
 delete mode 100755 zipkin-spanstores/guava/src/main/java/zipkin/spanstore/guava/BlockingGuavaSpanStore.java
 create mode 100755 zipkin-spanstores/guava/src/test/java/zipkin/spanstore/guava/AsyncGuavaSpanStoreAdapterTest.java
 create mode 100644 zipkin/src/main/java/zipkin/async/AsyncSpanConsumer.java
 create mode 100644 zipkin/src/main/java/zipkin/async/AsyncSpanStore.java
 create mode 100644 zipkin/src/main/java/zipkin/async/BlockingSpanStoreAdapter.java
 create mode 100644 zipkin/src/main/java/zipkin/async/Callback.java
 rename zipkin-spanstores/guava/src/test/java/zipkin/spanstore/guava/BlockingGuavaSpanStoreTest.java => zipkin/src/test/java/zipkin/async/BlockingSpanStoreAdapterTest.java (65%)
 mode change 100755 => 100644
 create mode 100644 zipkin/src/test/java/zipkin/async/CallbackCaptorTest.java

diff --git a/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaDependencyStoreTest.java b/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaDependencyStoreTest.java
index 0e7c99f147f..1bd71a87697 100755
--- a/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaDependencyStoreTest.java
+++ b/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaDependencyStoreTest.java
@@ -22,9 +22,9 @@
 import zipkin.DependencyLink;
 import zipkin.InMemorySpanStore;
 import zipkin.SpanStore;
+import zipkin.async.BlockingSpanStoreAdapter;
 import zipkin.interop.ScalaDependencyStoreAdapter;
 import zipkin.interop.ScalaSpanStoreAdapter;
-import zipkin.spanstore.guava.BlockingGuavaSpanStore;
 
 import static zipkin.internal.Util.midnightUTC;
 
@@ -37,7 +37,7 @@ public static void setupDB() {
   }
 
   public DependencyStore store() {
-    return new ScalaDependencyStoreAdapter(new BlockingGuavaSpanStore(spanStore));
+    return new ScalaDependencyStoreAdapter(new BlockingSpanStoreAdapter(spanStore));
   }
 
   @Override
diff --git a/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaSpanStoreTest.java b/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaSpanStoreTest.java
index 246b25ee268..f5cdcaf39b5 100644
--- a/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaSpanStoreTest.java
+++ b/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaSpanStoreTest.java
@@ -16,8 +16,8 @@
 import com.twitter.zipkin.storage.SpanStore;
 import com.twitter.zipkin.storage.SpanStoreSpec;
 import org.junit.BeforeClass;
+import zipkin.async.BlockingSpanStoreAdapter;
 import zipkin.interop.ScalaSpanStoreAdapter;
-import zipkin.spanstore.guava.BlockingGuavaSpanStore;
 
 public class ElasticsearchScalaSpanStoreTest extends SpanStoreSpec {
   private static ElasticsearchSpanStore spanStore;
@@ -28,7 +28,7 @@ public static void setupDB() {
   }
 
   public SpanStore store() {
-    return new ScalaSpanStoreAdapter(new BlockingGuavaSpanStore(spanStore));
+    return new ScalaSpanStoreAdapter(new BlockingSpanStoreAdapter(spanStore));
   }
 
   public void clear() {
diff --git a/interop/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java b/interop/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java
index 24fdaeedaa3..0386cc6bd10 100755
--- a/interop/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java
+++ b/interop/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java
@@ -15,7 +15,7 @@
 
 import org.elasticsearch.client.transport.NoNodeAvailableException;
 import org.junit.AssumptionViolatedException;
-import zipkin.spanstore.guava.BlockingGuavaSpanStore;
+import zipkin.async.BlockingSpanStoreAdapter;
 
 enum ElasticsearchTestGraph {
   INSTANCE;
@@ -24,7 +24,7 @@ enum ElasticsearchTestGraph {
 
   static {
     // Avoid race-conditions in travis by forcing read-your-writes consistency.
-    BlockingGuavaSpanStore.BLOCK_ON_ACCEPT = true;
+    BlockingSpanStoreAdapter.BLOCK_ON_ACCEPT = true;
     ElasticsearchSpanConsumer.FLUSH_ON_WRITES = true;
   }
 
diff --git a/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java b/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java
index 4eea97b7c82..5a2e9c99720 100644
--- a/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java
+++ b/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java
@@ -41,6 +41,7 @@
 import zipkin.InMemorySpanStore;
 import zipkin.Sampler;
 import zipkin.SpanStore;
+import zipkin.async.BlockingSpanStoreAdapter;
 import zipkin.cassandra.CassandraConfig;
 import zipkin.cassandra.CassandraSpanStore;
 import zipkin.elasticsearch.ElasticsearchConfig;
@@ -49,7 +50,6 @@
 import zipkin.kafka.KafkaConfig;
 import zipkin.kafka.KafkaTransport;
 import zipkin.server.brave.TraceWritesSpanStore;
-import zipkin.spanstore.guava.BlockingGuavaSpanStore;
 
 @Configuration
 @EnableConfigurationProperties(ZipkinServerProperties.class)
@@ -151,7 +151,7 @@ static class ElasticsearchConfiguration {
           .hosts(elasticsearch.getHosts())
           .index(elasticsearch.getIndex())
           .build();
-      return new BlockingGuavaSpanStore(new ElasticsearchSpanStore(config));
+      return new BlockingSpanStoreAdapter(new ElasticsearchSpanStore(config));
     }
   }
 
diff --git a/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticFutures.java b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticFutures.java
new file mode 100755
index 00000000000..34fc12d7a28
--- /dev/null
+++ b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticFutures.java
@@ -0,0 +1,83 @@
+/**
+ * Copyright 2015-2016 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package zipkin.elasticsearch;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ListenableActionFuture;
+import zipkin.async.Callback;
+
+final class ElasticFutures {
+
+  static <T> ListenableFuture<T> toGuava(ListenableActionFuture<T> elasticFuture) {
+    final SettableFuture<T> future = SettableFuture.create();
+    elasticFuture.addListener(new ActionListener<T>() {
+      @Override public void onResponse(T t) {
+        future.set(t);
+      }
+
+      @Override public void onFailure(Throwable e) {
+        future.setException(e);
+      }
+    });
+    return future;
+  }
+
+  static <A, C> void onComplete(
+      ListenableActionFuture<A> future,
+      final Callback<C> callback,
+      final Function<A, C> successTransformationFn
+  ) {
+    future.addListener(new ActionListener<A>() {
+
+      @Override public void onResponse(A a) {
+        try {
+          callback.onSuccess(successTransformationFn.apply(a));
+        } catch (Error | RuntimeException e) {
+          callback.onError(e);
+        }
+      }
+
+      @Override public void onFailure(Throwable e) {
+        callback.onError(e);
+      }
+    });
+  }
+
+  static <A, C> void onComplete(
+      ListenableFuture<A> future,
+      final Callback<C> callback,
+      final Function<A, C> successTransformationFn
+  ) {
+    Futures.addCallback(future, new FutureCallback<A>() {
+
+      @Override public void onSuccess(A result) {
+        try {
+          callback.onSuccess(successTransformationFn.apply(result));
+        } catch (Error | RuntimeException e) {
+          callback.onError(e);
+        }
+      }
+
+      @Override public void onFailure(Throwable t) {
+        callback.onError(t);
+      }
+    });
+  }
+}
diff --git a/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticListenableFuture.java b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticListenableFuture.java
deleted file mode 100755
index c014455a2bc..00000000000
--- a/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticListenableFuture.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Copyright 2015-2016 The OpenZipkin Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package zipkin.elasticsearch;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ListenableActionFuture;
-
-class ElasticListenableFuture {
-
-  static <T> ListenableFuture<T> of(ListenableActionFuture<T> elasticFuture) {
-    final SettableFuture<T> future = SettableFuture.create();
-    elasticFuture.addListener(new ActionListener<T>() {
-      @Override public void onResponse(T t) {
-        future.set(t);
-      }
-
-      @Override public void onFailure(Throwable e) {
-        future.setException(e);
-      }
-    });
-    return future;
-  }
-}
diff --git a/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanConsumer.java b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanConsumer.java
index d85022e9bc8..9d1e9fc288f 100755
--- a/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanConsumer.java
+++ b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanConsumer.java
@@ -14,34 +14,34 @@
 package zipkin.elasticsearch;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
+import com.google.common.base.Functions;
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import org.elasticsearch.action.ListenableActionFuture;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.client.Client;
 import zipkin.Codec;
 import zipkin.Span;
+import zipkin.async.AsyncSpanConsumer;
+import zipkin.async.Callback;
 import zipkin.internal.ApplyTimestampAndDuration;
-import zipkin.internal.JsonCodec;
-import zipkin.spanstore.guava.GuavaSpanConsumer;
+
+import static zipkin.elasticsearch.ElasticFutures.onComplete;
+import static zipkin.elasticsearch.ElasticFutures.toGuava;
 
 // Extracted for readability
-final class ElasticsearchSpanConsumer implements GuavaSpanConsumer {
+final class ElasticsearchSpanConsumer implements AsyncSpanConsumer {
   /**
-   * Internal flag that allows you read-your-writes consistency during tests. With Elasticsearch,
-   * it is not sufficient to block on the {@link #accept(List)} future since the index also needs
-   * to be flushed.
+   * Internal flag that allows you read-your-writes consistency during tests. With Elasticsearch, it
+   * is not sufficient to block on the {@link #createSpanIndexRequest} future since the index also
+   * needs to be flushed.
    */
   @VisibleForTesting
   static boolean FLUSH_ON_WRITES;
 
-  static final JsonCodec JSON_CODEC = new JsonCodec();
-
   private final Client client;
   private final IndexNameFormatter indexNameFormatter;
 
@@ -51,34 +51,23 @@ final class ElasticsearchSpanConsumer implements GuavaSpanConsumer {
   }
 
   @Override
-  public ListenableFuture<Void> accept(List<Span> spans) {
+  public void accept(List<Span> spans, final Callback<Void> callback) {
     BulkRequestBuilder request = client.prepareBulk();
     for (Span span : spans) {
       request.add(createSpanIndexRequest(ApplyTimestampAndDuration.apply(span)));
     }
-    ListenableFuture<Void> future = toVoidFuture(request.execute());
+
+    ListenableFuture future = toGuava(request.execute());
     if (FLUSH_ON_WRITES) {
-      future = Futures.transformAsync(
-          future,
-          new AsyncFunction<Void, Void>() {
-            @Override public ListenableFuture<Void> apply(Void input) throws Exception {
-              return toVoidFuture(client.admin().indices()
-                  .prepareFlush(indexNameFormatter.catchAll())
-                  .execute());
-            }
-          });
+      future = Futures.transform(future, new AsyncFunction() {
+        @Override public ListenableFuture<?> apply(Object input) {
+          return toGuava(client.admin().indices()
+              .prepareFlush(indexNameFormatter.catchAll())
+              .execute());
+        }
+      });
     }
-    return future;
-  }
-
-  private static <T> ListenableFuture<Void> toVoidFuture(ListenableActionFuture<T> elasticFuture) {
-    return Futures.transform(
-        ElasticListenableFuture.of(elasticFuture),
-        new Function<T, Void>() {
-          @Override public Void apply(T input) {
-            return null;
-          }
-        });
+    onComplete(future, callback, Functions.<Void>constant(null));
   }
 
   private IndexRequestBuilder createSpanIndexRequest(Span span) {
diff --git a/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanStore.java b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanStore.java
index 8ffe2339876..9ca3eab2c9b 100755
--- a/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanStore.java
+++ b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanStore.java
@@ -15,10 +15,11 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.Functions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
 import com.google.common.net.HostAndPort;
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.Futures;
@@ -28,6 +29,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
@@ -60,29 +62,35 @@
 import org.elasticsearch.search.aggregations.metrics.sum.Sum;
 import org.elasticsearch.search.aggregations.metrics.tophits.TopHits;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders;
-import org.elasticsearch.search.sort.SortBuilders;
-import org.elasticsearch.search.sort.SortOrder;
 import zipkin.Codec;
 import zipkin.DependencyLink;
 import zipkin.QueryRequest;
 import zipkin.Span;
+import zipkin.async.AsyncSpanStore;
+import zipkin.async.Callback;
 import zipkin.internal.CorrectForClockSkew;
 import zipkin.internal.MergeById;
 import zipkin.internal.Nullable;
 import zipkin.internal.Util;
-import zipkin.spanstore.guava.GuavaSpanStore;
 
-import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static com.google.common.util.concurrent.Futures.transform;
 import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
 import static org.elasticsearch.index.query.QueryBuilders.nestedQuery;
 import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
 import static org.elasticsearch.index.query.QueryBuilders.termsQuery;
-
-public class ElasticsearchSpanStore implements GuavaSpanStore {
-
-  private static final long ONE_DAY_IN_MILLIS = TimeUnit.DAYS.toMillis(1);
+import static zipkin.elasticsearch.ElasticFutures.onComplete;
+import static zipkin.elasticsearch.ElasticFutures.toGuava;
+
+public class ElasticsearchSpanStore implements AsyncSpanStore {
+  static final long ONE_DAY_IN_MILLIS = TimeUnit.DAYS.toMillis(1);
+  static final Ordering<List<Span>> TRACE_DESCENDING = Ordering.from(new Comparator<List<Span>>() {
+    @Override
+    public int compare(List<Span> left, List<Span> right) {
+      return right.get(0).compareTo(left.get(0));
+    }
+  });
 
   private final Client client;
   private final IndexNameFormatter indexNameFormatter;
@@ -98,11 +106,12 @@ public ElasticsearchSpanStore(ElasticsearchConfig config) {
     checkForIndexTemplate();
   }
 
-  @Override public ListenableFuture<Void> accept(List<Span> spans) {
-    return spanConsumer.accept(spans);
+  @Override public void accept(List<Span> spans, Callback<Void> callback) {
+    spanConsumer.accept(spans, callback);
   }
 
-  @Override public ListenableFuture<List<List<Span>>> getTraces(QueryRequest request) {
+  @Override
+  public void getTraces(QueryRequest request, final Callback<List<List<Span>>> callback) {
     long endMillis = request.endTs;
     long beginMillis = endMillis - request.lookback;
 
@@ -142,7 +151,6 @@ public ElasticsearchSpanStore(ElasticsearchConfig config) {
       filter.must(durationQuery);
     }
 
-
     List<String> strings = computeIndices(beginMillis, endMillis);
     final String[] indices = strings.toArray(new String[strings.size()]);
     // We need to filter to traces that contain at least one span that matches the request,
@@ -169,61 +177,60 @@ public ElasticsearchSpanStore(ElasticsearchConfig config) {
                         .script(new Script("_count > 0", ScriptType.INLINE, "expression", null)))
                     .order(Order.aggregation("timestamps_agg", false))
                     .size(request.limit));
-    return Futures.transformAsync(
-        ElasticListenableFuture.of(elasticRequest.execute()),
-        new AsyncFunction<SearchResponse, List<List<Span>>>() {
-          @Override public ListenableFuture<List<List<Span>>> apply(SearchResponse response)
-              throws Exception {
-            return convertTraceAggregationResponse(response, indices);
-          }
-        });
+
+    ListenableFuture<SearchResponse> traceIds = toGuava(elasticRequest.execute());
+
+    ListenableFuture<List<List<Span>>> traces =
+        transform(traceIds, new AsyncFunction<SearchResponse, List<List<Span>>>() {
+              @Override public ListenableFuture<List<List<Span>>> apply(SearchResponse input) {
+                if (input.getAggregations() == null
+                    || input.getAggregations().get("traceId_agg") == null) {
+                  return Futures.immediateFuture(Collections.<List<Span>>emptyList());
+                }
+                Terms traceIdsAgg = input.getAggregations().get("traceId_agg");
+                List<Long> traceIds = new ArrayList<>();
+                for (Terms.Bucket bucket : traceIdsAgg.getBuckets()) {
+                  traceIds.add(Util.lowerHexToUnsignedLong(bucket.getKeyAsString()));
+                }
+                return getTracesByIds(traceIds, indices);
+              }
+            }
+        );
+    onComplete(traces, callback, Functions.<List<List<Span>>>identity());
   }
 
-  private ListenableFuture<List<List<Span>>> convertTraceAggregationResponse(
-      SearchResponse response, String[] indices) {
-    if (response.getAggregations() == null) {
-      return immediateFuture(Collections.<List<Span>>emptyList());
-    }
-    Terms traceIdsAgg = response.getAggregations().get("traceId_agg");
-    if (traceIdsAgg == null) {
-      return immediateFuture(Collections.<List<Span>>emptyList());
-    }
-    List<Long> traceIds = new ArrayList<>();
-    for (Terms.Bucket bucket : traceIdsAgg.getBuckets()) {
-      traceIds.add(Util.lowerHexToUnsignedLong(bucket.getKeyAsString()));
-    }
-    return getTracesByIds(traceIds, indices);
+  @Override public void getTrace(long traceId, final Callback<List<Span>> callback) {
+    onComplete(getRawTrace(traceId), callback, new Function<List<Span>, List<Span>>() {
+      @Override public List<Span> apply(List<Span> input) {
+        return input == null ? null : CorrectForClockSkew.apply(MergeById.apply(input));
+      }
+    });
   }
 
-  @Override public ListenableFuture<List<Span>> getTrace(long id) {
-    return Futures.transform(
-        getTracesByIds(ImmutableList.of(id), indexNameFormatter.catchAll()),
-        new Function<List<List<Span>>, List<Span>>() {
-          @Override public List<Span> apply(List<List<Span>> traces) {
-            return Iterables.getFirst(traces, null);
-          }
-        });
+  @Override public void getRawTrace(long traceId, final Callback<List<Span>> callback) {
+    onComplete(getRawTrace(traceId), callback, Functions.<List<Span>>identity());
   }
 
-  @Override public ListenableFuture<List<Span>> getRawTrace(long traceId) {
+  ListenableFuture<List<Span>> getRawTrace(long traceId) {
     SearchRequestBuilder elasticRequest = client.prepareSearch(indexNameFormatter.catchAll())
         .setTypes(ElasticsearchConstants.SPAN)
-      .setQuery(termQuery("traceId",String.format("%016x", traceId)));
-    return Futures.transform(
-        ElasticListenableFuture.of(elasticRequest.execute()),
-        new Function<SearchResponse, List<Span>>() {
-          @Override public List<Span> apply(SearchResponse response) {
-            ImmutableList.Builder < Span > trace = ImmutableList.builder();
-            for (SearchHit hit : response.getHits()) {
-              trace.add(Codec.JSON.readSpan(hit.getSourceRef().toBytes()));
-            }
-            return trace.build();
-          }
-        });
+        .setQuery(termQuery("traceId", String.format("%016x", traceId)));
+
+    return transform(toGuava(elasticRequest.execute()), new Function<SearchResponse, List<Span>>() {
+      @Override public List<Span> apply(SearchResponse response) {
+        if (response.getHits().totalHits() == 0) {
+          return null;
+        }
+        ImmutableList.Builder<Span> trace = ImmutableList.builder();
+        for (SearchHit hit : response.getHits()) {
+          trace.add(Codec.JSON.readSpan(hit.getSourceRef().toBytes()));
+        }
+        return trace.build();
+      }
+    });
   }
 
-  private ListenableFuture<List<List<Span>>> getTracesByIds(
-      final Collection<Long> traceIds, String... indices) {
+  ListenableFuture<List<List<Span>>> getTracesByIds(Collection<Long> traceIds, String[] indices) {
     List<String> traceIdsStr = new ArrayList<>(traceIds.size());
     for (long traceId : traceIds) {
       traceIdsStr.add(String.format("%016x", traceId));
@@ -235,43 +242,30 @@ private ListenableFuture<List<List<Span>>> getTracesByIds(
         // Need to determine whether this is enough by zipkin standards or should
         // increase it in the index template.
         .setSize(10000)
-        .setQuery(termsQuery("traceId", traceIdsStr))
-        .addSort(SortBuilders.fieldSort("timestamp")
-            .order(SortOrder.ASC)
-            .unmappedType("long"));
-    return Futures.transform(
-        ElasticListenableFuture.of(elasticRequest.execute()),
-        new Function<SearchResponse, List<List<Span>>>() {
-          @Override public List<List<Span>> apply(SearchResponse response) {
-            return convertTracesResponse(traceIds, response);
-          }
-        });
+        .setQuery(termsQuery("traceId", traceIdsStr));
+    return Futures.transform(toGuava(elasticRequest.execute()), ConvertTracesResponse.INSTANCE);
   }
 
-  private List<List<Span>> convertTracesResponse(
-      Collection<Long> traceIds, SearchResponse response) {
-    ArrayListMultimap<Long, Span> groupedSpans = ArrayListMultimap.create();
-    for (SearchHit hit : response.getHits()) {
-      Span span = Codec.JSON.readSpan(hit.getSourceRef().toBytes());
-      groupedSpans.put(span.traceId, span);
-    }
-    ImmutableList.Builder<List<Span>> traces = ImmutableList.builder();
-    // We want to return traces in the same order as the parameter.
-    for (Long traceId : traceIds) {
-      if (!groupedSpans.containsKey(traceId)) {
-        // Couldn't find the trace, this will usually only happen when called from getTrace, not
-        // getTraces.
-        continue;
+  enum ConvertTracesResponse implements Function<SearchResponse, List<List<Span>>> {
+    INSTANCE;
+
+    @Override public List<List<Span>> apply(SearchResponse response) {
+
+      ArrayListMultimap<Long, Span> groupedSpans = ArrayListMultimap.create();
+      for (SearchHit hit : response.getHits()) {
+        Span span = Codec.JSON.readSpan(hit.getSourceRef().toBytes());
+        groupedSpans.put(span.traceId, span);
+      }
+      List<List<Span>> result = new ArrayList<>(groupedSpans.size());
+      for (Long traceId : groupedSpans.keySet()) {
+        result.add(CorrectForClockSkew.apply(MergeById.apply(groupedSpans.get(traceId))));
       }
-      traces.add(
-          ImmutableList.copyOf(
-              CorrectForClockSkew.apply(MergeById.apply(groupedSpans.get(traceId)))));
+      return TRACE_DESCENDING.immutableSortedCopy(result);
     }
-    return traces.build();
   }
 
   @Override
-  public ListenableFuture<List<String>> getServiceNames() {
+  public void getServiceNames(final Callback<List<String>> callback) {
     SearchRequestBuilder elasticRequest =
         client.prepareSearch(indexNameFormatter.catchAll())
             .setTypes(ElasticsearchConstants.SPAN)
@@ -283,47 +277,47 @@ public ListenableFuture<List<String>> getServiceNames() {
                 .path("binaryAnnotations")
                 .subAggregation(AggregationBuilders.terms("binaryAnnotationsServiceName_agg")
                     .field("binaryAnnotations.endpoint.serviceName")));
-    return Futures.transform(
-        ElasticListenableFuture.of(elasticRequest.execute()),
-        new Function<SearchResponse, List<String>>() {
-          @Override public List<String> apply(SearchResponse response) {
-            return convertServiceNamesResponse(response);
-          }
-        });
+
+    onComplete(elasticRequest.execute(), callback, ConvertServiceNamesResponse.INSTANCE);
   }
 
-  private List<String> convertServiceNamesResponse(SearchResponse response) {
-    if (response.getAggregations() == null) {
-      return Collections.emptyList();
-    }
-    SortedSet<String> serviceNames = new TreeSet<>();
-    Terms annotationServiceNamesAgg = response.getAggregations().get("annotationServiceName_agg");
-    if (annotationServiceNamesAgg != null) {
-      for (Terms.Bucket bucket : annotationServiceNamesAgg.getBuckets()) {
-        if (!bucket.getKeyAsString().isEmpty()) {
-          serviceNames.add(bucket.getKeyAsString());
-        }
+  enum ConvertServiceNamesResponse implements Function<SearchResponse, List<String>> {
+    INSTANCE;
+
+    @Override public List<String> apply(SearchResponse response) {
+      if (response.getAggregations() == null) {
+        return Collections.emptyList();
       }
-    }
-    Nested binaryAnnotationsAgg = response.getAggregations().get("binaryAnnotations_agg");
-    if (binaryAnnotationsAgg != null && binaryAnnotationsAgg.getAggregations() != null) {
-      Terms binaryAnnotationServiceNamesAgg = binaryAnnotationsAgg.getAggregations()
-          .get("binaryAnnotationsServiceName_agg");
-      if (binaryAnnotationServiceNamesAgg != null) {
-        for (Terms.Bucket bucket : binaryAnnotationServiceNamesAgg.getBuckets()) {
+      SortedSet<String> serviceNames = new TreeSet<>();
+      Terms annotationServiceNamesAgg = response.getAggregations().get("annotationServiceName_agg");
+      if (annotationServiceNamesAgg != null) {
+        for (Terms.Bucket bucket : annotationServiceNamesAgg.getBuckets()) {
           if (!bucket.getKeyAsString().isEmpty()) {
             serviceNames.add(bucket.getKeyAsString());
           }
         }
       }
+      Nested binaryAnnotationsAgg = response.getAggregations().get("binaryAnnotations_agg");
+      if (binaryAnnotationsAgg != null && binaryAnnotationsAgg.getAggregations() != null) {
+        Terms binaryAnnotationServiceNamesAgg = binaryAnnotationsAgg.getAggregations()
+            .get("binaryAnnotationsServiceName_agg");
+        if (binaryAnnotationServiceNamesAgg != null) {
+          for (Terms.Bucket bucket : binaryAnnotationServiceNamesAgg.getBuckets()) {
+            if (!bucket.getKeyAsString().isEmpty()) {
+              serviceNames.add(bucket.getKeyAsString());
+            }
+          }
+        }
+      }
+      return ImmutableList.copyOf(serviceNames);
     }
-    return ImmutableList.copyOf(serviceNames);
   }
 
   @Override
-  public ListenableFuture<List<String>> getSpanNames(String serviceName) {
+  public void getSpanNames(String serviceName, final Callback<List<String>> callback) {
     if (Strings.isNullOrEmpty(serviceName)) {
-      return immediateFuture(Collections.<String>emptyList());
+      callback.onSuccess(Collections.<String>emptyList());
+      return;
     }
     serviceName = serviceName.toLowerCase();
     QueryBuilder filter = boolQuery()
@@ -336,30 +330,28 @@ public ListenableFuture<List<String>> getSpanNames(String serviceName) {
         .addAggregation(AggregationBuilders.terms("name_agg")
             .order(Order.term(true))
             .field("name"));
-    return Futures.transform(
-        ElasticListenableFuture.of(elasticRequest.execute()),
-        new Function<SearchResponse, List<String>>() {
-          @Override public List<String> apply(SearchResponse response) {
-            return convertSpanNameResponse(response);
-          }
-        });
+
+    onComplete(elasticRequest.execute(), callback, ConvertSpanNameResponse.INSTANCE);
   }
 
-  private List<String> convertSpanNameResponse(SearchResponse response) {
-    Terms namesAgg = response.getAggregations().get("name_agg");
-    if (namesAgg == null) {
-      return Collections.emptyList();
-    }
-    ImmutableList.Builder<String> spanNames = ImmutableList.builder();
-    for (Terms.Bucket bucket : namesAgg.getBuckets()) {
-      spanNames.add(bucket.getKeyAsString());
+  enum ConvertSpanNameResponse implements Function<SearchResponse, List<String>> {
+    INSTANCE;
+
+    @Override public List<String> apply(SearchResponse response) {
+      Terms namesAgg = response.getAggregations().get("name_agg");
+      if (namesAgg == null) {
+        return Collections.emptyList();
+      }
+      ImmutableList.Builder<String> spanNames = ImmutableList.builder();
+      for (Terms.Bucket bucket : namesAgg.getBuckets()) {
+        spanNames.add(bucket.getKeyAsString());
+      }
+      return spanNames.build();
     }
-    return spanNames.build();
   }
 
-  @Override
-  public ListenableFuture<List<DependencyLink>> getDependencies(
-      long endMillis, @Nullable Long lookback) {
+  @Override public void getDependencies(long endMillis, @Nullable Long lookback,
+      final Callback<List<DependencyLink>> callback) {
     long beginMillis = lookback != null ? endMillis - lookback : 0;
     // We just return all dependencies in the days that fall within endTs and lookback as
     // dependency links themselves don't have timestamps.
@@ -375,34 +367,33 @@ public ListenableFuture<List<DependencyLink>> getDependencies(
             .subAggregation(AggregationBuilders.sum("callCount_agg")
                 .field("callCount")))
         .setQuery(matchAllQuery());
-    return Futures.transform(
-        ElasticListenableFuture.of(elasticRequest.execute()),
-        new Function<SearchResponse, List<DependencyLink>>() {
-          @Override public List<DependencyLink> apply(SearchResponse response) {
-            return convertDependenciesResponse(response);
-          }
-        });
+
+    onComplete(elasticRequest.execute(), callback, ConvertDependenciesResponse.INSTANCE);
   }
 
-  private List<DependencyLink> convertDependenciesResponse(SearchResponse response) {
-    if (response.getAggregations() == null) {
-      return Collections.emptyList();
-    }
-    Terms parentChildAgg = response.getAggregations().get("parent_child_agg");
-    if (parentChildAgg == null) {
-      return Collections.emptyList();
-    }
-    ImmutableList.Builder<DependencyLink> links = ImmutableList.builder();
-    for (Terms.Bucket bucket : parentChildAgg.getBuckets()) {
-      TopHits hitsAgg = bucket.getAggregations().get("hits_agg");
-      Sum callCountAgg = bucket.getAggregations().get("callCount_agg");
-      // We would have no bucket if there wasn't a hit, so this should always be non-empty.
-      SearchHit hit = hitsAgg.getHits().getAt(0);
-      DependencyLink link = Codec.JSON.readDependencyLink(hit.getSourceRef().toBytes());
-      link = new DependencyLink.Builder(link).callCount((long) callCountAgg.getValue()).build();
-      links.add(link);
+  enum ConvertDependenciesResponse implements Function<SearchResponse, List<DependencyLink>> {
+    INSTANCE;
+
+    @Override public List<DependencyLink> apply(SearchResponse response) {
+      if (response.getAggregations() == null) {
+        return Collections.emptyList();
+      }
+      Terms parentChildAgg = response.getAggregations().get("parent_child_agg");
+      if (parentChildAgg == null) {
+        return Collections.emptyList();
+      }
+      ImmutableList.Builder<DependencyLink> links = ImmutableList.builder();
+      for (Terms.Bucket bucket : parentChildAgg.getBuckets()) {
+        TopHits hitsAgg = bucket.getAggregations().get("hits_agg");
+        Sum callCountAgg = bucket.getAggregations().get("callCount_agg");
+        // We would have no bucket if there wasn't a hit, so this should always be non-empty.
+        SearchHit hit = hitsAgg.getHits().getAt(0);
+        DependencyLink link = Codec.JSON.readDependencyLink(hit.getSourceRef().toBytes());
+        link = new DependencyLink.Builder(link).callCount((long) callCountAgg.getValue()).build();
+        links.add(link);
+      }
+      return links.build();
     }
-    return links.build();
   }
 
   @VisibleForTesting void clear() {
@@ -450,8 +441,7 @@ private void checkForIndexTemplate() {
       return;
     }
     client.admin().indices().putTemplate(
-        new PutIndexTemplateRequest("zipkin_template").source(indexTemplate))
-        .actionGet();
+        new PutIndexTemplateRequest("zipkin_template").source(indexTemplate)).actionGet();
   }
 
   private static Client createClient(List<String> hosts, String clusterName) {
diff --git a/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchDependenciesTest.java b/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchDependenciesTest.java
index cee0ebfda98..b8aa211f5ea 100755
--- a/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchDependenciesTest.java
+++ b/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchDependenciesTest.java
@@ -20,14 +20,14 @@
 import zipkin.InMemorySpanStore;
 import zipkin.Span;
 import zipkin.SpanStore;
-import zipkin.spanstore.guava.BlockingGuavaSpanStore;
+import zipkin.async.BlockingSpanStoreAdapter;
 
 import static zipkin.internal.Util.midnightUTC;
 
 public class ElasticsearchDependenciesTest extends DependenciesTest<SpanStore> {
 
   public ElasticsearchDependenciesTest() {
-    this.store = new BlockingGuavaSpanStore(ElasticsearchTestGraph.INSTANCE.spanStore());
+    this.store = new BlockingSpanStoreAdapter(ElasticsearchTestGraph.INSTANCE.spanStore());
   }
 
   @Override
diff --git a/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchSpanStoreTest.java b/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchSpanStoreTest.java
index c44572d2bba..b09c7a3196f 100644
--- a/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchSpanStoreTest.java
+++ b/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchSpanStoreTest.java
@@ -15,12 +15,12 @@
 
 import zipkin.SpanStore;
 import zipkin.SpanStoreTest;
-import zipkin.spanstore.guava.BlockingGuavaSpanStore;
+import zipkin.async.BlockingSpanStoreAdapter;
 
 public class ElasticsearchSpanStoreTest extends SpanStoreTest<SpanStore> {
 
   public ElasticsearchSpanStoreTest() {
-    this.store = new BlockingGuavaSpanStore(ElasticsearchTestGraph.INSTANCE.spanStore());
+    this.store = new BlockingSpanStoreAdapter(ElasticsearchTestGraph.INSTANCE.spanStore());
   }
 
   @Override
diff --git a/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java b/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java
index 342c9dbdeca..52151792214 100755
--- a/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java
+++ b/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java
@@ -15,7 +15,7 @@
 
 import org.elasticsearch.client.transport.NoNodeAvailableException;
 import org.junit.AssumptionViolatedException;
-import zipkin.spanstore.guava.BlockingGuavaSpanStore;
+import zipkin.async.BlockingSpanStoreAdapter;
 
 enum ElasticsearchTestGraph {
   INSTANCE;
@@ -23,7 +23,7 @@ enum ElasticsearchTestGraph {
   static final ElasticsearchConfig CONFIG = new ElasticsearchConfig.Builder().build();
 
   static {
-    BlockingGuavaSpanStore.BLOCK_ON_ACCEPT = true;
+    BlockingSpanStoreAdapter.BLOCK_ON_ACCEPT = true;
     ElasticsearchSpanConsumer.FLUSH_ON_WRITES = true;
   }
 
diff --git a/zipkin-spanstores/guava/src/main/java/zipkin/spanstore/guava/AsyncGuavaSpanStoreAdapter.java b/zipkin-spanstores/guava/src/main/java/zipkin/spanstore/guava/AsyncGuavaSpanStoreAdapter.java
new file mode 100644
index 00000000000..156e266fe2f
--- /dev/null
+++ b/zipkin-spanstores/guava/src/main/java/zipkin/spanstore/guava/AsyncGuavaSpanStoreAdapter.java
@@ -0,0 +1,90 @@
+/**
+ * Copyright 2015-2016 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin.spanstore.guava;
+
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import zipkin.DependencyLink;
+import zipkin.QueryRequest;
+import zipkin.Span;
+import zipkin.async.AsyncSpanStore;
+import zipkin.async.Callback;
+import zipkin.internal.Nullable;
+
+/**
+ * A {@link GuavaSpanStore} derived from an {@link AsyncSpanStore}. Used by callers who prefer to
+ * compose futures.
+ */
+public final class AsyncGuavaSpanStoreAdapter implements GuavaSpanStore {
+
+  private final AsyncSpanStore delegate;
+
+  public AsyncGuavaSpanStoreAdapter(AsyncSpanStore delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override public ListenableFuture<Void> accept(List<Span> spans) {
+    CallbackListenableFuture<Void> result = new CallbackListenableFuture<>();
+    delegate.accept(spans, result);
+    return result;
+  }
+
+  @Override public ListenableFuture<List<List<Span>>> getTraces(QueryRequest request) {
+    CallbackListenableFuture<List<List<Span>>> result = new CallbackListenableFuture<>();
+    delegate.getTraces(request, result);
+    return result;
+  }
+
+  @Override public ListenableFuture<List<Span>> getTrace(long id) {
+    CallbackListenableFuture<List<Span>> result = new CallbackListenableFuture<>();
+    delegate.getTrace(id, result);
+    return result;
+  }
+
+  @Override public ListenableFuture<List<Span>> getRawTrace(long traceId) {
+    CallbackListenableFuture<List<Span>> result = new CallbackListenableFuture<>();
+    delegate.getRawTrace(traceId, result);
+    return result;
+  }
+
+  @Override public ListenableFuture<List<String>> getServiceNames() {
+    CallbackListenableFuture<List<String>> result = new CallbackListenableFuture<>();
+    delegate.getServiceNames(result);
+    return result;
+  }
+
+  @Override public ListenableFuture<List<String>> getSpanNames(String serviceName) {
+    CallbackListenableFuture<List<String>> result = new CallbackListenableFuture<>();
+    delegate.getSpanNames(serviceName, result);
+    return result;
+  }
+
+  @Override public ListenableFuture<List<DependencyLink>> getDependencies(long endTs,
+      @Nullable Long lookback) {
+    CallbackListenableFuture<List<DependencyLink>> result = new CallbackListenableFuture<>();
+    delegate.getDependencies(endTs, lookback, result);
+    return result;
+  }
+
+  static final class CallbackListenableFuture<V> extends AbstractFuture<V> implements Callback<V> {
+    @Override public void onSuccess(@Nullable V value) {
+      set(value);
+    }
+
+    @Override public void onError(Throwable t) {
+      setException(t);
+    }
+  }
+}
diff --git a/zipkin-spanstores/guava/src/main/java/zipkin/spanstore/guava/BlockingGuavaSpanStore.java b/zipkin-spanstores/guava/src/main/java/zipkin/spanstore/guava/BlockingGuavaSpanStore.java
deleted file mode 100755
index b4700f79171..00000000000
--- a/zipkin-spanstores/guava/src/main/java/zipkin/spanstore/guava/BlockingGuavaSpanStore.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Copyright 2015-2016 The OpenZipkin Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package zipkin.spanstore.guava;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ListenableFuture;
-import java.util.List;
-import zipkin.DependencyLink;
-import zipkin.QueryRequest;
-import zipkin.Span;
-import zipkin.SpanStore;
-import zipkin.internal.Nullable;
-
-import static com.google.common.util.concurrent.Futures.getUnchecked;
-
-/**
- * A {@link SpanStore} implementation that can take a {@link GuavaSpanStore} and call its methods
- * with blocking, for use in callers that need a normal {@link SpanStore}.
- */
-public class BlockingGuavaSpanStore implements SpanStore {
-  /**
-   * Internal flag that allows you read-your-writes consistency during tests.
-   *
-   * <p>This is internal as collection endpoints are usually in different threads or not in the same
-   * process as query ones. Special-casing this allows tests to pass without changing {@link
-   * GuavaSpanConsumer#accept}.
-   *
-   * <p>Why not just change {@link GuavaSpanConsumer#accept} now? {@link GuavaSpanConsumer#accept}
-   * may indeed need to change, but when that occurs, we'd want to choose something that is widely
-   * supportable, and serving a specific use case. That api might not be a future, for example.
-   * Future is difficult, for example, properly supporting and testing cancel. Further, there are
-   * other async models such as callbacks that could be more supportable. Regardless, this work is
-   * best delayed until there's a worthwhile use-case vs up-fronting only due to tests, and
-   * prematurely choosing Future results.
-   */
-  @VisibleForTesting
-  public static boolean BLOCK_ON_ACCEPT;
-
-  private final GuavaSpanStore delegate;
-
-  public BlockingGuavaSpanStore(GuavaSpanStore delegate) {
-    this.delegate = delegate;
-  }
-
-  // Only method that does not actually block even in synchronous spanstores.
-  @Override public void accept(List<Span> spans) {
-    ListenableFuture<Void> future = delegate.accept(spans);
-    if (BLOCK_ON_ACCEPT) {
-      getUnchecked(future);
-    }
-  }
-
-  @Override public List<List<Span>> getTraces(QueryRequest request) {
-    return getUnchecked(delegate.getTraces(request));
-  }
-
-  @Override public List<Span> getTrace(long id) {
-    return getUnchecked(delegate.getTrace(id));
-  }
-
-  @Override public List<Span> getRawTrace(long traceId) {
-    return getUnchecked(delegate.getRawTrace(traceId));
-  }
-
-  @Override public List<String> getServiceNames() {
-    return getUnchecked(delegate.getServiceNames());
-  }
-
-  @Override public List<String> getSpanNames(String serviceName) {
-    return getUnchecked(delegate.getSpanNames(serviceName));
-  }
-
-  @Override public List<DependencyLink> getDependencies(long endTs, @Nullable Long lookback) {
-    return getUnchecked(delegate.getDependencies(endTs, lookback));
-  }
-}
diff --git a/zipkin-spanstores/guava/src/test/java/zipkin/spanstore/guava/AsyncGuavaSpanStoreAdapterTest.java b/zipkin-spanstores/guava/src/test/java/zipkin/spanstore/guava/AsyncGuavaSpanStoreAdapterTest.java
new file mode 100755
index 00000000000..f37b94369f0
--- /dev/null
+++ b/zipkin-spanstores/guava/src/test/java/zipkin/spanstore/guava/AsyncGuavaSpanStoreAdapterTest.java
@@ -0,0 +1,241 @@
+/**
+ * Copyright 2015-2016 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin.spanstore.guava;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.mockito.stubbing.Answer;
+import zipkin.Annotation;
+import zipkin.BinaryAnnotation;
+import zipkin.DependencyLink;
+import zipkin.Endpoint;
+import zipkin.QueryRequest;
+import zipkin.Span;
+import zipkin.async.AsyncSpanStore;
+import zipkin.async.Callback;
+
+import static java.util.Arrays.asList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.core.Is.isA;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+
+public class AsyncGuavaSpanStoreAdapterTest {
+
+  long spanId = 456;
+  long today = System.currentTimeMillis();
+  Endpoint ep = Endpoint.create("service", 127 << 24 | 1, 8080);
+
+  Annotation ann1 = Annotation.create((today + 1) * 1000, "cs", ep);
+  Annotation ann2 = Annotation.create((today + 2) * 1000, "sr", null);
+  Annotation ann3 = Annotation.create((today + 10) * 1000, "custom", ep);
+  Annotation ann4 = Annotation.create((today + 20) * 1000, "custom", ep);
+  Annotation ann5 = Annotation.create((today + 5) * 1000, "custom", ep);
+  Annotation ann6 = Annotation.create((today + 6) * 1000, "custom", ep);
+  Annotation ann7 = Annotation.create((today + 7) * 1000, "custom", ep);
+  Annotation ann8 = Annotation.create((today + 8) * 1000, "custom", ep);
+
+  Span span1 = new Span.Builder()
+      .traceId(123)
+      .name("methodcall")
+      .id(spanId)
+      .timestamp(ann1.timestamp).duration(9000L)
+      .annotations(asList(ann1, ann3))
+      .addBinaryAnnotation(BinaryAnnotation.create("BAH", "BEH", ep)).build();
+
+  Span span2 = new Span.Builder()
+      .traceId(456)
+      .name("methodcall")
+      .id(spanId)
+      .timestamp(ann2.timestamp)
+      .addAnnotation(ann2)
+      .addBinaryAnnotation(BinaryAnnotation.create("BAH2", "BEH2", ep)).build();
+
+  Span span3 = new Span.Builder()
+      .traceId(789)
+      .name("methodcall")
+      .id(spanId)
+      .timestamp(ann2.timestamp).duration(18000L)
+      .annotations(asList(ann2, ann3, ann4))
+      .addBinaryAnnotation(BinaryAnnotation.create("BAH2", "BEH2", ep)).build();
+
+  Span span4 = new Span.Builder()
+      .traceId(999)
+      .name("methodcall")
+      .id(spanId)
+      .timestamp(ann6.timestamp).duration(1000L)
+      .annotations(asList(ann6, ann7)).build();
+
+  Span span5 = new Span.Builder()
+      .traceId(999)
+      .name("methodcall")
+      .id(spanId)
+      .timestamp(ann5.timestamp).duration(3000L)
+      .annotations(asList(ann5, ann8))
+      .addBinaryAnnotation(BinaryAnnotation.create("BAH2", "BEH2", ep)).build();
+
+  List<Span> trace1 = ImmutableList.of(span1, span2, span3);
+
+  List<Span> trace2 = ImmutableList.of(span4, span5);
+
+  List<List<Span>> traces = ImmutableList.of(trace1, trace2);
+
+  List<DependencyLink> deps = ImmutableList.of(
+      new DependencyLink.Builder().parent("zipkin-web").child("zipkin-query").callCount(1).build(),
+      new DependencyLink.Builder().parent("zipkin-query").child("zipkin-foo").callCount(10).build()
+  );
+
+  @Rule
+  public MockitoRule mocks = MockitoJUnit.rule();
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Mock
+  private AsyncSpanStore delegate;
+
+  private AsyncGuavaSpanStoreAdapter spanStore;
+
+  @Before
+  public void setUp() throws Exception {
+    spanStore = new AsyncGuavaSpanStoreAdapter(delegate);
+  }
+
+  @Test
+  public void getTraces_success() throws Exception {
+    QueryRequest request = new QueryRequest.Builder("service").endTs(1000L).build();
+    doAnswer(answer(c -> c.onSuccess(traces)))
+        .when(delegate).getTraces(eq(request), any(Callback.class));
+
+    assertThat(spanStore.getTraces(request).get()).containsExactlyElementsOf(traces);
+  }
+
+  @Test
+  public void getTraces_exception() throws Exception {
+    QueryRequest request = new QueryRequest.Builder("service").endTs(1000L).build();
+    doAnswer(answer(c -> c.onError(new IllegalStateException("failed"))))
+        .when(delegate).getTraces(eq(request), any(Callback.class));
+
+    thrown.expect(ExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+    spanStore.getTraces(request).get();
+  }
+
+  @Test
+  public void getTrace_success() throws Exception {
+    doAnswer(answer(c -> c.onSuccess(trace1)))
+        .when(delegate).getTrace(eq(1L), any(Callback.class));
+
+    assertThat(spanStore.getTrace(1L).get()).containsExactlyElementsOf(trace1);
+  }
+
+  @Test
+  public void getTrace_exception() throws Exception {
+    doAnswer(answer(c -> c.onError(new IllegalStateException("failed"))))
+        .when(delegate).getTrace(eq(1L), any(Callback.class));
+
+    thrown.expect(ExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+    spanStore.getTrace(1L).get();
+  }
+
+  @Test
+  public void getRawTrace_success() throws Exception {
+    doAnswer(answer(c -> c.onSuccess(trace1)))
+        .when(delegate).getRawTrace(eq(1L), any(Callback.class));
+
+    assertThat(spanStore.getRawTrace(1L).get()).containsExactlyElementsOf(trace1);
+  }
+
+  @Test
+  public void getRawTrace_exception() throws Exception {
+    doAnswer(answer(c -> c.onError(new IllegalStateException("failed"))))
+        .when(delegate).getRawTrace(eq(1L), any(Callback.class));
+
+    thrown.expect(ExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+    spanStore.getRawTrace(1L).get();
+  }
+
+  @Test
+  public void getServiceNames_success() throws Exception {
+    doAnswer(answer(c -> c.onSuccess(asList("service1", "service2"))))
+        .when(delegate).getServiceNames(any(Callback.class));
+
+    assertThat(spanStore.getServiceNames().get()).containsExactly("service1", "service2");
+  }
+
+  @Test
+  public void getServiceNames_exception() throws Exception {
+    doAnswer(answer(c -> c.onError(new IllegalStateException("failed"))))
+        .when(delegate).getServiceNames(any(Callback.class));
+
+    thrown.expect(ExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+    spanStore.getServiceNames().get();
+  }
+
+  @Test
+  public void getSpanNames_success() throws Exception {
+    doAnswer(answer(c -> c.onSuccess(asList("span1", "span2"))))
+        .when(delegate).getSpanNames(eq("service"), any(Callback.class));
+
+    assertThat(spanStore.getSpanNames("service").get()).containsExactly("span1", "span2");
+  }
+
+  @Test
+  public void getSpanNames_exception() throws Exception {
+    doAnswer(answer(c -> c.onError(new IllegalStateException("failed"))))
+        .when(delegate).getSpanNames(eq("service"), any(Callback.class));
+
+    thrown.expect(ExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+    spanStore.getSpanNames("service").get();
+  }
+
+  @Test
+  public void getDependencies_success() throws Exception {
+    doAnswer(answer(c -> c.onSuccess(deps)))
+        .when(delegate).getDependencies(eq(1L), eq(0L), any(Callback.class));
+
+    assertThat(spanStore.getDependencies(1L, 0L).get()).containsExactlyElementsOf(deps);
+  }
+
+  @Test
+  public void getDependencies_exception() throws Exception {
+    doAnswer(answer(c -> c.onError(new IllegalStateException("failed"))))
+        .when(delegate).getDependencies(eq(1L), eq(0L), any(Callback.class));
+
+    thrown.expect(ExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+    spanStore.getDependencies(1L, 0L).get();
+  }
+
+  static <T> Answer answer(Consumer<Callback<T>> onCallback) {
+    return invocation -> {
+      onCallback.accept((Callback) invocation.getArguments()[invocation.getArguments().length - 1]);
+      return null;
+    };
+  }
+}
diff --git a/zipkin/pom.xml b/zipkin/pom.xml
index ed990d03d64..cba272c0804 100644
--- a/zipkin/pom.xml
+++ b/zipkin/pom.xml
@@ -35,6 +35,21 @@
       <groupId>com.squareup.moshi</groupId>
       <artifactId>moshi</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>${mockito.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- To avoid java.lang.NoClassDefFoundError: StacktracePrintingMatcher -->
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-core</artifactId>
+      <version>1.3</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/zipkin/src/main/java/zipkin/async/AsyncSpanConsumer.java b/zipkin/src/main/java/zipkin/async/AsyncSpanConsumer.java
new file mode 100644
index 00000000000..79ef935dc5e
--- /dev/null
+++ b/zipkin/src/main/java/zipkin/async/AsyncSpanConsumer.java
@@ -0,0 +1,35 @@
+/**
+ * Copyright 2015-2016 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin.async;
+
+import java.util.List;
+import zipkin.Span;
+
+/**
+ * An interface that is equivalent to {@link zipkin.SpanConsumer} but accepts a {@link
+ * Callback<Void>} to allow bridging to async libraries.
+ *
+ * <p>Note: This is not considered a user-level Api, rather an Spi that can be used to bind
+ * user-level abstractions such as futures or observables.
+ *
+ * @see zipkin.SpanConsumer
+ */
+// @FunctionalInterface
+public interface AsyncSpanConsumer {
+
+  /**
+   * Version of {@link zipkin.SpanConsumer#accept} that accepts a {@link Callback<Void>}.
+   */
+  void accept(List<Span> spans, Callback<Void> callback);
+}
diff --git a/zipkin/src/main/java/zipkin/async/AsyncSpanStore.java b/zipkin/src/main/java/zipkin/async/AsyncSpanStore.java
new file mode 100644
index 00000000000..56068769d53
--- /dev/null
+++ b/zipkin/src/main/java/zipkin/async/AsyncSpanStore.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright 2015-2016 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin.async;
+
+import java.util.List;
+import zipkin.DependencyLink;
+import zipkin.QueryRequest;
+import zipkin.Span;
+import zipkin.internal.Nullable;
+
+/**
+ * An interface that is equivalent to {@link zipkin.SpanStore} but accepts callbacks to allow
+ * bridging to async libraries.
+ *
+ * <p>Note: This is not considered a user-level Api, rather an Spi that can be used to bind
+ * user-level abstractions such as futures or observables.
+ *
+ * @see zipkin.SpanStore
+ */
+public interface AsyncSpanStore extends AsyncSpanConsumer {
+
+  /**
+   * Version of {@link zipkin.SpanStore#accept} that accepts {@link Callback<Void>}.
+   */
+  @Override void accept(List<Span> spans, Callback<Void> callback);
+
+  /**
+   * Version of {@link zipkin.SpanStore#getTraces} that accepts {@link Callback}.
+   */
+  void getTraces(QueryRequest request, Callback<List<List<Span>>> callback);
+
+  /**
+   * Version of {@link zipkin.SpanStore#getTrace} that accepts {@link Callback}.
+   */
+  void getTrace(long id, Callback<List<Span>> callback);
+
+  /**
+   * Version of {@link zipkin.SpanStore#getRawTrace} that accepts {@link Callback}.
+   */
+  void getRawTrace(long traceId, Callback<List<Span>> callback);
+
+  /**
+   * Version of {@link zipkin.SpanStore#getServiceNames} that accepts {@link Callback}.
+   */
+  void getServiceNames(Callback<List<String>> callback);
+
+  /**
+   * Version of {@link zipkin.SpanStore#getSpanNames} that accepts {@link Callback}.
+   */
+  void getSpanNames(String serviceName, Callback<List<String>> callback);
+
+  /**
+   * Version of {@link zipkin.SpanStore#getDependencies} that accepts {@link Callback}.
+   */
+  void getDependencies(long endTs, @Nullable Long lookback,
+      Callback<List<DependencyLink>> callback);
+}
diff --git a/zipkin/src/main/java/zipkin/async/BlockingSpanStoreAdapter.java b/zipkin/src/main/java/zipkin/async/BlockingSpanStoreAdapter.java
new file mode 100644
index 00000000000..7e07daf8bd9
--- /dev/null
+++ b/zipkin/src/main/java/zipkin/async/BlockingSpanStoreAdapter.java
@@ -0,0 +1,135 @@
+/**
+ * Copyright 2015-2016 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin.async;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import zipkin.DependencyLink;
+import zipkin.QueryRequest;
+import zipkin.Span;
+import zipkin.SpanStore;
+import zipkin.internal.Nullable;
+
+/**
+ * A {@link SpanStore} implementation that can take a {@link AsyncSpanStore} and call its methods
+ * with blocking, for use in callers that need a normal {@link SpanStore}.
+ */
+public final class BlockingSpanStoreAdapter implements SpanStore {
+  /**
+   * Internal flag that allows you read-your-writes consistency during tests.
+   *
+   * <p>This is internal as collection endpoints are usually in different threads or not in the same
+   * process as query ones. Special-casing this allows tests to pass without changing {@link
+   * AsyncSpanConsumer#accept}.
+   */
+  public static boolean BLOCK_ON_ACCEPT;
+
+  private final AsyncSpanStore delegate;
+
+  public BlockingSpanStoreAdapter(AsyncSpanStore delegate) {
+    this.delegate = delegate;
+  }
+
+  // Only method that does not actually block even in synchronous spanstores.
+  @Override public void accept(List<Span> spans) {
+    CallbackCaptor<Void> captor = new CallbackCaptor<>();
+    delegate.accept(spans, captor);
+    if (BLOCK_ON_ACCEPT) {
+      captor.get();
+    }
+  }
+
+  @Override public List<List<Span>> getTraces(QueryRequest request) {
+    CallbackCaptor<List<List<Span>>> captor = new CallbackCaptor<>();
+    delegate.getTraces(request, captor);
+    return captor.get();
+  }
+
+  @Override public List<Span> getTrace(long id) {
+    CallbackCaptor<List<Span>> captor = new CallbackCaptor<>();
+    delegate.getTrace(id, captor);
+    return captor.get();
+  }
+
+  @Override public List<Span> getRawTrace(long traceId) {
+    CallbackCaptor<List<Span>> captor = new CallbackCaptor<>();
+    delegate.getRawTrace(traceId, captor);
+    return captor.get();
+  }
+
+  @Override public List<String> getServiceNames() {
+    CallbackCaptor<List<String>> captor = new CallbackCaptor<>();
+    delegate.getServiceNames(captor);
+    return captor.get();
+  }
+
+  @Override public List<String> getSpanNames(String serviceName) {
+    CallbackCaptor<List<String>> captor = new CallbackCaptor<>();
+    delegate.getSpanNames(serviceName, captor);
+    return captor.get();
+  }
+
+  @Override public List<DependencyLink> getDependencies(long endTs, @Nullable Long lookback) {
+    CallbackCaptor<List<DependencyLink>> captor = new CallbackCaptor<>();
+    delegate.getDependencies(endTs, lookback, captor);
+    return captor.get();
+  }
+
+  static final class CallbackCaptor<V> implements Callback<V> {
+    // countDown + ref as BlockingQueue forbids null
+    CountDownLatch countDown = new CountDownLatch(1);
+    AtomicReference<Object> ref = new AtomicReference<>();
+
+    /**
+     * Blocks until {@link Callback#onSuccess(Object)} or {@link Callback#onError(Throwable)}.
+     *
+     * <p>Returns the successful value if {@link Callback#onSuccess(Object)} was called. <p>Throws
+     * if {@link Callback#onError(Throwable)} was called.
+     */
+    @Nullable V get() {
+      boolean interrupted = false;
+      try {
+        while (true) {
+          try {
+            countDown.await();
+            Object result = ref.get();
+            if (result instanceof Throwable) {
+              if (result instanceof Error) throw (Error) result;
+              if (result instanceof RuntimeException) throw (RuntimeException) result;
+              throw new RuntimeException((Exception) result);
+            }
+            return (V) result;
+          } catch (InterruptedException e) {
+            interrupted = true;
+          }
+        }
+      } finally {
+        if (interrupted) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+    @Override public void onSuccess(@Nullable V value) {
+      ref.set(value);
+      countDown.countDown();
+    }
+
+    @Override public void onError(Throwable t) {
+      ref.set(t);
+      countDown.countDown();
+    }
+  }
+}
diff --git a/zipkin/src/main/java/zipkin/async/Callback.java b/zipkin/src/main/java/zipkin/async/Callback.java
new file mode 100644
index 00000000000..e5cbd432c32
--- /dev/null
+++ b/zipkin/src/main/java/zipkin/async/Callback.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright 2015-2016 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin.async;
+
+import zipkin.internal.Nullable;
+
+/**
+ * A callback of a single result or error.
+ *
+ * <p>This is a bridge to async libraries such as CompletableFuture complete, completeExceptionally.
+ *
+ * <p>Implementations will call either {@link #onSuccess} or {@link #onError}, but not both.
+ */
+public interface Callback<V> {
+
+  /**
+   * Invoked when computation produces its potentially null value successfully.
+   *
+   * <p>When this is called, {@link #onError} won't be.
+   */
+  void onSuccess(@Nullable V value);
+
+  /**
+   * Invoked when computation produces a possibly null value successfully.
+   *
+   * <p>When this is called, {@link #onSuccess} won't be.
+   */
+  void onError(Throwable t);
+}
\ No newline at end of file
diff --git a/zipkin-spanstores/guava/src/test/java/zipkin/spanstore/guava/BlockingGuavaSpanStoreTest.java b/zipkin/src/test/java/zipkin/async/BlockingSpanStoreAdapterTest.java
old mode 100755
new mode 100644
similarity index 65%
rename from zipkin-spanstores/guava/src/test/java/zipkin/spanstore/guava/BlockingGuavaSpanStoreTest.java
rename to zipkin/src/test/java/zipkin/async/BlockingSpanStoreAdapterTest.java
index 80835618ffe..f5757df672a
--- a/zipkin-spanstores/guava/src/test/java/zipkin/spanstore/guava/BlockingGuavaSpanStoreTest.java
+++ b/zipkin/src/test/java/zipkin/async/BlockingSpanStoreAdapterTest.java
@@ -11,11 +11,10 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package zipkin.spanstore.guava;
+package zipkin.async;
 
-import com.google.common.collect.ImmutableList;
-import java.util.Arrays;
 import java.util.List;
+import java.util.function.Consumer;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -23,6 +22,7 @@
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
+import org.mockito.stubbing.Answer;
 import zipkin.Annotation;
 import zipkin.BinaryAnnotation;
 import zipkin.DependencyLink;
@@ -30,12 +30,13 @@
 import zipkin.QueryRequest;
 import zipkin.Span;
 
-import static com.google.common.util.concurrent.Futures.immediateFuture;
 import static java.util.Arrays.asList;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.when;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
 
-public class BlockingGuavaSpanStoreTest {
+public class BlockingSpanStoreAdapterTest {
 
   long spanId = 456;
   long today = System.currentTimeMillis();
@@ -89,13 +90,13 @@ public class BlockingGuavaSpanStoreTest {
       .annotations(asList(ann5, ann8))
       .addBinaryAnnotation(BinaryAnnotation.create("BAH2", "BEH2", ep)).build();
 
-  List<Span> trace1 = ImmutableList.of(span1, span2, span3);
+  List<Span> trace1 = asList(span1, span2, span3);
 
-  List<Span> trace2 = ImmutableList.of(span4, span5);
+  List<Span> trace2 = asList(span4, span5);
 
-  List<List<Span>> traces = ImmutableList.of(trace1, trace2);
+  List<List<Span>> traces = asList(trace1, trace2);
 
-  List<DependencyLink> deps = ImmutableList.of(
+  List<DependencyLink> deps = asList(
       new DependencyLink.Builder().parent("zipkin-web").child("zipkin-query").callCount(1).build(),
       new DependencyLink.Builder().parent("zipkin-query").child("zipkin-foo").callCount(10).build()
   );
@@ -107,94 +108,124 @@ public class BlockingGuavaSpanStoreTest {
   public ExpectedException thrown = ExpectedException.none();
 
   @Mock
-  private GuavaSpanStore delegate;
+  private AsyncSpanStore delegate;
 
-  private BlockingGuavaSpanStore spanStore;
+  private BlockingSpanStoreAdapter spanStore;
 
   @Before
   public void setUp() {
-    spanStore = new BlockingGuavaSpanStore(delegate);
+    spanStore = new BlockingSpanStoreAdapter(delegate);
   }
 
   @Test
   public void getTraces_success() {
     QueryRequest request = new QueryRequest.Builder("service").endTs(1000L).build();
-    when(delegate.getTraces(request)).thenReturn(immediateFuture(traces));
+    doAnswer(answer(c -> c.onSuccess(traces)))
+        .when(delegate).getTraces(eq(request), any(Callback.class));
+
     assertThat(spanStore.getTraces(request)).containsExactlyElementsOf(traces);
   }
 
+
   @Test
   public void getTraces_exception() {
     QueryRequest request = new QueryRequest.Builder("service").endTs(1000L).build();
-    when(delegate.getTraces(request)).thenThrow(new IllegalStateException("failed"));
-    thrown.expect(IllegalStateException.class);;
+    doAnswer(answer(c -> c.onError(new IllegalStateException("failed"))))
+        .when(delegate).getTraces(eq(request), any(Callback.class));
+
+    thrown.expect(IllegalStateException.class);
     spanStore.getTraces(request);
   }
 
   @Test
   public void getTrace_success() {
-    when(delegate.getTrace(1L)).thenReturn(immediateFuture(trace1));
+    doAnswer(answer(c -> c.onSuccess(trace1)))
+        .when(delegate).getTrace(eq(1L), any(Callback.class));
+
     assertThat(spanStore.getTrace(1L)).containsExactlyElementsOf(trace1);
   }
 
   @Test
   public void getTrace_exception() {
-    when(delegate.getTrace(1L)).thenThrow(new IllegalStateException("failed"));
-    thrown.expect(IllegalStateException.class);;
+    doAnswer(answer(c -> c.onError(new IllegalStateException("failed"))))
+        .when(delegate).getTrace(eq(1L), any(Callback.class));
+
+    thrown.expect(IllegalStateException.class);
     spanStore.getTrace(1L);
   }
 
   @Test
   public void getRawTrace_success() {
-    when(delegate.getRawTrace(1L)).thenReturn(immediateFuture(trace1));
+    doAnswer(answer(c -> c.onSuccess(trace1)))
+        .when(delegate).getRawTrace(eq(1L), any(Callback.class));
+
     assertThat(spanStore.getRawTrace(1L)).containsExactlyElementsOf(trace1);
   }
 
   @Test
   public void getRawTrace_exception() {
-    when(delegate.getRawTrace(1L)).thenThrow(new IllegalStateException("failed"));
+    doAnswer(answer(c -> c.onError(new IllegalStateException("failed"))))
+        .when(delegate).getRawTrace(eq(1L), any(Callback.class));
+
     thrown.expect(IllegalStateException.class);;
     spanStore.getRawTrace(1L);
   }
 
   @Test
-  public void getServiceNamees_success() {
-    when(delegate.getServiceNames())
-        .thenReturn(immediateFuture(Arrays.asList("service1", "service2")));
+  public void getServiceNames_success() {
+    doAnswer(answer(c -> c.onSuccess(asList("service1", "service2"))))
+        .when(delegate).getServiceNames(any(Callback.class));
+
     assertThat(spanStore.getServiceNames()).containsExactly("service1", "service2");
   }
 
   @Test
   public void getServiceNames_exception() {
-    when(delegate.getServiceNames()).thenThrow(new IllegalStateException("failed"));
-    thrown.expect(IllegalStateException.class);;
+    doAnswer(answer(c -> c.onError(new IllegalStateException("failed"))))
+        .when(delegate).getServiceNames(any(Callback.class));
+
+    thrown.expect(IllegalStateException.class);
     spanStore.getServiceNames();
   }
 
   @Test
   public void getSpanNames_success() {
-    when(delegate.getSpanNames("service")).thenReturn(immediateFuture(
-        Arrays.asList("span1", "span2")));
+    doAnswer(answer(c -> c.onSuccess(asList("span1", "span2"))))
+        .when(delegate).getSpanNames(eq("service"), any(Callback.class));
+
     assertThat(spanStore.getSpanNames("service")).containsExactly("span1", "span2");
   }
 
   @Test
   public void getSpanNames_exception() {
-    when(delegate.getSpanNames("service")).thenThrow(new IllegalStateException("failed"));
-    thrown.expect(IllegalStateException.class);;
+    doAnswer(answer(c -> c.onError(new IllegalStateException("failed"))))
+        .when(delegate).getSpanNames(eq("service"), any(Callback.class));
+
+    thrown.expect(IllegalStateException.class);
     spanStore.getSpanNames("service");
   }
 
   @Test
   public void getDependencies_success() {
-    when(delegate.getDependencies(1L, 0L)).thenReturn(immediateFuture(deps));
+    doAnswer(answer(c -> c.onSuccess(deps)))
+        .when(delegate).getDependencies(eq(1L), eq(0L), any(Callback.class));
+
     assertThat(spanStore.getDependencies(1L, 0L)).containsExactlyElementsOf(deps);
   }
 
   @Test
   public void getDependencies_exception() {
-    when(delegate.getDependencies(1L, 0L)).thenThrow(new IllegalStateException("failed"));
+    doAnswer(answer(c -> c.onError(new IllegalStateException("failed"))))
+        .when(delegate).getDependencies(eq(1L), eq(0L), any(Callback.class));
+
     thrown.expect(IllegalStateException.class);;
     spanStore.getDependencies(1L, 0L);
   }
+
+  static <T> Answer answer(Consumer<Callback<T>> onCallback) {
+    return invocation -> {
+      onCallback.accept((Callback) invocation.getArguments()[invocation.getArguments().length - 1]);
+      return null;
+    };
+  }
 }
diff --git a/zipkin/src/test/java/zipkin/async/CallbackCaptorTest.java b/zipkin/src/test/java/zipkin/async/CallbackCaptorTest.java
new file mode 100644
index 00000000000..45d3c4d6629
--- /dev/null
+++ b/zipkin/src/test/java/zipkin/async/CallbackCaptorTest.java
@@ -0,0 +1,82 @@
+/**
+ * Copyright 2015-2016 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin.async;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import zipkin.async.BlockingSpanStoreAdapter.CallbackCaptor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.core.Is.isA;
+
+/** Tests for the internal class used by {@link BlockingSpanStoreAdapter} */
+public class CallbackCaptorTest {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void getIsUninterruptable() {
+    AtomicBoolean returned = new AtomicBoolean();
+    CallbackCaptor<String> captor = new CallbackCaptor<>();
+    Thread thread = new Thread(() -> {
+      captor.get();
+      returned.set(true);
+    });
+    thread.start();
+    thread.interrupt();
+
+    assertThat(thread.isInterrupted()).isTrue();
+    assertThat(returned.get()).isFalse();
+  }
+
+  @Test
+  public void onSuccessReturns() {
+    CallbackCaptor<String> captor = new CallbackCaptor<>();
+    captor.onSuccess("foo");
+
+    assertThat(captor.get()).isEqualTo("foo");
+  }
+
+  @Test
+  public void onError_propagatesRuntimeException() {
+    CallbackCaptor<String> captor = new CallbackCaptor<>();
+    captor.onError(new IllegalStateException());
+
+    thrown.expect(IllegalStateException.class);
+    captor.get();
+  }
+
+  @Test
+  public void onError_propagatesError() {
+    CallbackCaptor<String> captor = new CallbackCaptor<>();
+    captor.onError(new LinkageError());
+
+    thrown.expect(LinkageError.class);
+    captor.get();
+  }
+
+  @Test
+  public void onError_wrapsCheckedExceptions() {
+    CallbackCaptor<String> captor = new CallbackCaptor<>();
+    captor.onError(new IOException());
+
+    thrown.expect(RuntimeException.class);
+    thrown.expectCause(isA(IOException.class));
+    captor.get();
+  }
+}