Fix BulkProcessorRetryIT (elastic#41700)
* Now that we process the bulk requests themselves on the WRITE threadpool, they can run out of retries too, just like the item requests, even when backoff is active.
* Fixes elastic#41324 by using the same logic that checks failed item requests for their retry status for the top-level bulk requests as well (a sketch of the listener contract involved follows the file summary below).
original-brownbear authored and Gurkan Kaymak committed May 27, 2019
1 parent 00b7c04 commit 82dd2b6
Showing 2 changed files with 46 additions and 30 deletions.
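
For context, here is a minimal sketch of the listener contract the fix exercises. It is illustrative only, not code from this commit: the RetrySketch class name, the client parameter, and the backoff values are assumptions; the callbacks, RestStatus.TOO_MANY_REQUESTS, and the RemoteTransportException check mirror what the diffs below test.

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.RemoteTransportException;

class RetrySketch {

    // Assumed setup: a processor that retries 429s with exponential backoff.
    static BulkProcessor processor(Client client) {
        return BulkProcessor.builder(client::bulk, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // Item-level rejections surface here as item failures with
                // status TOO_MANY_REQUESTS once the item retries are used up.
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // Because the bulk request itself is dispatched to the WRITE
                // threadpool, the request as a whole can also exhaust its
                // retries and land here as a top-level 429.
                if (failure instanceof RemoteTransportException
                        && ((RemoteTransportException) failure).status() == RestStatus.TOO_MANY_REQUESTS) {
                    // acceptable once all backoff retries were consumed
                }
            }
        }).setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 8))
            .build();
    }
}

Previously the test only tolerated that top-level 429 when a rejected execution was expected; the diffs below extend the item-level retry-status check to the top-level path as well.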
File 1 of 2: BulkProcessorRetryIT
@@ -82,6 +82,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse response)
 
             @Override
             public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+                internalPolicy.logResponse(failure);
                 responses.add(failure);
                 latch.countDown();
             }
@@ -105,16 +106,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
                 BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                 if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) {
                     if (rejectedExecutionExpected == false) {
-                        Iterator<TimeValue> backoffState = internalPolicy.backoffStateFor(bulkResponse);
-                        assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState);
-                        if (backoffState.hasNext()) {
-                            // we're not expecting that we overwhelmed it even once when we maxed out the number of retries
-                            throw new AssertionError("Got rejected although backoff policy would allow more retries",
-                                failure.getCause());
-                        } else {
-                            rejectedAfterAllRetries = true;
-                            logger.debug("We maxed out the number of bulk retries and got rejected (this is ok).");
-                        }
+                        assertRetriedCorrectly(internalPolicy, bulkResponse, failure.getCause());
+                        rejectedAfterAllRetries = true;
                     }
                 } else {
                     throw new AssertionError("Unexpected failure with status: " + failure.getStatus());
@@ -123,8 +116,12 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
                 }
             } else {
                 if (response instanceof RemoteTransportException
-                    && ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS && rejectedExecutionExpected) {
-                    // ignored, we exceeded the write queue size with dispatching the initial bulk request
+                    && ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS) {
+                    if (rejectedExecutionExpected == false) {
+                        assertRetriedCorrectly(internalPolicy, response, ((Throwable) response).getCause());
+                        rejectedAfterAllRetries = true;
+                    }
+                    // ignored, we exceeded the write queue size when dispatching the initial bulk request
                 } else {
                     Throwable t = (Throwable) response;
                     // we're not expecting any other errors
@@ -146,6 +143,17 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
 
     }
 
+    private void assertRetriedCorrectly(CorrelatingBackoffPolicy internalPolicy, Object bulkResponse, Throwable failure) {
+        Iterator<TimeValue> backoffState = internalPolicy.backoffStateFor(bulkResponse);
+        assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState);
+        if (backoffState.hasNext()) {
+            // we're not expecting that we overwhelmed it even once when we maxed out the number of retries
+            throw new AssertionError("Got rejected although backoff policy would allow more retries", failure);
+        } else {
+            logger.debug("We maxed out the number of bulk retries and got rejected (this is ok).");
+        }
+    }
+
     private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) {
         MultiGetRequest multiGetRequest = new MultiGetRequest();
         for (int i = 1; i <= numDocs; i++) {
@@ -164,7 +172,7 @@ private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) {
      * as the last call to the backoff policy's iterator. The advantage is that this is non-invasive to the rest of the production code.
      */
     private static class CorrelatingBackoffPolicy extends BackoffPolicy {
-        private final Map<BulkResponse, Iterator<TimeValue>> correlations = new ConcurrentHashMap<>();
+        private final Map<Object, Iterator<TimeValue>> correlations = new ConcurrentHashMap<>();
         // this is intentionally *not* static final. We will only ever have one instance of this class per test case and want the
         // thread local to be eligible for garbage collection right after the test to avoid leaks.
         private final ThreadLocal<Iterator<TimeValue>> iterators = new ThreadLocal<>();
@@ -175,13 +183,13 @@ private CorrelatingBackoffPolicy(BackoffPolicy delegate) {
             this.delegate = delegate;
         }
 
-        public Iterator<TimeValue> backoffStateFor(BulkResponse response) {
+        public Iterator<TimeValue> backoffStateFor(Object response) {
             return correlations.get(response);
         }
 
         // Assumption: This method is called from the same thread as the last call to the internal iterator's #hasNext() / #next()
         // see also Retry.AbstractRetryHandler#onResponse().
-        public void logResponse(BulkResponse response) {
+        public void logResponse(Object response) {
             Iterator<TimeValue> iterator = iterators.get();
             // did we ever retry?
             if (iterator != null) {
File 2 of 2: BulkProcessorRetryIT
@@ -93,6 +93,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse response)
 
             @Override
             public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+                internalPolicy.logResponse(failure);
                 responses.add(failure);
                 latch.countDown();
             }
@@ -117,16 +118,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
                 BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                 if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) {
                     if (rejectedExecutionExpected == false) {
-                        Iterator<TimeValue> backoffState = internalPolicy.backoffStateFor(bulkResponse);
-                        assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState);
-                        if (backoffState.hasNext()) {
-                            // we're not expecting that we overwhelmed it even once when we maxed out the number of retries
-                            throw new AssertionError("Got rejected although backoff policy would allow more retries",
-                                failure.getCause());
-                        } else {
-                            rejectedAfterAllRetries = true;
-                            logger.debug("We maxed out the number of bulk retries and got rejected (this is ok).");
-                        }
+                        assertRetriedCorrectly(internalPolicy, bulkResponse, failure.getCause());
+                        rejectedAfterAllRetries = true;
                     }
                 } else {
                     throw new AssertionError("Unexpected failure status: " + failure.getStatus());
@@ -135,8 +128,12 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
                 }
             } else {
                 if (response instanceof RemoteTransportException
-                    && ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS && rejectedExecutionExpected) {
-                    // ignored, we exceeded the write queue size with dispatching the initial bulk request
+                    && ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS) {
+                    if (rejectedExecutionExpected == false) {
+                        assertRetriedCorrectly(internalPolicy, response, ((Throwable) response).getCause());
+                        rejectedAfterAllRetries = true;
+                    }
+                    // ignored, we exceeded the write queue size when dispatching the initial bulk request
                 } else {
                     Throwable t = (Throwable) response;
                     // we're not expecting any other errors
@@ -163,6 +160,17 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
         }
     }
 
+    private void assertRetriedCorrectly(CorrelatingBackoffPolicy internalPolicy, Object bulkResponse, Throwable failure) {
+        Iterator<TimeValue> backoffState = internalPolicy.backoffStateFor(bulkResponse);
+        assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState);
+        if (backoffState.hasNext()) {
+            // we're not expecting that we overwhelmed it even once when we maxed out the number of retries
+            throw new AssertionError("Got rejected although backoff policy would allow more retries", failure);
+        } else {
+            logger.debug("We maxed out the number of bulk retries and got rejected (this is ok).");
+        }
+    }
+
     private static void indexDocs(BulkProcessor processor, int numDocs) {
         for (int i = 1; i <= numDocs; i++) {
             processor.add(client()
@@ -183,7 +191,7 @@ private static void indexDocs(BulkProcessor processor, int numDocs) {
      * as the last call to the backoff policy's iterator. The advantage is that this is non-invasive to the rest of the production code.
      */
     private static class CorrelatingBackoffPolicy extends BackoffPolicy {
-        private final Map<BulkResponse, Iterator<TimeValue>> correlations = new ConcurrentHashMap<>();
+        private final Map<Object, Iterator<TimeValue>> correlations = new ConcurrentHashMap<>();
         // this is intentionally *not* static final. We will only ever have one instance of this class per test case and want the
         // thread local to be eligible for garbage collection right after the test to avoid leaks.
         private final ThreadLocal<Iterator<TimeValue>> iterators = new ThreadLocal<>();
@@ -194,13 +202,13 @@ private CorrelatingBackoffPolicy(BackoffPolicy delegate) {
             this.delegate = delegate;
         }
 
-        public Iterator<TimeValue> backoffStateFor(BulkResponse response) {
+        public Iterator<TimeValue> backoffStateFor(Object response) {
             return correlations.get(response);
        }
 
         // Assumption: This method is called from the same thread as the last call to the internal iterator's #hasNext() / #next()
         // see also Retry.AbstractRetryHandler#onResponse().
-        public void logResponse(BulkResponse response) {
+        public void logResponse(Object response) {
             Iterator<TimeValue> iterator = iterators.get();
             // did we ever retry?
             if (iterator != null) {
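Both diffs truncate the CorrelatingBackoffPolicy class, so here is a condensed reconstruction of the correlation technique it implements. The fields, backoffStateFor, and logResponse follow the diff; the CorrelatingIterator body is an assumption about the truncated part, built on the documented premise that logResponse runs on the same thread as the last hasNext()/next() call on the iterator (see Retry.AbstractRetryHandler#onResponse).

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.common.unit.TimeValue;

class CorrelatingBackoffPolicySketch extends BackoffPolicy {
    private final Map<Object, Iterator<TimeValue>> correlations = new ConcurrentHashMap<>();
    // Intentionally an instance field so the thread local is eligible for
    // garbage collection right after the test.
    private final ThreadLocal<Iterator<TimeValue>> iterators = new ThreadLocal<>();
    private final BackoffPolicy delegate;

    CorrelatingBackoffPolicySketch(BackoffPolicy delegate) {
        this.delegate = delegate;
    }

    // Look up the backoff iterator that drove the retries for a response or failure.
    Iterator<TimeValue> backoffStateFor(Object response) {
        return correlations.get(response);
    }

    // Called from the bulk listener on the thread that last touched the iterator.
    void logResponse(Object response) {
        Iterator<TimeValue> iterator = iterators.get();
        if (iterator != null) { // null means we never retried
            correlations.put(response, iterator);
            iterators.remove();
        }
    }

    @Override
    public Iterator<TimeValue> iterator() {
        return new CorrelatingIterator(delegate.iterator());
    }

    private final class CorrelatingIterator implements Iterator<TimeValue> {
        private final Iterator<TimeValue> inner;

        private CorrelatingIterator(Iterator<TimeValue> inner) {
            this.inner = inner;
        }

        @Override
        public boolean hasNext() {
            iterators.set(this); // re-register on every call; retries may hop threads
            return inner.hasNext();
        }

        @Override
        public TimeValue next() {
            iterators.set(this);
            return inner.next();
        }
    }
}

Keying the correlations map by Object rather than BulkResponse (the change in both files) lets the listener record top-level failures via logResponse(failure) and item-level responses with the same mechanism.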
