Skip to content

Commit

Permalink
Fix BulkProcessor Retry ITs (elastic#41338)
Browse files Browse the repository at this point in the history
* The test fails for the retry backoff enabled case because the retry handler in the bulk processor hasn't been adjusted to account for elastic#40866 which now might lead to an outright rejection of the request instead of its items individually
   * Fixed by adding retry functionality to the top level request as well
* Also fixed the duplicate test for the HLRC that wasn't handling the non-backoff case yet the same way the non-client IT did
* closes elastic#41324
  • Loading branch information
original-brownbear authored and Gurkan Kaymak committed May 27, 2019
1 parent ada4164 commit 89df76d
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.RemoteTransportException;

import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -56,7 +57,6 @@ public void testBulkRejectionLoadWithoutBackoff() throws Exception {
executeBulkRejectionLoad(BackoffPolicy.noBackoff(), rejectedExecutionExpected);
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/41324")
public void testBulkRejectionLoadWithBackoff() throws Throwable {
boolean rejectedExecutionExpected = false;
executeBulkRejectionLoad(BackoffPolicy.exponentialBackoff(), rejectedExecutionExpected);
Expand Down Expand Up @@ -122,9 +122,14 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
}
}
} else {
Throwable t = (Throwable) response;
// we're not expecting any other errors
throw new AssertionError("Unexpected failure", t);
if (response instanceof RemoteTransportException
&& ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS && rejectedExecutionExpected) {
// ignored, we exceeded the write queue size with dispatching the initial bulk request
} else {
Throwable t = (Throwable) response;
// we're not expecting any other errors
throw new AssertionError("Unexpected failure", t);
}
}
}

Expand Down
15 changes: 10 additions & 5 deletions server/src/main/java/org/elasticsearch/action/bulk/Retry.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteTransportException;

import java.util.ArrayList;
import java.util.Iterator;
Expand Down Expand Up @@ -118,11 +119,15 @@ public void onResponse(BulkResponse bulkItemResponses) {

@Override
public void onFailure(Exception e) {
try {
listener.onFailure(e);
} finally {
if (retryCancellable != null) {
retryCancellable.cancel();
if (e instanceof RemoteTransportException && ((RemoteTransportException) e).status() == RETRY_STATUS && backoff.hasNext()) {
retry(currentBulkRequest);
} else {
try {
listener.onFailure(e);
} finally {
if (retryCancellable != null) {
retryCancellable.cancel();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public void testBulkRejectionLoadWithoutBackoff() throws Throwable {
executeBulkRejectionLoad(BackoffPolicy.noBackoff(), rejectedExecutionExpected);
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/41324")
public void testBulkRejectionLoadWithBackoff() throws Throwable {
boolean rejectedExecutionExpected = false;
executeBulkRejectionLoad(BackoffPolicy.exponentialBackoff(), rejectedExecutionExpected);
Expand Down

0 comments on commit 89df76d

Please sign in to comment.