Skip to content

Commit

Permalink
AsyncOperator#isFinished must never return true on failure (elastic#1…
Browse files Browse the repository at this point in the history
…04029) (elastic#104079)

Enrich IT tests can return OK with some missing results instead of
Failure when the enrich lookup hits circuit breakers. This is due to a
race condition in isFinished and onFailure within the AsyncOperator.
When an async lookup fails, we set the exception and then discard pages.
Unfortunately, in the isFinished method, we perform the checks in the
same order: first, we check for failure, and then we check for
outstanding pages. If there is a long pause between these steps,
isFinished might not detect the failure but see no outstanding pages,
leading it to return true despite the presence of a failure. This change
swaps the order of the checks.
  • Loading branch information
dnhatn authored Jan 8, 2024
1 parent da06c53 commit 3f5f7d2
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 2 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/104029.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 104029
summary: '`AsyncOperator#isFinished` must never return true on failure'
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,12 @@ public void finish() {

@Override
public boolean isFinished() {
checkFailure();
return finished && checkpoint.getPersistedCheckpoint() == checkpoint.getMaxSeqNo();
if (finished && checkpoint.getPersistedCheckpoint() == checkpoint.getMaxSeqNo()) {
checkFailure();
return true;
} else {
return false;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.compute.operator;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.PlainActionFuture;
Expand Down Expand Up @@ -36,6 +37,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -166,6 +169,52 @@ public void close() {
operator.close();
}

public void testIsFinished() {
int iters = iterations(10, 10_000);
for (int i = 0; i < iters; i++) {
DriverContext driverContext = driverContext();
CyclicBarrier barrier = new CyclicBarrier(2);
AsyncOperator asyncOperator = new AsyncOperator(between(1, 10)) {
@Override
protected void performAsync(Page inputPage, ActionListener<Page> listener) {
ActionRunnable<Page> command = new ActionRunnable<>(listener) {
@Override
protected void doRun() {
try {
barrier.await(10, TimeUnit.SECONDS);
} catch (Exception e) {
throw new AssertionError(e);
}
listener.onFailure(new ElasticsearchException("simulated"));
}
};
threadPool.executor(ESQL_TEST_EXECUTOR).execute(command);
}

@Override
public void close() {

}
};
asyncOperator.addInput(new Page(driverContext.blockFactory().newConstantIntBlockWith(randomInt(), between(1, 10))));
asyncOperator.finish();
try {
barrier.await(10, TimeUnit.SECONDS);
} catch (Exception e) {
throw new AssertionError(e);
}
int numChecks = between(10, 100);
while (--numChecks >= 0) {
try {
assertFalse("must not finished or failed", asyncOperator.isFinished());
} catch (ElasticsearchException e) {
assertThat(e.getMessage(), equalTo("simulated"));
break;
}
}
}
}

static class LookupService {
private final ThreadPool threadPool;
private final Map<Long, String> dict;
Expand Down

0 comments on commit 3f5f7d2

Please sign in to comment.