From 6a55b3ede632c9a24c138a41c70f46bab17b3130 Mon Sep 17 00:00:00 2001 From: Armin Date: Tue, 30 Apr 2019 19:52:25 +0200 Subject: [PATCH] Fix BulkProcessorRetryIT * Now that we process the bulk requests themselves on the WRITE threadpool, they can run out of retries too like the item requests even when backoff is active * Fixes #41324 by using the same logic that checks failed item requests for their retry status for the top level bulk requests as well --- .../client/BulkProcessorRetryIT.java | 38 +++++++++++-------- .../action/bulk/BulkProcessorRetryIT.java | 38 +++++++++++-------- 2 files changed, 46 insertions(+), 30 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java index c18c4363897e7..77877c46f1aee 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java @@ -82,6 +82,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + internalPolicy.logResponse(failure); responses.add(failure); latch.countDown(); } @@ -105,16 +106,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) { if (rejectedExecutionExpected == false) { - Iterator backoffState = internalPolicy.backoffStateFor(bulkResponse); - assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState); - if (backoffState.hasNext()) { - // we're not expecting that we overwhelmed it even once when we maxed out the number of retries - throw new AssertionError("Got rejected although backoff policy would allow more retries", - failure.getCause()); - } else { - rejectedAfterAllRetries = true; - logger.debug("We maxed out the number of bulk retries and got rejected (this is ok)."); - } + assertRetriedCorrectly(internalPolicy, bulkResponse, failure.getCause()); + rejectedAfterAllRetries = true; } } else { throw new AssertionError("Unexpected failure with status: " + failure.getStatus()); @@ -123,8 +116,12 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) } } else { if (response instanceof RemoteTransportException - && ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS && rejectedExecutionExpected) { - // ignored, we exceeded the write queue size with dispatching the initial bulk request + && ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS) { + if (rejectedExecutionExpected == false) { + assertRetriedCorrectly(internalPolicy, response, ((Throwable) response).getCause()); + rejectedAfterAllRetries = true; + } + // ignored, we exceeded the write queue size when dispatching the initial bulk request } else { Throwable t = (Throwable) response; // we're not expecting any other errors @@ -146,6 +143,17 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) } + private void assertRetriedCorrectly(CorrelatingBackoffPolicy internalPolicy, Object bulkResponse, Throwable failure) { + Iterator backoffState = internalPolicy.backoffStateFor(bulkResponse); + assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState); + if (backoffState.hasNext()) { + // we're not expecting that we overwhelmed it even once when we maxed out the number of retries + throw new AssertionError("Got rejected although backoff policy would allow more retries", failure); + } else { + logger.debug("We maxed out the number of bulk retries and got rejected (this is ok)."); + } + } + private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) { MultiGetRequest multiGetRequest = new MultiGetRequest(); for (int i = 1; i <= numDocs; i++) { @@ -164,7 +172,7 @@ private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) { * as the last call to the backoff policy's iterator. The advantage is that this is non-invasive to the rest of the production code. */ private static class CorrelatingBackoffPolicy extends BackoffPolicy { - private final Map> correlations = new ConcurrentHashMap<>(); + private final Map> correlations = new ConcurrentHashMap<>(); // this is intentionally *not* static final. We will only ever have one instance of this class per test case and want the // thread local to be eligible for garbage collection right after the test to avoid leaks. private final ThreadLocal> iterators = new ThreadLocal<>(); @@ -175,13 +183,13 @@ private CorrelatingBackoffPolicy(BackoffPolicy delegate) { this.delegate = delegate; } - public Iterator backoffStateFor(BulkResponse response) { + public Iterator backoffStateFor(Object response) { return correlations.get(response); } // Assumption: This method is called from the same thread as the last call to the internal iterator's #hasNext() / #next() // see also Retry.AbstractRetryHandler#onResponse(). - public void logResponse(BulkResponse response) { + public void logResponse(Object response) { Iterator iterator = iterators.get(); // did we ever retry? if (iterator != null) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java index e4b6fff9fc353..e7285ff6f97ed 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java @@ -93,6 +93,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + internalPolicy.logResponse(failure); responses.add(failure); latch.countDown(); } @@ -117,16 +118,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) { if (rejectedExecutionExpected == false) { - Iterator backoffState = internalPolicy.backoffStateFor(bulkResponse); - assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState); - if (backoffState.hasNext()) { - // we're not expecting that we overwhelmed it even once when we maxed out the number of retries - throw new AssertionError("Got rejected although backoff policy would allow more retries", - failure.getCause()); - } else { - rejectedAfterAllRetries = true; - logger.debug("We maxed out the number of bulk retries and got rejected (this is ok)."); - } + assertRetriedCorrectly(internalPolicy, bulkResponse, failure.getCause()); + rejectedAfterAllRetries = true; } } else { throw new AssertionError("Unexpected failure status: " + failure.getStatus()); @@ -135,8 +128,12 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) } } else { if (response instanceof RemoteTransportException - && ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS && rejectedExecutionExpected) { - // ignored, we exceeded the write queue size with dispatching the initial bulk request + && ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS) { + if (rejectedExecutionExpected == false) { + assertRetriedCorrectly(internalPolicy, response, ((Throwable) response).getCause()); + rejectedAfterAllRetries = true; + } + // ignored, we exceeded the write queue size when dispatching the initial bulk request } else { Throwable t = (Throwable) response; // we're not expecting any other errors @@ -163,6 +160,17 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) } } + private void assertRetriedCorrectly(CorrelatingBackoffPolicy internalPolicy, Object bulkResponse, Throwable failure) { + Iterator backoffState = internalPolicy.backoffStateFor(bulkResponse); + assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState); + if (backoffState.hasNext()) { + // we're not expecting that we overwhelmed it even once when we maxed out the number of retries + throw new AssertionError("Got rejected although backoff policy would allow more retries", failure); + } else { + logger.debug("We maxed out the number of bulk retries and got rejected (this is ok)."); + } + } + private static void indexDocs(BulkProcessor processor, int numDocs) { for (int i = 1; i <= numDocs; i++) { processor.add(client() @@ -183,7 +191,7 @@ private static void indexDocs(BulkProcessor processor, int numDocs) { * as the last call to the backoff policy's iterator. The advantage is that this is non-invasive to the rest of the production code. */ private static class CorrelatingBackoffPolicy extends BackoffPolicy { - private final Map> correlations = new ConcurrentHashMap<>(); + private final Map> correlations = new ConcurrentHashMap<>(); // this is intentionally *not* static final. We will only ever have one instance of this class per test case and want the // thread local to be eligible for garbage collection right after the test to avoid leaks. private final ThreadLocal> iterators = new ThreadLocal<>(); @@ -194,13 +202,13 @@ private CorrelatingBackoffPolicy(BackoffPolicy delegate) { this.delegate = delegate; } - public Iterator backoffStateFor(BulkResponse response) { + public Iterator backoffStateFor(Object response) { return correlations.get(response); } // Assumption: This method is called from the same thread as the last call to the internal iterator's #hasNext() / #next() // see also Retry.AbstractRetryHandler#onResponse(). - public void logResponse(BulkResponse response) { + public void logResponse(Object response) { Iterator iterator = iterators.get(); // did we ever retry? if (iterator != null) {