Fix BulkProcessorRetryIT #41700

Merged: 1 commit, May 1, 2019
@@ -82,6 +82,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+ internalPolicy.logResponse(failure);
responses.add(failure);
latch.countDown();
}
@@ -105,16 +106,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) {
if (rejectedExecutionExpected == false) {
- Iterator<TimeValue> backoffState = internalPolicy.backoffStateFor(bulkResponse);
- assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState);
- if (backoffState.hasNext()) {
- // we're not expecting that we overwhelmed it even once when we maxed out the number of retries
- throw new AssertionError("Got rejected although backoff policy would allow more retries",
- failure.getCause());
- } else {
- rejectedAfterAllRetries = true;
- logger.debug("We maxed out the number of bulk retries and got rejected (this is ok).");
- }
+ assertRetriedCorrectly(internalPolicy, bulkResponse, failure.getCause());
+ rejectedAfterAllRetries = true;
}
} else {
throw new AssertionError("Unexpected failure with status: " + failure.getStatus());
@@ -123,8 +116,12 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
}
} else {
if (response instanceof RemoteTransportException
- && ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS && rejectedExecutionExpected) {
- // ignored, we exceeded the write queue size with dispatching the initial bulk request
+ && ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS) {
+ if (rejectedExecutionExpected == false) {
+ assertRetriedCorrectly(internalPolicy, response, ((Throwable) response).getCause());
+ rejectedAfterAllRetries = true;
+ }
+ // ignored, we exceeded the write queue size when dispatching the initial bulk request
} else {
Throwable t = (Throwable) response;
// we're not expecting any other errors
@@ -146,6 +143,17 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)

}

+ private void assertRetriedCorrectly(CorrelatingBackoffPolicy internalPolicy, Object bulkResponse, Throwable failure) {
+ Iterator<TimeValue> backoffState = internalPolicy.backoffStateFor(bulkResponse);
+ assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState);
+ if (backoffState.hasNext()) {
+ // we're not expecting that we overwhelmed it even once when we maxed out the number of retries
+ throw new AssertionError("Got rejected although backoff policy would allow more retries", failure);
+ } else {
+ logger.debug("We maxed out the number of bulk retries and got rejected (this is ok).");
+ }
+ }
+
private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) {
MultiGetRequest multiGetRequest = new MultiGetRequest();
for (int i = 1; i <= numDocs; i++) {
@@ -164,7 +172,7 @@ private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) {
* as the last call to the backoff policy's iterator. The advantage is that this is non-invasive to the rest of the production code.
*/
private static class CorrelatingBackoffPolicy extends BackoffPolicy {
- private final Map<BulkResponse, Iterator<TimeValue>> correlations = new ConcurrentHashMap<>();
+ private final Map<Object, Iterator<TimeValue>> correlations = new ConcurrentHashMap<>();
// this is intentionally *not* static final. We will only ever have one instance of this class per test case and want the
// thread local to be eligible for garbage collection right after the test to avoid leaks.
private final ThreadLocal<Iterator<TimeValue>> iterators = new ThreadLocal<>();
@@ -175,13 +183,13 @@ private CorrelatingBackoffPolicy(BackoffPolicy delegate) {
this.delegate = delegate;
}

- public Iterator<TimeValue> backoffStateFor(BulkResponse response) {
+ public Iterator<TimeValue> backoffStateFor(Object response) {
return correlations.get(response);
}

// Assumption: This method is called from the same thread as the last call to the internal iterator's #hasNext() / #next()
// see also Retry.AbstractRetryHandler#onResponse().
- public void logResponse(BulkResponse response) {
+ public void logResponse(Object response) {
Iterator<TimeValue> iterator = iterators.get();
// did we ever retry?
if (iterator != null) {
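The hunks above cut off inside logResponse() and do not show how the ThreadLocal gets populated in the first place. Purely as orientation, the correlation described in the class comment can be wired up roughly as follows: a delegating iterator records itself in the ThreadLocal on every hasNext()/next() call made by the retry handler, and logResponse() then files that iterator under the response it belongs to. This is a sketch, not code copied from the test; the names CorrelatingBackoffPolicySketch and CorrelatingIterator and the iterators.remove() cleanup are illustrative assumptions.

    import java.util.Iterator;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.elasticsearch.action.bulk.BackoffPolicy;
    import org.elasticsearch.common.unit.TimeValue;

    class CorrelatingBackoffPolicySketch extends BackoffPolicy {
        private final Map<Object, Iterator<TimeValue>> correlations = new ConcurrentHashMap<>();
        private final ThreadLocal<Iterator<TimeValue>> iterators = new ThreadLocal<>();
        private final BackoffPolicy delegate;

        CorrelatingBackoffPolicySketch(BackoffPolicy delegate) {
            this.delegate = delegate;
        }

        Iterator<TimeValue> backoffStateFor(Object response) {
            return correlations.get(response);
        }

        // Must run on the same thread that last called hasNext()/next() on the iterator.
        void logResponse(Object response) {
            Iterator<TimeValue> iterator = iterators.get();
            if (iterator != null) {
                // a retry happened for this request; remember its backoff state
                correlations.put(response, iterator);
                iterators.remove();
            }
        }

        @Override
        public Iterator<TimeValue> iterator() {
            return new CorrelatingIterator(iterators, delegate.iterator());
        }

        private static final class CorrelatingIterator implements Iterator<TimeValue> {
            private final ThreadLocal<Iterator<TimeValue>> iterators;
            private final Iterator<TimeValue> delegate;

            CorrelatingIterator(ThreadLocal<Iterator<TimeValue>> iterators, Iterator<TimeValue> delegate) {
                this.iterators = iterators;
                this.delegate = delegate;
            }

            @Override
            public boolean hasNext() {
                // record the iterator the retry handler is consulting on this thread
                iterators.set(this);
                return delegate.hasNext();
            }

            @Override
            public TimeValue next() {
                iterators.set(this);
                return delegate.next();
            }
        }
    }

With wiring like this, backoffStateFor(response) returns null for a request that was never retried, which is exactly the case the assertNotNull in assertRetriedCorrectly flags.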
@@ -93,6 +93,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+ internalPolicy.logResponse(failure);
responses.add(failure);
latch.countDown();
}
@@ -117,16 +118,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) {
if (rejectedExecutionExpected == false) {
- Iterator<TimeValue> backoffState = internalPolicy.backoffStateFor(bulkResponse);
- assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState);
- if (backoffState.hasNext()) {
- // we're not expecting that we overwhelmed it even once when we maxed out the number of retries
- throw new AssertionError("Got rejected although backoff policy would allow more retries",
- failure.getCause());
- } else {
- rejectedAfterAllRetries = true;
- logger.debug("We maxed out the number of bulk retries and got rejected (this is ok).");
- }
+ assertRetriedCorrectly(internalPolicy, bulkResponse, failure.getCause());
+ rejectedAfterAllRetries = true;
}
} else {
throw new AssertionError("Unexpected failure status: " + failure.getStatus());
@@ -135,8 +128,12 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
}
} else {
if (response instanceof RemoteTransportException
- && ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS && rejectedExecutionExpected) {
- // ignored, we exceeded the write queue size with dispatching the initial bulk request
+ && ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS) {
+ if (rejectedExecutionExpected == false) {
+ assertRetriedCorrectly(internalPolicy, response, ((Throwable) response).getCause());
+ rejectedAfterAllRetries = true;
+ }
+ // ignored, we exceeded the write queue size when dispatching the initial bulk request
} else {
Throwable t = (Throwable) response;
// we're not expecting any other errors
@@ -163,6 +160,17 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
}
}

+ private void assertRetriedCorrectly(CorrelatingBackoffPolicy internalPolicy, Object bulkResponse, Throwable failure) {
+ Iterator<TimeValue> backoffState = internalPolicy.backoffStateFor(bulkResponse);
+ assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState);
+ if (backoffState.hasNext()) {
+ // we're not expecting that we overwhelmed it even once when we maxed out the number of retries
+ throw new AssertionError("Got rejected although backoff policy would allow more retries", failure);
+ } else {
+ logger.debug("We maxed out the number of bulk retries and got rejected (this is ok).");
+ }
+ }
+
private static void indexDocs(BulkProcessor processor, int numDocs) {
for (int i = 1; i <= numDocs; i++) {
processor.add(client()
@@ -183,7 +191,7 @@ private static void indexDocs(BulkProcessor processor, int numDocs) {
* as the last call to the backoff policy's iterator. The advantage is that this is non-invasive to the rest of the production code.
*/
private static class CorrelatingBackoffPolicy extends BackoffPolicy {
- private final Map<BulkResponse, Iterator<TimeValue>> correlations = new ConcurrentHashMap<>();
+ private final Map<Object, Iterator<TimeValue>> correlations = new ConcurrentHashMap<>();
// this is intentionally *not* static final. We will only ever have one instance of this class per test case and want the
// thread local to be eligible for garbage collection right after the test to avoid leaks.
private final ThreadLocal<Iterator<TimeValue>> iterators = new ThreadLocal<>();
@@ -194,13 +202,13 @@ private CorrelatingBackoffPolicy(BackoffPolicy delegate) {
this.delegate = delegate;
}

- public Iterator<TimeValue> backoffStateFor(BulkResponse response) {
+ public Iterator<TimeValue> backoffStateFor(Object response) {
return correlations.get(response);
}

// Assumption: This method is called from the same thread as the last call to the internal iterator's #hasNext() / #next()
// see also Retry.AbstractRetryHandler#onResponse().
- public void logResponse(BulkResponse response) {
+ public void logResponse(Object response) {
Iterator<TimeValue> iterator = iterators.get();
// did we ever retry?
if (iterator != null) {
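The collapsed parts of the test also include the point where internalPolicy is handed to the BulkProcessor under test. As a usage sketch only, assuming the BulkProcessor.builder overload that takes a (request, listener) consumer together with setBackoffPolicy(), the wiring would look roughly like this; the helper name BulkProcessorWiringSketch and the concrete settings are made up for illustration, not taken from the hidden parts of the test.

    import java.util.function.BiConsumer;

    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.action.bulk.BackoffPolicy;
    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;

    class BulkProcessorWiringSketch {

        // "consumer" stands in for whatever executes the bulk request (e.g. client::bulk),
        // "listener" for the test listener shown in the diff above, and "internalPolicy"
        // for the correlating backoff policy.
        static BulkProcessor build(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
                                   BulkProcessor.Listener listener,
                                   BackoffPolicy internalPolicy,
                                   int concurrentRequests) {
            return BulkProcessor.builder(consumer, listener)
                .setBulkActions(1)                    // flush after every add(), producing many small bulks
                .setConcurrentRequests(concurrentRequests)
                .setBackoffPolicy(internalPolicy)     // rejected bulks are retried according to this policy
                .build();
        }
    }

In this sketch, setBulkActions(1) flushes on every add(), so many small bulks are dispatched concurrently; some of them come back with TOO_MANY_REQUESTS, which is exactly the condition the listener code above examines.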