diff --git a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanConsumer.java b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanConsumer.java index 3af176c704e..721f7b9c6c1 100644 --- a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanConsumer.java +++ b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanConsumer.java @@ -34,23 +34,29 @@ import zipkin.Annotation; import zipkin.BinaryAnnotation; import zipkin.Span; +import zipkin.internal.Pair; import zipkin.storage.cassandra3.Schema.AnnotationUDT; import zipkin.storage.cassandra3.Schema.BinaryAnnotationUDT; import zipkin.storage.guava.GuavaSpanConsumer; import static com.google.common.util.concurrent.Futures.transform; import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp; +import static zipkin.internal.Util.checkNotNull; import static zipkin.storage.cassandra3.CassandraUtil.bindWithName; import static zipkin.storage.cassandra3.CassandraUtil.durationIndexBucket; final class CassandraSpanConsumer implements GuavaSpanConsumer { private static final Logger LOG = LoggerFactory.getLogger(CassandraSpanConsumer.class); private static final Function TO_VOID = Functions.constant(null); + private static final long WRITTEN_NAMES_TTL + = Long.getLong("zipkin.store.cassandra3.internal.writtenNamesTtl", 60 * 60 * 1000); private final Session session; private final PreparedStatement insertSpan; + private final PreparedStatement insertTraceServiceSpanName; private final PreparedStatement insertServiceSpanName; private final Schema.Metadata metadata; + private final DeduplicatingExecutor deduplicatingExecutor; CassandraSpanConsumer(Session session) { this.session = session; @@ -70,7 +76,7 @@ final class CassandraSpanConsumer implements GuavaSpanConsumer { .value("binary_annotations", QueryBuilder.bindMarker("binary_annotations")) .value("all_annotations", QueryBuilder.bindMarker("all_annotations"))); - insertServiceSpanName = session.prepare( + insertTraceServiceSpanName = session.prepare( QueryBuilder .insertInto(Schema.TABLE_TRACE_BY_SERVICE_SPAN) .value("service_name", QueryBuilder.bindMarker("service_name")) @@ -79,6 +85,14 @@ final class CassandraSpanConsumer implements GuavaSpanConsumer { .value("ts", QueryBuilder.bindMarker("ts")) .value("trace_id", QueryBuilder.bindMarker("trace_id")) .value("duration", QueryBuilder.bindMarker("duration"))); + + insertServiceSpanName = session.prepare( + QueryBuilder + .insertInto(Schema.TABLE_SERVICE_SPANS) + .value("service_name", QueryBuilder.bindMarker("service_name")) + .value("span_name", QueryBuilder.bindMarker("span_name"))); + + deduplicatingExecutor = new DeduplicatingExecutor(session, WRITTEN_NAMES_TTL); } /** @@ -98,12 +112,13 @@ public ListenableFuture accept(List rawSpans) { for (String serviceName : span.serviceNames()) { // QueryRequest.min/maxDuration if (timestamp != null) { - // Contract for Repository.storeServiceSpanName is to store the span twice, once with + // Contract for Repository.storeTraceServiceSpanName is to store the span twice, once with // the span name and another with empty string. - futures.add(storeServiceSpanName(serviceName, span.name, timestamp, span.duration, + futures.add(storeTraceServiceSpanName(serviceName, span.name, timestamp, span.duration, traceId)); if (!span.name.isEmpty()) { // If span.name == "", this would be redundant - futures.add(storeServiceSpanName(serviceName, "", timestamp, span.duration, traceId)); + futures.add(storeTraceServiceSpanName(serviceName, "", timestamp, span.duration, traceId)); + futures.add(storeServiceSpanName(serviceName, span.name)); } } } @@ -162,7 +177,7 @@ ListenableFuture storeSpan(Span span, BigInteger traceId, Long timestamp) { } } - ListenableFuture storeServiceSpanName( + ListenableFuture storeTraceServiceSpanName( String serviceName, String spanName, long timestamp_micro, @@ -170,24 +185,98 @@ ListenableFuture storeServiceSpanName( BigInteger traceId) { int bucket = durationIndexBucket(timestamp_micro); + UUID ts = new UUID( + UUIDs.startOf(timestamp_micro / 1000).getMostSignificantBits(), + UUIDs.random().getLeastSignificantBits()); try { BoundStatement bound = - bindWithName(insertServiceSpanName, "insert-service-span-name") + bindWithName(insertTraceServiceSpanName, "insert-service-span-name") .setString("service_name", serviceName) .setString("span_name", spanName) .setInt("bucket", bucket) - .setUUID("ts", new UUID( - UUIDs.startOf(timestamp_micro / 1000).getMostSignificantBits(), - UUIDs.random().getLeastSignificantBits())) + .setUUID("ts", ts) .setVarint("trace_id", traceId); if (null != duration) { bound = bound.setLong("duration", duration); } - return session.executeAsync(bound); + return deduplicatingExecutor.maybeExecuteAsync(bound, + new TraceServiceSpanNameKey(serviceName, spanName, bucket, ts)); + + } catch (RuntimeException ex) { + return Futures.immediateFailedFuture(ex); + } + } + + ListenableFuture storeServiceSpanName( + String serviceName, + String spanName + ) { + try { + BoundStatement bound = bindWithName(insertServiceSpanName, "insert-service-span-name") + .setString("service_name", serviceName) + .setString("span_name", spanName); + + return deduplicatingExecutor.maybeExecuteAsync(bound, Pair.create(serviceName, spanName)); } catch (RuntimeException ex) { return Futures.immediateFailedFuture(ex); } } + + static BigInteger traceId(Span span) { + return span.traceIdHigh != 0 + ? BigInteger.valueOf(span.traceIdHigh).shiftLeft(64).or(BigInteger.valueOf(span.traceId)) + : BigInteger.valueOf(span.traceId); + } + + final class TraceServiceSpanNameKey { + + final String serviceName; + final String spanName; + final int bucket; + final UUID ts; + + TraceServiceSpanNameKey(String serviceName, String spanName, int bucket, UUID ts) { + this.serviceName = serviceName; + this.spanName = spanName; + this.bucket = bucket; + this.ts = ts; + } + + @Override + public String toString() { + return "(" + serviceName + ", " + spanName + ", " + bucket + ", " + ts + " )"; + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (o instanceof TraceServiceSpanNameKey) { + TraceServiceSpanNameKey that = (TraceServiceSpanNameKey) o; + return this.serviceName.equals(that.serviceName) && + this.spanName.equals(that.spanName) && + this.bucket == that.bucket && + this.ts.equals(that.ts); + } + return false; + } + + @Override + public int hashCode() { + int h = 1; + h *= 1000003; + h ^= this.serviceName.hashCode(); + h *= 1000003; + h ^= this.spanName.hashCode(); + h *= 1000003; + h ^= this.bucket; + h *= 1000003; + h ^= this.ts.hashCode(); + return h; + } + } + } diff --git a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanStore.java b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanStore.java index 3b84b867198..522dbe91171 100644 --- a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanStore.java +++ b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanStore.java @@ -23,9 +23,9 @@ import com.datastax.driver.core.utils.UUIDs; import com.google.common.base.Function; import com.google.common.collect.ContiguousSet; -import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterators; import com.google.common.collect.Ordering; import com.google.common.collect.Range; import com.google.common.util.concurrent.AsyncFunction; @@ -67,6 +67,7 @@ import static zipkin.internal.Util.getDays; import static zipkin.storage.cassandra3.Schema.TABLE_TRACES; import static zipkin.storage.cassandra3.Schema.TABLE_TRACE_BY_SERVICE_SPAN; +import static zipkin.storage.cassandra3.Schema.TABLE_SERVICE_SPANS; final class CassandraSpanStore implements GuavaSpanStore { private static final Logger LOG = LoggerFactory.getLogger(CassandraSpanStore.class); @@ -116,11 +117,11 @@ public int compare(List left, List right) { selectServiceNames = session.prepare( QueryBuilder.select("service_name") .distinct() - .from(Schema.VIEW_TRACE_BY_SERVICE)); + .from(TABLE_SERVICE_SPANS)); selectSpanNames = session.prepare( QueryBuilder.select("span_name") - .from(Schema.VIEW_TRACE_BY_SERVICE) + .from(TABLE_SERVICE_SPANS) .where(QueryBuilder.eq("service_name", QueryBuilder.bindMarker("service_name"))) .limit(QueryBuilder.bindMarker("limit_"))); @@ -223,8 +224,9 @@ public ListenableFuture>> getTraces(final QueryRequest request) } return transform(traceIds, new AsyncFunction, List>>() { @Override public ListenableFuture>> apply(Collection traceIds) { - traceIds = FluentIterable.from(traceIds).limit(request.limit).toSet(); - return transform(getSpansByTraceIds(ImmutableSet.copyOf(traceIds), maxTraceCols), AdjustTraces.INSTANCE); + ImmutableSet set = ImmutableSet.copyOf(traceIds); + set = ImmutableSet.copyOf(Iterators.limit(set.iterator(), request.limit)); + return transform(getSpansByTraceIds(set, maxTraceCols), AdjustTraces.INSTANCE); } @Override public String toString() { diff --git a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/DeduplicatingExecutor.java b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/DeduplicatingExecutor.java new file mode 100644 index 00000000000..b8b7a975248 --- /dev/null +++ b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/DeduplicatingExecutor.java @@ -0,0 +1,160 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.storage.cassandra3; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Session; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Ticker; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.UncheckedExecutionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static zipkin.internal.Util.checkNotNull; + +/** + * This reduces load on cassandra by preventing semantically equivalent requests from being invoked, + * subject to a local TTL. + * + *

Ex. If you want to test that you don't repeatedly send bad data, you could send a 400 back. + * + *

{@code
+ * ttl = 60 * 1000; // 1 minute
+ * deduper = new DeduplicatingExecutor(session, ttl);
+ *
+ * // the result of the first execution against "foo" is returned to other callers
+ * // until it expires a minute later.
+ * deduper.maybeExecute(bound, "foo");
+ * deduper.maybeExecute(bound, "foo");
+ * }
+ */ +class DeduplicatingExecutor { // not final for testing + + private final Session session; + private final LoadingCache> cache; + + /** + * @param session which conditionally executes bound statements + * @param ttl how long the results of statements are remembered, in milliseconds. + */ + DeduplicatingExecutor(Session session, long ttl) { + this.session = session; + this.cache = CacheBuilder.newBuilder() + .expireAfterWrite(ttl, TimeUnit.MILLISECONDS) + .ticker(new Ticker() { + @Override public long read() { + return nanoTime(); + } + }) + // TODO: maximum size or weight + .build(new CacheLoader>() { + @Override public ListenableFuture load(final BoundStatementKey key) { + ListenableFuture cassandraFuture = executeAsync(key.statement); + + // Drop the cassandra future so that we don't hold references to cassandra state for + // long periods of time. + final SettableFuture disconnectedFuture = SettableFuture.create(); + Futures.addCallback(cassandraFuture, new FutureCallback() { + + @Override public void onSuccess(Object result) { + disconnectedFuture.set(null); + } + + @Override public void onFailure(Throwable t) { + cache.invalidate(key); + disconnectedFuture.setException(t); + } + }); + return disconnectedFuture; + } + }); + } + + /** + * Upon success, the statement's result will be remembered and returned for all subsequent + * executions with the same key, subject to a local TTL. + * + *

The results of failed statements are forgotten based on the supplied key. + * + * @param statement what to conditionally execute + * @param key determines equivalence of the bound statement + * @return future of work initiated by this or a previous request + */ + ListenableFuture maybeExecuteAsync(BoundStatement statement, Object key) { + BoundStatementKey cacheKey = new BoundStatementKey(statement, key); + try { + ListenableFuture result = cache.get(new BoundStatementKey(statement, key)); + // A future could be constructed directly (i.e. immediate future), get the value to + // see if it was exceptional. If so, the catch block will invalidate that key. + if (result.isDone()) result.get(); + return result; + } catch (UncheckedExecutionException | ExecutionException e) { + cache.invalidate(cacheKey); + return Futures.immediateFailedFuture(e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(); + } + } + + // visible for testing, since nanoTime is weird and can return negative + long nanoTime() { + return System.nanoTime(); + } + + @VisibleForTesting ListenableFuture executeAsync(BoundStatement statement) { + return session.executeAsync(statement); + } + + @VisibleForTesting void clear() { + cache.invalidateAll(); + } + + /** Used to hold a reference to the last statement executed, but without using it in hashCode */ + static final class BoundStatementKey { + final BoundStatement statement; + final Object key; + + BoundStatementKey(BoundStatement statement, Object key) { + this.statement = checkNotNull(statement, "statement"); + this.key = checkNotNull(key, "key"); + } + + @Override + public String toString() { + return "(" + key + ", " + statement + ")"; + } + + @Override + public boolean equals(Object o) { + if (o == this) return true; + if (o instanceof BoundStatementKey) { + return this.key.equals(((BoundStatementKey) o).key); + } + return false; + } + + @Override + public int hashCode() { + return key.hashCode(); + } + } +} diff --git a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/Schema.java b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/Schema.java index 512330b4933..35974932079 100644 --- a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/Schema.java +++ b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/Schema.java @@ -42,8 +42,8 @@ final class Schema { static final String TABLE_TRACES = "traces"; static final String TABLE_TRACE_BY_SERVICE_SPAN = "trace_by_service_span"; + static final String TABLE_SERVICE_SPANS = "span_name_by_service"; static final String TABLE_DEPENDENCIES = "dependencies"; - static final String VIEW_TRACE_BY_SERVICE = "trace_by_service"; static final String DEFAULT_KEYSPACE = "zipkin3"; private static final String SCHEMA_RESOURCE = "/cassandra3-schema.cql"; diff --git a/zipkin-storage/cassandra3/src/main/resources/cassandra3-schema.cql b/zipkin-storage/cassandra3/src/main/resources/cassandra3-schema.cql index 3370e249016..c24ef1ed948 100644 --- a/zipkin-storage/cassandra3/src/main/resources/cassandra3-schema.cql +++ b/zipkin-storage/cassandra3/src/main/resources/cassandra3-schema.cql @@ -51,14 +51,13 @@ CREATE TABLE zipkin3.trace_by_service_span ( AND compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'} AND default_time_to_live = 259200; - -CREATE MATERIALIZED VIEW IF NOT EXISTS zipkin3.trace_by_service - AS SELECT service_name, span_name, bucket, ts, trace_id FROM trace_by_service_span - WHERE service_name is not null AND span_name is not null AND bucket is not null AND ts is not null - PRIMARY KEY (service_name, bucket, span_name, ts) - WITH CLUSTERING ORDER BY (bucket DESC, ts DESC) - AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'unchecked_tombstone_compaction': 'true', 'tombstone_threshold': '0.2'} - AND default_time_to_live = 259200; +CREATE TABLE zipkin3.span_name_by_service ( + service_name text, + span_name text, + PRIMARY KEY (service_name, span_name) +) + WITH compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'unchecked_tombstone_compaction': 'true', 'tombstone_threshold': '0.2'} + AND default_time_to_live = 259200; CREATE CUSTOM INDEX ON zipkin3.traces (all_annotations) USING 'org.apache.cassandra.index.sasi.SASIIndex' WITH OPTIONS = { diff --git a/zipkin-storage/cassandra3/src/test/java/zipkin/storage/cassandra3/DeduplicatingExecutorTest.java b/zipkin-storage/cassandra3/src/test/java/zipkin/storage/cassandra3/DeduplicatingExecutorTest.java new file mode 100644 index 00000000000..46db5fe39f3 --- /dev/null +++ b/zipkin-storage/cassandra3/src/test/java/zipkin/storage/cassandra3/DeduplicatingExecutorTest.java @@ -0,0 +1,221 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.storage.cassandra3; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.google.common.collect.ImmutableSet; +import com.google.common.reflect.Reflection; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import java.util.Collection; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import org.junit.Test; + +import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class DeduplicatingExecutorTest { + TestDeduplicatingExecutor executor = TestDeduplicatingExecutor.create(Futures::immediateFuture); + + BoundStatement first = mock(BoundStatement.class); + BoundStatement next = mock(BoundStatement.class); + + @Test + public void expiresWhenTtlPasses() throws Exception { + executor.nanoTime = 0; + + ListenableFuture firstFoo = executor.maybeExecuteAsync(first, "foo"); + + // same result for the key foo + assertThat(executor.maybeExecuteAsync(next, "foo")) + .isEqualTo(firstFoo); + + executor.nanoTime = TimeUnit.MILLISECONDS.toNanos(500); + + // still, same result for the foo + assertThat(executor.maybeExecuteAsync(next, "foo")) + .isEqualTo(firstFoo); + + // add a key for the element that happened after "foo" + ListenableFuture firstBar = executor.maybeExecuteAsync(first, "bar"); + assertThat(firstBar) + .isNotEqualTo(firstFoo); + + // A second after the first call, we should try again + executor.nanoTime = TimeUnit.SECONDS.toNanos(1); + + // first key refreshes + assertThat(executor.maybeExecuteAsync(next, "foo")) + .isNotEqualTo(firstFoo); + + // second key still caching + assertThat(executor.maybeExecuteAsync(first, "bar")) + .isEqualTo(firstBar); + } + + @Test + public void exceptionArentCached_immediateFuture() throws Exception { + executor = TestDeduplicatingExecutor.create(s -> { + if (s == first) return Futures.immediateFailedFuture(new IllegalArgumentException()); + return Futures.immediateFuture(s); + }); + exceptionsArentCached(); + } + + @Test + public void exceptionArentCached_deferredFuture() throws Exception { + ListeningExecutorService exec = listeningDecorator(Executors.newSingleThreadExecutor()); + try { + executor = TestDeduplicatingExecutor.create(s -> { + if (s == first) { + return exec.submit(() -> { + Thread.sleep(50); + throw new IllegalArgumentException(); + }); + } + return Futures.immediateFuture(s); + }); + exceptionsArentCached(); + } finally { + exec.shutdownNow(); + } + } + + @Test + public void exceptionArentCached_creatingFuture() throws Exception { + executor = TestDeduplicatingExecutor.create(s -> { + if (s == first) throw new IllegalArgumentException(); + return Futures.immediateFuture(s); + }); + exceptionsArentCached(); + } + + private void exceptionsArentCached() throws Exception { + executor.nanoTime = 0; + + // Intentionally not dereferencing the future. We need to ensure that dropped failed + // futures still purge! + ListenableFuture firstFoo = executor.maybeExecuteAsync(first, "foo"); + + Thread.sleep(100); // wait a bit for the future to execute and cache to purge the entry + + // doesn't cache exception + assertThat(executor.maybeExecuteAsync(next, "foo")) + .isNotEqualTo(firstFoo); + + // sanity check the first future actually failed + try { + firstFoo.get(); + failBecauseExceptionWasNotThrown(ExecutionException.class); + } catch (ExecutionException e) { + assertThat(e).hasCauseInstanceOf(IllegalArgumentException.class); + } + } + + /** + * This shows that any number of threads perform a computation only once. + */ + @Test + public void multithreaded() throws Exception { + Session session = mock(Session.class); + DeduplicatingExecutor executor = + new DeduplicatingExecutor(session, TimeUnit.SECONDS.toMillis(1L)); + BoundStatement statement = mock(BoundStatement.class); + when(session.executeAsync(statement)) + .thenAnswer(invocationOnMock -> mock(ResultSetFuture.class)); + + int loopCount = 1000; + CountDownLatch latch = new CountDownLatch(loopCount); + ExecutorService exec = Executors.newFixedThreadPool(10); + + Collection> futures = new ConcurrentLinkedDeque<>(); + for (int i = 0; i < loopCount; i++) { + exec.execute(() -> { + futures.add(executor.maybeExecuteAsync(statement, "foo")); + futures.add(executor.maybeExecuteAsync(statement, "bar")); + latch.countDown(); + }); + } + latch.await(); + + ImmutableSet> distinctFutures = ImmutableSet.copyOf(futures); + + assertThat(distinctFutures).hasSize(2); + + // expire the result + Thread.sleep(1000L); + + // Sanity check: we don't memoize after we should have expired. + assertThat(executor.maybeExecuteAsync(statement, "foo")) + .isNotIn(distinctFutures); + assertThat(executor.maybeExecuteAsync(statement, "bar")) + .isNotIn(distinctFutures); + } + + @Test + public void expiresWhenTtlPasses_initiallyNegative() throws Exception { + executor.nanoTime = -TimeUnit.SECONDS.toNanos(1); + + ListenableFuture firstFuture = executor.maybeExecuteAsync(first, "foo"); + assertThat(executor.maybeExecuteAsync(next, "foo")) + .isEqualTo(firstFuture); + + // A second after the first call, we should try again + executor.nanoTime = 0; + + assertThat(executor.maybeExecuteAsync(next, "foo")) + .isNotEqualTo(firstFuture); + } + + static class TestDeduplicatingExecutor extends DeduplicatingExecutor { + static TestDeduplicatingExecutor create(Function> callee) { + return new TestDeduplicatingExecutor(callee); + } + + final Function> delegate; + long nanoTime; + + protected TestDeduplicatingExecutor(Function> delegate) { + super(fakeSession(delegate), TimeUnit.SECONDS.toMillis(1L)); + this.delegate = delegate; + } + + @Override long nanoTime() { + return nanoTime; + } + + @Override ListenableFuture executeAsync(BoundStatement statement) { + return delegate.apply(statement); + } + } + + static Session fakeSession(final Function> delegate) { + return Reflection.newProxy(Session.class, (proxy, method, args) -> { + assert method.getName().equals("executeAsync") && args[0] instanceof BoundStatement; + return delegate.apply((BoundStatement) args[0]); + }); + } +}