diff --git a/zipkin-storage/cassandra/src/test/java/zipkin/storage/cassandra/DeduplicatingExecutorTest.java b/zipkin-storage/cassandra/src/test/java/zipkin/storage/cassandra/DeduplicatingExecutorTest.java index 3cedc939b1c..1c96a4251f6 100644 --- a/zipkin-storage/cassandra/src/test/java/zipkin/storage/cassandra/DeduplicatingExecutorTest.java +++ b/zipkin-storage/cassandra/src/test/java/zipkin/storage/cassandra/DeduplicatingExecutorTest.java @@ -14,7 +14,7 @@ package zipkin.storage.cassandra; import com.datastax.driver.core.BoundStatement; -import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Session; import com.google.common.collect.ImmutableSet; import com.google.common.reflect.Reflection; @@ -35,6 +35,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class DeduplicatingExecutorTest { TestDeduplicatingExecutor executor = TestDeduplicatingExecutor.create(Futures::immediateFuture); @@ -137,12 +138,14 @@ private void exceptionsArentCached() throws Exception { /** * This shows that any number of threads perform a computation only once. */ - @Test(timeout = 2000L) // 1000 for the selects + expiration (which is 1 second) + @Test public void multithreaded() throws Exception { - Session session = CassandraTestGraph.INSTANCE.storage.get().session(); + Session session = mock(Session.class); DeduplicatingExecutor executor = new DeduplicatingExecutor(session, TimeUnit.SECONDS.toMillis(1L)); - PreparedStatement now = session.prepare("SELECT now() FROM system.local"); + BoundStatement statement = mock(BoundStatement.class); + when(session.executeAsync(statement)) + .thenAnswer(invocationOnMock -> mock(ResultSetFuture.class)); int loopCount = 1000; CountDownLatch latch = new CountDownLatch(loopCount); @@ -151,8 +154,8 @@ public void multithreaded() throws Exception { Collection> futures = new ConcurrentLinkedDeque<>(); for (int i = 0; i < loopCount; i++) { exec.execute(() -> { - futures.add(executor.maybeExecuteAsync(now.bind(), "foo")); - futures.add(executor.maybeExecuteAsync(now.bind(), "bar")); + futures.add(executor.maybeExecuteAsync(statement, "foo")); + futures.add(executor.maybeExecuteAsync(statement, "bar")); latch.countDown(); }); } @@ -166,9 +169,9 @@ public void multithreaded() throws Exception { Thread.sleep(1000L); // Sanity check: we don't memoize after we should have expired. - assertThat(executor.maybeExecuteAsync(now.bind(), "foo")) + assertThat(executor.maybeExecuteAsync(statement, "foo")) .isNotIn(distinctFutures); - assertThat(executor.maybeExecuteAsync(now.bind(), "bar")) + assertThat(executor.maybeExecuteAsync(statement, "bar")) .isNotIn(distinctFutures); }