Skip to content

Commit

Permalink
Fix high water mark test which submits data early (elastic#114159)
Browse files Browse the repository at this point in the history
It is possible in the incremental high watermark test that the data is
submitted causing a corruption of the bulk request. This commit fixes
the issue to ensure we only send new data after it has been requested.
Additionally, it adds an assertion to prevent this error from happening
again.
  • Loading branch information
Tim-Brooks committed Oct 4, 2024
1 parent c4698c6 commit 52a4ab7
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ public void testIncrementalBulkHighWatermarkBackOff() throws Exception {

handlers.add(handlerThrottled);

// Wait until we are ready for the next page
assertBusy(() -> assertTrue(nextPage.get()));

for (IncrementalBulkService.Handler h : handlers) {
refCounted.incRef();
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public static class Handler implements Releasable {
private boolean closed = false;
private boolean globalFailure = false;
private boolean incrementalRequestSubmitted = false;
private boolean bulkInProgress = false;
private ThreadContext.StoredContext requestContext;
private Exception bulkActionLevelFailure = null;
private long currentBulkSize = 0L;
Expand All @@ -130,6 +131,7 @@ protected Handler(

public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
assert closed == false;
assert bulkInProgress == false;
if (bulkActionLevelFailure != null) {
shortCircuitDueToTopLevelFailure(items, releasable);
nextItems.run();
Expand All @@ -143,6 +145,7 @@ public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runn
requestContext.restore();
final ArrayList<Releasable> toRelease = new ArrayList<>(releasables);
releasables.clear();
bulkInProgress = true;
client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {

@Override
Expand All @@ -158,6 +161,7 @@ public void onFailure(Exception e) {
handleBulkFailure(isFirstRequest, e);
}
}, () -> {
bulkInProgress = false;
requestContext = threadContext.newStoredContext();
toRelease.forEach(Releasable::close);
nextItems.run();
Expand All @@ -177,6 +181,7 @@ private boolean shouldBackOff() {
}

public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, ActionListener<BulkResponse> listener) {
assert bulkInProgress == false;
if (bulkActionLevelFailure != null) {
shortCircuitDueToTopLevelFailure(items, releasable);
errorResponse(listener);
Expand All @@ -187,7 +192,9 @@ public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, Act
requestContext.restore();
final ArrayList<Releasable> toRelease = new ArrayList<>(releasables);
releasables.clear();
client.bulk(bulkRequest, ActionListener.runBefore(new ActionListener<>() {
// We do not need to set this back to false as this will be the last request.
bulkInProgress = true;
client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {

private final boolean isFirstRequest = incrementalRequestSubmitted == false;

Expand Down

0 comments on commit 52a4ab7

Please sign in to comment.