diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index 08c42c5ea40de..7d83347e95090 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -204,6 +204,7 @@ public static Builder builder(BiConsumer Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS)); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java index 6a58696534ed4..32a45ed2a203b 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java @@ -22,12 +22,16 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.bulk.BulkItemResponse.Failure; +import org.elasticsearch.action.DocWriteRequest.OpType; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.ESTestCase; @@ -36,6 +40,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.ExpectedException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -46,7 +52,9 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -54,9 +62,15 @@ public class BulkProcessorTests extends ESTestCase { - private ThreadPool threadPool; private final Logger logger = LogManager.getLogger(BulkProcessorTests.class); + private ThreadPool threadPool; + private ExecutorService asyncExec = Executors.newFixedThreadPool(1); + private ExecutorService userExec = Executors.newFixedThreadPool(1); + private ScheduledThreadPoolExecutor scheduleThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY); + @Rule + public ExpectedException exception = ExpectedException.none(); + @Before public void startThreadPool() { threadPool = new TestThreadPool("BulkProcessorTests"); @@ -65,6 +79,83 @@ public void startThreadPool() { @After public void stopThreadPool() throws InterruptedException { terminate(threadPool); + asyncExec.shutdown(); + userExec.shutdownNow(); + scheduleThreadPoolExecutor.shutdownNow(); + } + + public void testBulkProcessorThreadBlocked() throws Exception { + exception.expect(TimeoutException.class); + Future future = buildAndExecuteBulkProcessor(initScheduler(1)); + future.get(8, TimeUnit.SECONDS);// thread has been blocked, the IndexRequest cannot be successfully executed. + } + + public void testBulkProcessorThread() throws Exception { + Future future = buildAndExecuteBulkProcessor(initScheduler(2)); + assertNull(future.get(4, TimeUnit.SECONDS));//the IndexRequest executed successfully. + } + + private Scheduler initScheduler(int corePoolSize) { + scheduleThreadPoolExecutor.setCorePoolSize(corePoolSize); + return (command, delay, executor) -> + Scheduler.wrapAsScheduledCancellable(scheduleThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS)); + } + + private Future buildAndExecuteBulkProcessor(Scheduler scheduler) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + final int concurrentRequests = 0; + final int bulkActions = 4; + final TimeValue flushInterval = TimeValue.timeValueMillis(1000L); + final BackoffPolicy backoff = BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(100L), 1); + final ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB); + BulkProcessor bulkProcessor = new BulkProcessor(bulkAsync(latch), backoff, emptyListener(), + concurrentRequests, bulkActions, bulkSize, flushInterval, + scheduler, null, BulkRequest::new); + + bulkProcessor.add(new IndexRequest()); + bulkProcessor.add(new IndexRequest()); + bulkProcessor.add(new IndexRequest()); + // Step-1: different from #46790, here we first execute `Flush` method + Thread.sleep(2000L); + latch.countDown(); + Future future = userExec.submit(() -> { + bulkProcessor.add(new IndexRequest()); + bulkProcessor.add(new IndexRequest()); + bulkProcessor.add(new IndexRequest()); + bulkProcessor.add(new IndexRequest());// Step-3: Due to step-1 is not completed,here we wait for ReentrantLock + }); + + + return future; + } + + private BiConsumer> bulkAsync(CountDownLatch latch) { + return (request, listener) -> + { + // Step-2: retry of bulk request by using scheduler thread + // Due to step-1, scheduler thread is already occupied, causing the retry policy to not be executed + // latch in step-1 not to be released + asyncExec.execute(() -> { + try { + latch.await(); + listener.onResponse(createBulkResponse()); + } catch (InterruptedException e) { + throw ExceptionsHelper.convertToRuntime(e); + } + }); + }; + } + + private BulkResponse createBulkResponse() { + EsRejectedExecutionException exception =new EsRejectedExecutionException(); + Failure failure = new Failure("", "", "", exception, ExceptionsHelper.status(exception)); + BulkItemResponse[] bulkActionResponses = new BulkItemResponse[] { + new BulkItemResponse(), + new BulkItemResponse(), + new BulkItemResponse(3, OpType.INDEX, failure) + }; + BulkResponse bulkResponse = new BulkResponse(bulkActionResponses, 3000L); + return bulkResponse; } public void testBulkProcessorFlushPreservesContext() throws InterruptedException { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java b/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java index f5d881e2b04a6..6fa6b1354f1ab 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java @@ -41,7 +41,7 @@ import static org.hamcrest.Matchers.nullValue; public class RetryTests extends ESTestCase { - // no need to wait fof a long time in tests + // no need to wait for a long time in tests private static final TimeValue DELAY = TimeValue.timeValueMillis(1L); private static final int CALLS_TO_FAIL = 5;