Skip to content

Commit

Permalink
fix: do not close delegate rs in callback runnable (#425)
Browse files Browse the repository at this point in the history
* fix: do not close delegate rs in callback runnable

* test: add test case to show infinite loop

* test: add stress test
  • Loading branch information
olavloite authored Sep 17, 2020
1 parent 01d6bfd commit dce3ee7
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public void run() {
switch (response) {
case DONE:
state = State.DONE;
closeDelegateResultSet();
cursorReturnedDoneOrException = true;
return;
case PAUSE:
state = State.PAUSED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,47 @@ public CallbackResponse cursorReady(AsyncResultSet resultSet) {
}
}

@Test
public void returnDoneBeforeEnd() throws Exception {
ExecutorProvider executorProvider = SpannerOptions.createDefaultAsyncExecutorProvider();
final Random random = new Random();
for (Executor executor :
new Executor[] {
MoreExecutors.directExecutor(), createExecService(), createExecService(32)
}) {
for (int bufferSize = 1; bufferSize < resultSetSize * 2; bufferSize *= 2) {
for (int i = 0; i < TEST_RUNS; i++) {
try (AsyncResultSetImpl impl =
new AsyncResultSetImpl(executorProvider, createResultSet(), bufferSize)) {
ApiFuture<Void> res =
impl.setCallback(
executor,
new ReadyCallback() {
@Override
public CallbackResponse cursorReady(AsyncResultSet resultSet) {
switch (resultSet.tryNext()) {
case DONE:
return CallbackResponse.DONE;
case NOT_READY:
return random.nextBoolean()
? CallbackResponse.DONE
: CallbackResponse.CONTINUE;
case OK:
return random.nextInt(resultSetSize) <= 2
? CallbackResponse.DONE
: CallbackResponse.CONTINUE;
default:
throw new IllegalStateException();
}
}
});
assertThat(res.get(10L, TimeUnit.SECONDS)).isNull();
}
}
}
}
}

@Test
public void pauseResume() throws Exception {
ExecutorProvider executorProvider = SpannerOptions.createDefaultAsyncExecutorProvider();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,4 +440,26 @@ public CallbackResponse cursorReady(AsyncResultSet resultSet) {
assertThat(callbackCounter.get()).isEqualTo(1);
}
}

@Test
public void callbackReturnsDoneBeforeEnd_shouldStopIteration() throws Exception {
Executor executor = Executors.newSingleThreadExecutor();
ResultSet delegate = mock(ResultSet.class);
when(delegate.next()).thenReturn(true, true, true, false);
when(delegate.getCurrentRowAsStruct()).thenReturn(mock(Struct.class));
try (AsyncResultSetImpl rs =
new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) {
rs.setCallback(
executor,
new ReadyCallback() {
@Override
public CallbackResponse cursorReady(AsyncResultSet resultSet) {
// Not calling resultSet.tryNext() means that it will also never return DONE.
// Instead the callback indicates that it does not want any more rows.
return CallbackResponse.DONE;
}
});
rs.getResult().get(10L, TimeUnit.SECONDS);
}
}
}

0 comments on commit dce3ee7

Please sign in to comment.