diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java index 442556b56d7..1cb768ea85a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java @@ -275,7 +275,7 @@ public void run() { switch (response) { case DONE: state = State.DONE; - closeDelegateResultSet(); + cursorReturnedDoneOrException = true; return; case PAUSE: state = State.PAUSED; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplStressTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplStressTest.java index 44a05712594..71447d46e7e 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplStressTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplStressTest.java @@ -284,6 +284,47 @@ public CallbackResponse cursorReady(AsyncResultSet resultSet) { } } + @Test + public void returnDoneBeforeEnd() throws Exception { + ExecutorProvider executorProvider = SpannerOptions.createDefaultAsyncExecutorProvider(); + final Random random = new Random(); + for (Executor executor : + new Executor[] { + MoreExecutors.directExecutor(), createExecService(), createExecService(32) + }) { + for (int bufferSize = 1; bufferSize < resultSetSize * 2; bufferSize *= 2) { + for (int i = 0; i < TEST_RUNS; i++) { + try (AsyncResultSetImpl impl = + new AsyncResultSetImpl(executorProvider, createResultSet(), bufferSize)) { + ApiFuture res = + impl.setCallback( + executor, + new ReadyCallback() { + @Override + public CallbackResponse cursorReady(AsyncResultSet resultSet) { + switch (resultSet.tryNext()) { + case DONE: + return CallbackResponse.DONE; + case NOT_READY: + return random.nextBoolean() + ? CallbackResponse.DONE + : CallbackResponse.CONTINUE; + case OK: + return random.nextInt(resultSetSize) <= 2 + ? CallbackResponse.DONE + : CallbackResponse.CONTINUE; + default: + throw new IllegalStateException(); + } + } + }); + assertThat(res.get(10L, TimeUnit.SECONDS)).isNull(); + } + } + } + } + } + @Test public void pauseResume() throws Exception { ExecutorProvider executorProvider = SpannerOptions.createDefaultAsyncExecutorProvider(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java index 78da23ca433..682802d85eb 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java @@ -440,4 +440,26 @@ public CallbackResponse cursorReady(AsyncResultSet resultSet) { assertThat(callbackCounter.get()).isEqualTo(1); } } + + @Test + public void callbackReturnsDoneBeforeEnd_shouldStopIteration() throws Exception { + Executor executor = Executors.newSingleThreadExecutor(); + ResultSet delegate = mock(ResultSet.class); + when(delegate.next()).thenReturn(true, true, true, false); + when(delegate.getCurrentRowAsStruct()).thenReturn(mock(Struct.class)); + try (AsyncResultSetImpl rs = + new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { + rs.setCallback( + executor, + new ReadyCallback() { + @Override + public CallbackResponse cursorReady(AsyncResultSet resultSet) { + // Not calling resultSet.tryNext() means that it will also never return DONE. + // Instead the callback indicates that it does not want any more rows. + return CallbackResponse.DONE; + } + }); + rs.getResult().get(10L, TimeUnit.SECONDS); + } + } }