diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index a44d47859d9c5..4a651b70a28bd 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -84,7 +84,8 @@ public static class Builder { private final BiConsumer> consumer; private final Listener listener; - private final Scheduler scheduler; + private final Scheduler flushScheduler; + private final Scheduler retryScheduler; private final Runnable onClose; private int concurrentRequests = 1; private int bulkActions = 1000; @@ -96,10 +97,11 @@ public static class Builder { private String globalPipeline; private Builder(BiConsumer> consumer, Listener listener, - Scheduler scheduler, Runnable onClose) { + Scheduler flushScheduler, Scheduler retryScheduler, Runnable onClose) { this.consumer = consumer; this.listener = listener; - this.scheduler = scheduler; + this.flushScheduler = flushScheduler; + this.retryScheduler = retryScheduler; this.onClose = onClose; } @@ -178,7 +180,7 @@ public Builder setBackoffPolicy(BackoffPolicy backoffPolicy) { */ public BulkProcessor build() { return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, - bulkSize, flushInterval, scheduler, onClose, createBulkRequestWithGlobalDefaults()); + bulkSize, flushInterval, flushScheduler, retryScheduler, onClose, createBulkRequestWithGlobalDefaults()); } private Supplier createBulkRequestWithGlobalDefaults() { @@ -188,19 +190,55 @@ private Supplier createBulkRequestWithGlobalDefaults() { } } + /** + * @param client The client that executes the bulk operations + * @param listener The BulkProcessor listener that gets called on bulk events + * @param flushScheduler The scheduler that is used to flush + * @param retryScheduler The scheduler that is used for retries + * @param onClose The runnable instance that is executed on close. Consumers are required to clean up the schedulers. + * @return the builder for BulkProcessor + */ + public static Builder builder(Client client, Listener listener, Scheduler flushScheduler, Scheduler retryScheduler, Runnable onClose) { + Objects.requireNonNull(client, "client"); + Objects.requireNonNull(listener, "listener"); + return new Builder(client::bulk, listener, flushScheduler, retryScheduler, onClose); + } + + + /** + * @param client The client that executes the bulk operations + * @param listener The BulkProcessor listener that gets called on bulk events + * @return the builder for BulkProcessor + * @deprecated Use {@link #builder(java.util.function.BiConsumer, org.elasticsearch.action.bulk.BulkProcessor.Listener)} + * with client::bulk as the first argument, or {@link #builder(org.elasticsearch.client.Client, + * org.elasticsearch.action.bulk.BulkProcessor.Listener, org.elasticsearch.threadpool.Scheduler, + * org.elasticsearch.threadpool.Scheduler, java.lang.Runnable)} and manage the flush and retry schedulers explicitly + */ + @Deprecated public static Builder builder(Client client, Listener listener) { Objects.requireNonNull(client, "client"); Objects.requireNonNull(listener, "listener"); - return new Builder(client::bulk, listener, client.threadPool(), () -> {}); + return new Builder(client::bulk, listener, client.threadPool(), client.threadPool(), () -> {}); } + /** + * @param consumer The consumer that is called to fulfil bulk operations + * @param listener The BulkProcessor listener that gets called on bulk events + * @return the builder for BulkProcessor + */ public static Builder builder(BiConsumer> consumer, Listener listener) { Objects.requireNonNull(consumer, "consumer"); Objects.requireNonNull(listener, "listener"); - final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY); + final ScheduledThreadPoolExecutor flushScheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY); + final ScheduledThreadPoolExecutor retryScheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY); return new Builder(consumer, listener, - buildScheduler(scheduledThreadPoolExecutor), - () -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS)); + buildScheduler(flushScheduledThreadPoolExecutor), + buildScheduler(retryScheduledThreadPoolExecutor), + () -> + { + Scheduler.terminate(flushScheduledThreadPoolExecutor, 10, TimeUnit.SECONDS); + Scheduler.terminate(retryScheduledThreadPoolExecutor, 10, TimeUnit.SECONDS); + }); } private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) { @@ -225,17 +263,28 @@ private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThr BulkProcessor(BiConsumer> consumer, BackoffPolicy backoffPolicy, Listener listener, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval, - Scheduler scheduler, Runnable onClose, Supplier bulkRequestSupplier) { + Scheduler flushScheduler, Scheduler retryScheduler, Runnable onClose, Supplier bulkRequestSupplier) { this.bulkActions = bulkActions; this.bulkSize = bulkSize.getBytes(); this.bulkRequest = bulkRequestSupplier.get(); this.bulkRequestSupplier = bulkRequestSupplier; - this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests); + this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, retryScheduler, concurrentRequests); // Start period flushing task after everything is setup - this.cancellableFlushTask = startFlushTask(flushInterval, scheduler); + this.cancellableFlushTask = startFlushTask(flushInterval, flushScheduler); this.onClose = onClose; } + /** + * @deprecated use the {@link BulkProcessor} constructor which uses separate schedulers for flush and retry + */ + @Deprecated + BulkProcessor(BiConsumer> consumer, BackoffPolicy backoffPolicy, Listener listener, + int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval, + Scheduler scheduler, Runnable onClose, Supplier bulkRequestSupplier) { + this(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, bulkSize, flushInterval, + scheduler, scheduler, onClose, bulkRequestSupplier ); + } + /** * Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed. */ diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorIT.java index db796be23f65b..c81d537e9272d 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorIT.java @@ -57,7 +57,7 @@ public void testThatBulkProcessorCountIsCorrect() throws Exception { BulkProcessorTestListener listener = new BulkProcessorTestListener(latch); int numDocs = randomIntBetween(10, 100); - try (BulkProcessor processor = BulkProcessor.builder(client(), listener) + try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener) //let's make sure that the bulk action limit trips, one single execution will index all the documents .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs) .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) @@ -81,7 +81,7 @@ public void testBulkProcessorFlush() throws Exception { int numDocs = randomIntBetween(10, 100); - try (BulkProcessor processor = BulkProcessor.builder(client(), listener) + try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener) //let's make sure that this bulk won't be automatically flushed .setConcurrentRequests(randomIntBetween(0, 10)).setBulkActions(numDocs + randomIntBetween(1, 100)) .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { @@ -116,7 +116,7 @@ public void testBulkProcessorConcurrentRequests() throws Exception { MultiGetRequestBuilder multiGetRequestBuilder; - try (BulkProcessor processor = BulkProcessor.builder(client(), listener) + try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener) .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions) //set interval and size to high values .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { @@ -155,7 +155,7 @@ public void testBulkProcessorWaitOnClose() throws Exception { BulkProcessorTestListener listener = new BulkProcessorTestListener(); int numDocs = randomIntBetween(10, 100); - BulkProcessor processor = BulkProcessor.builder(client(), listener) + BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener) //let's make sure that the bulk action limit trips, one single execution will index all the documents .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs) .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(randomIntBetween(1, 10), @@ -202,7 +202,7 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet(); BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch); - try (BulkProcessor processor = BulkProcessor.builder(client(), listener) + try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener) .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions) //set interval and size to high values .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java index 4b80f260a7d1d..0bb0b89c0042c 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java @@ -77,7 +77,7 @@ private void executeBulkRejectionLoad(BackoffPolicy backoffPolicy, boolean rejec assertAcked(prepareCreate(INDEX_NAME)); ensureGreen(); - BulkProcessor bulkProcessor = BulkProcessor.builder(client(), new BulkProcessor.Listener() { + BulkProcessor bulkProcessor = BulkProcessor.builder(client()::bulk, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { // no op diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 1582534cda14c..42dcb709b6591 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -485,7 +485,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon public void afterBulk(long executionId, BulkRequest request, Throwable failure) {} }; int bulkSize = between(1, 20); - BulkProcessor bulkProcessor = BulkProcessor.builder(leaderClient(), listener) + BulkProcessor bulkProcessor = BulkProcessor.builder(leaderClient()::bulk, listener) .setBulkActions(bulkSize) .setConcurrentRequests(4) .build(); diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index 621d478106030..eb5d5eeb8b9cf 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.bootstrap.BootstrapCheck; import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -53,7 +54,6 @@ import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; -import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction; @@ -324,7 +324,7 @@ ScriptTransform.TYPE, new ScriptTransformFactory(scriptService), final InputRegistry inputRegistry = new InputRegistry(inputFactories); inputFactories.put(ChainInput.TYPE, new ChainInputFactory(inputRegistry)); - bulkProcessor = BulkProcessor.builder(ClientHelper.clientWithOrigin(client, WATCHER_ORIGIN), new BulkProcessor.Listener() { + bulkProcessor = BulkProcessor.builder(new OriginSettingClient(client, WATCHER_ORIGIN)::bulk, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java index d9915fc4a6d44..657b89949bba6 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java @@ -128,7 +128,8 @@ public void init() { when(client.settings()).thenReturn(settings); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); parser = mock(TriggeredWatch.Parser.class); - BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener).setConcurrentRequests(0).setBulkActions(1).build(); + BulkProcessor bulkProcessor = BulkProcessor. + builder(client::bulk, listener).setConcurrentRequests(0).setBulkActions(1).build(); triggeredWatchStore = new TriggeredWatchStore(settings, client, parser, bulkProcessor); } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java index 790f83e200479..8a6e53ec02426 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java @@ -73,7 +73,7 @@ public void init() { when(client.settings()).thenReturn(settings); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(settings)); BulkProcessor.Listener listener = mock(BulkProcessor.Listener.class); - BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener).setConcurrentRequests(0).setBulkActions(1).build(); + BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulk, listener).setConcurrentRequests(0).setBulkActions(1).build(); historyStore = new HistoryStore(bulkProcessor); }