From 52a4ab75a291170c3d8a1d3e8fe38a8c26bd2957 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 4 Oct 2024 14:45:09 -0600 Subject: [PATCH] Fix high water mark test which submits data early (#114159) It is possible in the incremental high watermark test that the data is submitted causing a corruption of the bulk request. This commit fixes the issue to ensure we only send new data after it has been requested. Additionally, it adds an assertion to prevent this error from happening again. --- .../org/elasticsearch/action/bulk/IncrementalBulkIT.java | 3 +++ .../action/bulk/IncrementalBulkService.java | 9 ++++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java index 54a9e089551a4..c4edbe0185541 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java @@ -234,6 +234,9 @@ public void testIncrementalBulkHighWatermarkBackOff() throws Exception { handlers.add(handlerThrottled); + // Wait until we are ready for the next page + assertBusy(() -> assertTrue(nextPage.get())); + for (IncrementalBulkService.Handler h : handlers) { refCounted.incRef(); PlainActionFuture future = new PlainActionFuture<>(); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index 25e58a82f8e8b..8f36fed380c48 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -105,6 +105,7 @@ public static class Handler implements Releasable { private boolean closed = false; private boolean globalFailure = false; private boolean incrementalRequestSubmitted = false; + private boolean bulkInProgress = false; private ThreadContext.StoredContext requestContext; private Exception bulkActionLevelFailure = null; private long currentBulkSize = 0L; @@ -130,6 +131,7 @@ protected Handler( public void addItems(List> items, Releasable releasable, Runnable nextItems) { assert closed == false; + assert bulkInProgress == false; if (bulkActionLevelFailure != null) { shortCircuitDueToTopLevelFailure(items, releasable); nextItems.run(); @@ -143,6 +145,7 @@ public void addItems(List> items, Releasable releasable, Runn requestContext.restore(); final ArrayList toRelease = new ArrayList<>(releasables); releasables.clear(); + bulkInProgress = true; client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() { @Override @@ -158,6 +161,7 @@ public void onFailure(Exception e) { handleBulkFailure(isFirstRequest, e); } }, () -> { + bulkInProgress = false; requestContext = threadContext.newStoredContext(); toRelease.forEach(Releasable::close); nextItems.run(); @@ -177,6 +181,7 @@ private boolean shouldBackOff() { } public void lastItems(List> items, Releasable releasable, ActionListener listener) { + assert bulkInProgress == false; if (bulkActionLevelFailure != null) { shortCircuitDueToTopLevelFailure(items, releasable); errorResponse(listener); @@ -187,7 +192,9 @@ public void lastItems(List> items, Releasable releasable, Act requestContext.restore(); final ArrayList toRelease = new ArrayList<>(releasables); releasables.clear(); - client.bulk(bulkRequest, ActionListener.runBefore(new ActionListener<>() { + // We do not need to set this back to false as this will be the last request. + bulkInProgress = true; + client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() { private final boolean isFirstRequest = incrementalRequestSubmitted == false;