From bd1dc98f6939b26b38a1260dc447a84e463c5688 Mon Sep 17 00:00:00 2001
From: Jake Landis
Date: Thu, 23 May 2019 07:54:00 -0500
Subject: [PATCH] Bulk processor concurrent requests (#41451)

`org.elasticsearch.action.bulk.BulkProcessor` is a threadsafe class that
provides simple semantics for sending bulk requests. Once a bulk reaches
its pre-defined size, document count, or flush interval, it sends the
bulk. One configurable option is the number of concurrent outstanding
bulk requests. That concurrency is implemented in
`org.elasticsearch.action.bulk.BulkRequestHandler` via a semaphore.
However, the only code that currently calls into it is blocked by
`synchronized` methods, so the BulkProcessor cannot behave concurrently
despite supporting a configurable number of concurrent requests. This
change removes the `synchronized` methods in favor of an explicit lock
around the non-thread-safe parts of those methods. The call into
`org.elasticsearch.action.bulk.BulkRequestHandler` is no longer blocking,
which allows `org.elasticsearch.action.bulk.BulkRequestHandler` to handle
its own concurrency.
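
For reference, the concurrency in question is the `setConcurrentRequests`
option on the processor's builder. A minimal sketch of a processor that can
now genuinely keep several bulks in flight; `client` (an
`org.elasticsearch.client.Client`) and `listener` (a
`BulkProcessor.Listener`) are assumed to exist and are not shown:

    BulkProcessor processor = BulkProcessor.builder(client::bulk, listener)
        .setBulkActions(500)                                // flush after 500 actions, or
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // after 5mb of requests, or
        .setFlushInterval(TimeValue.timeValueSeconds(5))    // at the latest every 5 seconds
        .setConcurrentRequests(4)                           // and allow up to 4 bulks in flight
        .build();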
---
 .../action/bulk/BulkProcessor.java            | 108 +++++---
 .../action/bulk/BulkProcessorTests.java       | 251 +++++++++++++++++-
 2 files changed, 328 insertions(+), 31 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
index b0ad87a8b744a..08c42c5ea40de 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
@@ -26,6 +26,7 @@
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -39,6 +40,7 @@
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 
@@ -225,6 +227,7 @@ private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) {
     private final Runnable onClose;
     private volatile boolean closed = false;
+    private final ReentrantLock lock = new ReentrantLock();
 
     BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
                   int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
@@ -264,21 +267,26 @@ public void close() {
      * completed
      * @throws InterruptedException If the current thread is interrupted
      */
-    public synchronized boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
-        if (closed) {
-            return true;
-        }
-        closed = true;
+    public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
+        lock.lock();
+        try {
+            if (closed) {
+                return true;
+            }
+            closed = true;
 
-        this.cancellableFlushTask.cancel();
+            this.cancellableFlushTask.cancel();
 
-        if (bulkRequest.numberOfActions() > 0) {
-            execute();
-        }
-        try {
-            return this.bulkRequestHandler.awaitClose(timeout, unit);
+            if (bulkRequest.numberOfActions() > 0) {
+                execute();
+            }
+            try {
+                return this.bulkRequestHandler.awaitClose(timeout, unit);
+            } finally {
+                onClose.run();
+            }
         } finally {
-            onClose.run();
+            lock.unlock();
         }
     }
 
@@ -315,10 +323,22 @@ protected void ensureOpen() {
         }
     }
 
-    private synchronized void internalAdd(DocWriteRequest<?> request) {
-        ensureOpen();
-        bulkRequest.add(request);
-        executeIfNeeded();
+    private void internalAdd(DocWriteRequest<?> request) {
+        // bulkRequest and instance swapping are not threadsafe, so execute the mutations under a lock.
+        // once the bulk request is ready to be shipped, swap the instance reference, unlock, and send the local reference to the handler.
+        Tuple<BulkRequest, Long> bulkRequestToExecute = null;
+        lock.lock();
+        try {
+            ensureOpen();
+            bulkRequest.add(request);
+            bulkRequestToExecute = newBulkRequestIfNeeded();
+        } finally {
+            lock.unlock();
+        }
+        // execute sending the local reference outside the lock to allow the handler to control the concurrency via its configuration.
+        if (bulkRequestToExecute != null) {
+            execute(bulkRequestToExecute.v1(), bulkRequestToExecute.v2());
+        }
     }
 
     /**
@@ -332,11 +352,23 @@ public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
     /**
      * Adds the data from the bytes to be processed by the bulk processor
      */
-    public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
+    public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
                              @Nullable String defaultPipeline, XContentType xContentType) throws Exception {
-        bulkRequest.add(data, defaultIndex, defaultType, null, null, defaultPipeline, true, xContentType);
-        executeIfNeeded();
+        Tuple<BulkRequest, Long> bulkRequestToExecute = null;
+        lock.lock();
+        try {
+            ensureOpen();
+            bulkRequest.add(data, defaultIndex, defaultType, null, null, defaultPipeline,
+                true, xContentType);
+            bulkRequestToExecute = newBulkRequestIfNeeded();
+        } finally {
+            lock.unlock();
+        }
+
+        if (bulkRequestToExecute != null) {
+            execute(bulkRequestToExecute.v1(), bulkRequestToExecute.v2());
+        }
         return this;
     }
 
@@ -358,23 +390,32 @@ public boolean isCancelled() {
         return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
     }
 
-    private void executeIfNeeded() {
+    // needs to be executed under a lock
+    private Tuple<BulkRequest, Long> newBulkRequestIfNeeded() {
         ensureOpen();
         if (!isOverTheLimit()) {
-            return;
+            return null;
         }
-        execute();
+        final BulkRequest bulkRequest = this.bulkRequest;
+        this.bulkRequest = bulkRequestSupplier.get();
+        return new Tuple<>(bulkRequest, executionIdGen.incrementAndGet());
+    }
+
+    // may be executed without a lock
+    private void execute(BulkRequest bulkRequest, long executionId) {
+        this.bulkRequestHandler.execute(bulkRequest, executionId);
     }
 
-    // (currently) needs to be executed under a lock
+    // needs to be executed under a lock
    private void execute() {
        final BulkRequest bulkRequest = this.bulkRequest;
        final long executionId = executionIdGen.incrementAndGet();
        this.bulkRequest = bulkRequestSupplier.get();
-        this.bulkRequestHandler.execute(bulkRequest, executionId);
+        execute(bulkRequest, executionId);
    }
 
+    // needs to be executed under a lock
     private boolean isOverTheLimit() {
         if (bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) {
             return true;
@@ -388,18 +429,23 @@ private boolean isOverTheLimit() {
 
     /**
      * Flush pending delete or index requests.
      */
-    public synchronized void flush() {
-        ensureOpen();
-        if (bulkRequest.numberOfActions() > 0) {
-            execute();
+    public void flush() {
+        lock.lock();
+        try {
+            ensureOpen();
+            if (bulkRequest.numberOfActions() > 0) {
+                execute();
+            }
+        } finally {
+            lock.unlock();
         }
     }
 
     class Flush implements Runnable {
-
         @Override
         public void run() {
-            synchronized (BulkProcessor.this) {
+            lock.lock();
+            try {
                 if (closed) {
                     return;
                 }
@@ -407,6 +453,8 @@ public void run() {
                     return;
                 }
                 execute();
+            } finally {
+                lock.unlock();
             }
         }
     }
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java
index e2527397a780a..6a58696534ed4 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java
@@ -19,26 +19,43 @@
 package org.elasticsearch.action.bulk;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.After;
 import org.junit.Before;
 
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 
 public class BulkProcessorTests extends ESTestCase {
 
     private ThreadPool threadPool;
+    private final Logger logger = LogManager.getLogger(BulkProcessorTests.class);
 
     @Before
     public void startThreadPool() {
@@ -90,10 +107,216 @@ public void testBulkProcessorFlushPreservesContext() throws InterruptedException
         bulkProcessor.close();
     }
 
+    public void testConcurrentExecutions() throws Exception {
+        final AtomicBoolean called = new AtomicBoolean(false);
+        final AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
+        int estimatedTimeForTest = Integer.MAX_VALUE;
+        final int simulateWorkTimeInMillis = 5;
+        int concurrentClients = 0;
+        int concurrentBulkRequests = 0;
+        int expectedExecutions = 0;
+        int maxBatchSize = 0;
+        int maxDocuments = 0;
+        int iterations = 0;
+        boolean runTest = true;
+        // find random values that allow this test to take under ~10 seconds
+        while (estimatedTimeForTest > 10_000) {
+            if (iterations++ > 1_000) { // extremely unlikely
+                runTest = false;
+                break;
+            }
+            maxBatchSize = randomIntBetween(1, 100);
+            maxDocuments = randomIntBetween(maxBatchSize, 1_000_000);
+            concurrentClients = randomIntBetween(1, 20);
+            concurrentBulkRequests = randomIntBetween(0, 20);
+            expectedExecutions = maxDocuments / maxBatchSize;
+            estimatedTimeForTest = (expectedExecutions * simulateWorkTimeInMillis) /
+                Math.min(concurrentBulkRequests + 1, concurrentClients);
+        }
+        assumeTrue("failed to find random values that allow the test to run quickly", runTest);
+        BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[]{ new BulkItemResponse() }, 0);
+        AtomicInteger failureCount = new AtomicInteger(0);
+        AtomicInteger successCount = new AtomicInteger(0);
+        AtomicInteger requestCount = new AtomicInteger(0);
+        AtomicInteger docCount = new AtomicInteger(0);
+        BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) ->
+        {
+            try {
+                Thread.sleep(simulateWorkTimeInMillis); // simulate work
+                listener.onResponse(bulkResponse);
+            } catch (InterruptedException e) {
+                // should never happen
+                Thread.currentThread().interrupt();
+                failureCount.getAndIncrement();
+                exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), e));
+            }
+        };
+        try (BulkProcessor bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(),
+            countingListener(requestCount, successCount, failureCount, docCount, exceptionRef),
+            concurrentBulkRequests, maxBatchSize, new ByteSizeValue(Integer.MAX_VALUE), null,
+            (command, delay, executor) -> null, () -> called.set(true), BulkRequest::new)) {
+
+            ExecutorService executorService = Executors.newFixedThreadPool(concurrentClients);
+            CountDownLatch startGate = new CountDownLatch(1 + concurrentClients);
+
+            IndexRequest indexRequest = new IndexRequest();
+            String bulkRequest = "{ \"index\" : { \"_index\" : \"test\", \"_id\" : \"1\" } }\n" + "{ \"field1\" : \"value1\" }\n";
+            BytesReference bytesReference =
+                BytesReference.fromByteBuffers(new ByteBuffer[]{ ByteBuffer.wrap(bulkRequest.getBytes(StandardCharsets.UTF_8)) });
+            List<Future<?>> futures = new ArrayList<>();
+            for (final AtomicInteger i = new AtomicInteger(0); i.getAndIncrement() < maxDocuments; ) {
+                futures.add(executorService.submit(() -> {
+                    try {
+                        // don't start any work until all tasks are submitted
+                        startGate.countDown();
+                        startGate.await();
+                        // alternate between the two ways to add to the bulk processor
+                        if (randomBoolean()) {
+                            bulkProcessor.add(indexRequest);
+                        } else {
+                            bulkProcessor.add(bytesReference, null, null, XContentType.JSON);
+                        }
+                    } catch (Exception e) {
+                        throw ExceptionsHelper.convertToRuntime(e);
+                    }
+                }));
+            }
+            startGate.countDown();
+            startGate.await();
+
+            for (Future<?> f : futures) {
+                try {
+                    f.get();
+                } catch (Exception e) {
+                    failureCount.incrementAndGet();
+                    exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), e));
+                }
+            }
+            executorService.shutdown();
+            executorService.awaitTermination(10, TimeUnit.SECONDS);
+
+            if (failureCount.get() > 0 || successCount.get() != expectedExecutions || requestCount.get() != successCount.get()) {
+                if (exceptionRef.get() != null) {
+                    logger.error("exception(s) caught during test", exceptionRef.get());
+                }
+                fail("\nExpected Bulks: " + expectedExecutions + "\n" +
+                    "Requested Bulks: " + requestCount.get() + "\n" +
+                    "Successful Bulks: " + successCount.get() + "\n" +
+                    "Failed Bulks: " + failureCount.get() + "\n" +
+                    "Max Documents: " + maxDocuments + "\n" +
+                    "Max Batch Size: " + maxBatchSize + "\n" +
+                    "Concurrent Clients: " + concurrentClients + "\n" +
+                    "Concurrent Bulk Requests: " + concurrentBulkRequests + "\n"
+                );
+            }
+        }
+        // count total docs after the processor is closed, since there may have been partial batches flushed on close
+        assertEquals(docCount.get(), maxDocuments);
+    }
+
+    public void testConcurrentExecutionsWithFlush() throws Exception {
+        final AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
+        final int maxDocuments = 100_000;
+        final int concurrentClients = 2;
+        final int maxBatchSize = Integer.MAX_VALUE; // don't flush based on size
+        final int concurrentBulkRequests = randomIntBetween(0, 20);
+        final int simulateWorkTimeInMillis = 5;
+        BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[]{ new BulkItemResponse() }, 0);
+        AtomicInteger failureCount = new AtomicInteger(0);
+        AtomicInteger successCount = new AtomicInteger(0);
+        AtomicInteger requestCount = new AtomicInteger(0);
+        AtomicInteger docCount = new AtomicInteger(0);
+        BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) ->
+        {
+            try {
+                Thread.sleep(simulateWorkTimeInMillis); // simulate work
+                listener.onResponse(bulkResponse);
+            } catch (InterruptedException e) {
+                // should never happen
+                Thread.currentThread().interrupt();
+                failureCount.getAndIncrement();
+                exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), e));
+            }
+        };
+        ScheduledExecutorService flushExecutor = Executors.newScheduledThreadPool(1);
+        try (BulkProcessor bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(),
+            countingListener(requestCount, successCount, failureCount, docCount, exceptionRef),
+            concurrentBulkRequests, maxBatchSize, new ByteSizeValue(Integer.MAX_VALUE),
+            TimeValue.timeValueMillis(simulateWorkTimeInMillis * 2),
+            (command, delay, executor) ->
+                Scheduler.wrapAsScheduledCancellable(flushExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS)),
+            () ->
+            {
+                flushExecutor.shutdown();
+                try {
+                    flushExecutor.awaitTermination(10L, TimeUnit.SECONDS);
+                    if (flushExecutor.isTerminated() == false) {
+                        flushExecutor.shutdownNow();
+                    }
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                }
+            },
+            BulkRequest::new)) {
+
+            ExecutorService executorService = Executors.newFixedThreadPool(concurrentClients);
+            IndexRequest indexRequest = new IndexRequest();
+            String bulkRequest = "{ \"index\" : { \"_index\" : \"test\", \"_id\" : \"1\" } }\n" + "{ \"field1\" : \"value1\" }\n";
+            BytesReference bytesReference =
+                BytesReference.fromByteBuffers(new ByteBuffer[]{ ByteBuffer.wrap(bulkRequest.getBytes(StandardCharsets.UTF_8)) });
+            List<Future<?>> futures = new ArrayList<>();
+            CountDownLatch startGate = new CountDownLatch(1 + concurrentClients);
+            for (final AtomicInteger i = new AtomicInteger(0); i.getAndIncrement() < maxDocuments; ) {
+                futures.add(executorService.submit(() -> {
+                    try {
+                        // don't start any work until all tasks are submitted
+                        startGate.countDown();
+                        startGate.await();
+                        // alternate between the two ways to add to the bulk processor
+                        if (randomBoolean()) {
+                            bulkProcessor.add(indexRequest);
+                        } else {
+                            bulkProcessor.add(bytesReference, null, null, XContentType.JSON);
+                        }
+                    } catch (Exception e) {
+                        throw ExceptionsHelper.convertToRuntime(e);
+                    }
+                }));
+            }
+            startGate.countDown();
+            startGate.await();
+
+            for (Future<?> f : futures) {
+                try {
+                    f.get();
+                } catch (Exception e) {
+                    failureCount.incrementAndGet();
+                    exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), e));
+                }
+            }
+            executorService.shutdown();
+            executorService.awaitTermination(10, TimeUnit.SECONDS);
+        }
+
+        if (failureCount.get() > 0 || requestCount.get() != successCount.get() || maxDocuments != docCount.get()) {
+            if (exceptionRef.get() != null) {
+                logger.error("exception(s) caught during test", exceptionRef.get());
+            }
fail("\nRequested Bulks: " + requestCount.get() + "\n" + + "Successful Bulks: " + successCount.get() + "\n" + + "Failed Bulks: " + failureCount.get() + "\n" + + "Total Documents: " + docCount.get() + "\n" + + "Max Documents: " + maxDocuments + "\n" + + "Max Batch Size: " + maxBatchSize + "\n" + + "Concurrent Clients: " + concurrentClients + "\n" + + "Concurrent Bulk Requests: " + concurrentBulkRequests + "\n" + ); + } + } public void testAwaitOnCloseCallsOnClose() throws Exception { final AtomicBoolean called = new AtomicBoolean(false); - BiConsumer> consumer = (request, listener) -> {}; + BiConsumer> consumer = (request, listener) -> { }; BulkProcessor bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(), emptyListener(), 0, 10, new ByteSizeValue(1000), null, (command, delay, executor) -> null, () -> called.set(true), BulkRequest::new); @@ -118,4 +341,30 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) } }; } + + private BulkProcessor.Listener countingListener(AtomicInteger requestCount, AtomicInteger successCount, AtomicInteger failureCount, + AtomicInteger docCount, AtomicReference exceptionRef) { + + return new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + requestCount.incrementAndGet(); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + successCount.incrementAndGet(); + docCount.addAndGet(request.requests().size()); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + if (failure != null) { + failureCount.incrementAndGet(); + exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), failure)); + + } + } + }; + } }