From 2fb855126f9405500cfab1891d14f01484b243f1 Mon Sep 17 00:00:00 2001 From: SuXingLee Date: Mon, 14 Oct 2019 23:53:35 +0800 Subject: [PATCH 1/2] Fix BulkProcessor deadlock when bulk requests fail #47599 --- .../action/bulk/BulkProcessor.java | 1 + .../action/bulk/BulkProcessorTests.java | 92 ++++++++++++++++++- .../elasticsearch/action/bulk/RetryTests.java | 2 +- 3 files changed, 93 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index 08c42c5ea40de..7d83347e95090 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -204,6 +204,7 @@ public static Builder builder(BiConsumer Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS)); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java index 6a58696534ed4..a207a2b297907 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java @@ -22,12 +22,16 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.bulk.BulkItemResponse.Failure; +import org.elasticsearch.action.DocWriteRequest.OpType; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.ESTestCase; @@ -36,6 +40,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.ExpectedException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -46,7 +52,9 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -54,9 +62,15 @@ public class BulkProcessorTests extends ESTestCase { - private ThreadPool threadPool; private final Logger logger = LogManager.getLogger(BulkProcessorTests.class); + private ThreadPool threadPool; + private ExecutorService asyncExec = Executors.newFixedThreadPool(1); + private ExecutorService userExec = Executors.newFixedThreadPool(1); + private ScheduledThreadPoolExecutor scheduleThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY); + @Rule + public ExpectedException exception = ExpectedException.none(); + @Before public void startThreadPool() { threadPool = new TestThreadPool("BulkProcessorTests"); @@ -65,6 +79,82 @@ public void startThreadPool() { @After public void stopThreadPool() throws InterruptedException { terminate(threadPool); + asyncExec.shutdown(); + userExec.shutdownNow(); + scheduleThreadPoolExecutor.shutdownNow(); + } + + public void testBulkProcessorThreadBlocked() throws Exception { + exception.expect(TimeoutException.class); + Future future = buildAndExecuteBulkProcessor(initScheduler(1)); + future.get(8, TimeUnit.SECONDS);// thread has been blocked, the IndexRequest cannot be successfully executed. + } + + public void testBulkProcessorThread() throws Exception { + Future future = buildAndExecuteBulkProcessor(initScheduler(2)); + assertNull(future.get(4, TimeUnit.SECONDS));//the IndexRequest executed successfully. + } + + private Scheduler initScheduler(int corePoolSize) { + scheduleThreadPoolExecutor.setCorePoolSize(corePoolSize); + return (command, delay, executor) -> + Scheduler.wrapAsScheduledCancellable(scheduleThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS)); + } + + private Future buildAndExecuteBulkProcessor(Scheduler scheduler) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + final int concurrentRequests = 0; + final int bulkActions = 4; + final TimeValue flushInterval = TimeValue.timeValueMillis(1000L); + final BackoffPolicy backoff = BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(100L), 1); + final ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB); + BulkProcessor bulkProcessor = new BulkProcessor(bulkAsync(latch), backoff, emptyListener(), + concurrentRequests, bulkActions, bulkSize, flushInterval, + scheduler, null, BulkRequest::new); + + bulkProcessor.add(new IndexRequest()); + bulkProcessor.add(new IndexRequest()); + bulkProcessor.add(new IndexRequest()); + Thread.sleep(2000L); + latch.countDown(); + Future future = userExec.submit(() -> { + bulkProcessor.add(new IndexRequest()); + bulkProcessor.add(new IndexRequest()); + bulkProcessor.add(new IndexRequest()); + bulkProcessor.add(new IndexRequest()); + }); + + + return future; + } + + private BiConsumer> bulkAsync(CountDownLatch latch) { + return (request, listener) -> + { + // Step-3: retry of bulk request by using scheduler thread + // Due to step-2, scheduler thread is already occupied, causing the retry policy to not be executed + // Causes the lock and latch in step-1 not to be released + asyncExec.execute(() -> { + try { + latch.await(); + listener.onResponse(createBulkResponse()); + } catch (InterruptedException e) { + throw ExceptionsHelper.convertToRuntime(e); + } + }); + }; + } + + private BulkResponse createBulkResponse() { + EsRejectedExecutionException exception =new EsRejectedExecutionException(); + Failure failure = new Failure("", "", "", exception, ExceptionsHelper.status(exception)); + BulkItemResponse[] bulkActionResponses = new BulkItemResponse[] { + new BulkItemResponse(), + new BulkItemResponse(), + new BulkItemResponse(3, OpType.INDEX, failure) + }; + BulkResponse bulkResponse = new BulkResponse(bulkActionResponses, 3000L); + return bulkResponse; } public void testBulkProcessorFlushPreservesContext() throws InterruptedException { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java b/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java index f5d881e2b04a6..6fa6b1354f1ab 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java @@ -41,7 +41,7 @@ import static org.hamcrest.Matchers.nullValue; public class RetryTests extends ESTestCase { - // no need to wait fof a long time in tests + // no need to wait for a long time in tests private static final TimeValue DELAY = TimeValue.timeValueMillis(1L); private static final int CALLS_TO_FAIL = 5; From 35cd5d9eaac349d3005ed4f8ed606cb433c495f9 Mon Sep 17 00:00:00 2001 From: SuXingLee Date: Tue, 15 Oct 2019 00:13:33 +0800 Subject: [PATCH 2/2] Fix BulkProcessor deadlock when bulk requests fail #47599 --- .../elasticsearch/action/bulk/BulkProcessorTests.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java index a207a2b297907..32a45ed2a203b 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java @@ -115,13 +115,14 @@ private Future buildAndExecuteBulkProcessor(Scheduler scheduler) throws Inter bulkProcessor.add(new IndexRequest()); bulkProcessor.add(new IndexRequest()); bulkProcessor.add(new IndexRequest()); + // Step-1: different from #46790, here we first execute `Flush` method Thread.sleep(2000L); latch.countDown(); Future future = userExec.submit(() -> { bulkProcessor.add(new IndexRequest()); bulkProcessor.add(new IndexRequest()); bulkProcessor.add(new IndexRequest()); - bulkProcessor.add(new IndexRequest()); + bulkProcessor.add(new IndexRequest());// Step-3: Due to step-1 is not completed,here we wait for ReentrantLock }); @@ -131,9 +132,9 @@ private Future buildAndExecuteBulkProcessor(Scheduler scheduler) throws Inter private BiConsumer> bulkAsync(CountDownLatch latch) { return (request, listener) -> { - // Step-3: retry of bulk request by using scheduler thread - // Due to step-2, scheduler thread is already occupied, causing the retry policy to not be executed - // Causes the lock and latch in step-1 not to be released + // Step-2: retry of bulk request by using scheduler thread + // Due to step-1, scheduler thread is already occupied, causing the retry policy to not be executed + // latch in step-1 not to be released asyncExec.execute(() -> { try { latch.await();