Skip to content

Commit

Permalink
Decouple BulkProcessor from ThreadPool (#26727)
Browse files Browse the repository at this point in the history
Introduce a minimal thread scheduler as a base class for `ThreadPool`. Such a class can be used from the `BulkProcessor` to schedule retries and the flush task. This allows removing the `ThreadPool` dependency from `BulkProcessor`, which required providing settings containing `node.name` and also needed log4j for logging. Instead, it now needs a `Scheduler`, which is much lighter and is automatically created and shut down on close.

Closes #26028
  • Loading branch information
javanna authored Oct 25, 2017
1 parent cc3364e commit 8caf7d4
Show file tree
Hide file tree
Showing 18 changed files with 344 additions and 241 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand All @@ -50,7 +49,6 @@
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -614,14 +612,14 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
}
};

ThreadPool threadPool = new ThreadPool(Settings.builder().put("node.name", getClass().getName()).build());
// Pull the client to a variable to work around https://bugs.eclipse.org/bugs/show_bug.cgi?id=514884
RestHighLevelClient hlClient = highLevelClient();
try(BulkProcessor processor = new BulkProcessor.Builder(hlClient::bulkAsync, listener, threadPool)
.setConcurrentRequests(0)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.GB))
.setBulkActions(nbItems + 1)
.build()) {

try (BulkProcessor processor = BulkProcessor.builder(hlClient::bulkAsync, listener)
.setConcurrentRequests(0)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.GB))
.setBulkActions(nbItems + 1)
.build()) {
for (int i = 0; i < nbItems; i++) {
String id = String.valueOf(i);
boolean erroneous = randomBoolean();
Expand All @@ -631,7 +629,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
if (opType == DocWriteRequest.OpType.DELETE) {
if (erroneous == false) {
assertEquals(RestStatus.CREATED,
highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status());
highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status());
}
DeleteRequest deleteRequest = new DeleteRequest("index", "test", id);
processor.add(deleteRequest);
Expand All @@ -653,10 +651,10 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)

} else if (opType == DocWriteRequest.OpType.UPDATE) {
UpdateRequest updateRequest = new UpdateRequest("index", "test", id)
.doc(new IndexRequest().source(xContentType, "id", i));
.doc(new IndexRequest().source(xContentType, "id", i));
if (erroneous == false) {
assertEquals(RestStatus.CREATED,
highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status());
highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status());
}
processor.add(updateRequest);
}
Expand All @@ -676,8 +674,6 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
assertNull(error.get());

validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest);

terminate(threadPool);
}

private void validateBulkResponses(int nbItems, boolean[] errors, BulkResponse bulkResponse, BulkRequest bulkRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@

package org.elasticsearch.client.documentation;

import org.elasticsearch.Build;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
Expand All @@ -40,7 +38,6 @@
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.main.MainResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
Expand All @@ -49,9 +46,7 @@
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -64,7 +59,7 @@
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.Scheduler;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -868,31 +863,27 @@ public void onFailure(Exception e) {
}

public void testBulkProcessor() throws InterruptedException, IOException {
Settings settings = Settings.builder().put("node.name", "my-application").build();
RestHighLevelClient client = highLevelClient();
{
// tag::bulk-processor-init
ThreadPool threadPool = new ThreadPool(settings); // <1>

BulkProcessor.Listener listener = new BulkProcessor.Listener() { // <2>
BulkProcessor.Listener listener = new BulkProcessor.Listener() { // <1>
@Override
public void beforeBulk(long executionId, BulkRequest request) {
// <3>
// <2>
}

@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
// <4>
// <3>
}

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
// <5>
// <4>
}
};

BulkProcessor bulkProcessor = new BulkProcessor.Builder(client::bulkAsync, listener, threadPool)
.build(); // <6>
BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulkAsync, listener).build(); // <5>
// end::bulk-processor-init
assertNotNull(bulkProcessor);

Expand All @@ -917,7 +908,6 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
// tag::bulk-processor-close
bulkProcessor.close();
// end::bulk-processor-close
terminate(threadPool);
}
{
// tag::bulk-processor-listener
Expand All @@ -944,19 +934,14 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
};
// end::bulk-processor-listener

ThreadPool threadPool = new ThreadPool(settings);
try {
// tag::bulk-processor-options
BulkProcessor.Builder builder = new BulkProcessor.Builder(client::bulkAsync, listener, threadPool);
builder.setBulkActions(500); // <1>
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); // <2>
builder.setConcurrentRequests(0); // <3>
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // <4>
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // <5>
// end::bulk-processor-options
} finally {
terminate(threadPool);
}
// tag::bulk-processor-options
BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener);
builder.setBulkActions(500); // <1>
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); // <2>
builder.setConcurrentRequests(0); // <3>
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // <4>
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // <5>
// end::bulk-processor-options
}
}
}
53 changes: 33 additions & 20 deletions core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.util.Objects;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -78,22 +81,20 @@ public static class Builder {

private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
private final Listener listener;
private final ThreadPool threadPool;

private final Scheduler scheduler;
private final Runnable onClose;
private int concurrentRequests = 1;
private int bulkActions = 1000;
private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB);
private TimeValue flushInterval = null;
private BackoffPolicy backoffPolicy = BackoffPolicy.exponentialBackoff();

/**
* Creates a builder of bulk processor with the client to use and the listener that will be used
* to be notified on the completion of bulk requests.
*/
public Builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener, ThreadPool threadPool) {
private Builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener,
Scheduler scheduler, Runnable onClose) {
this.consumer = consumer;
this.listener = listener;
this.threadPool = threadPool;
this.scheduler = scheduler;
this.onClose = onClose;
}

/**
Expand Down Expand Up @@ -155,39 +156,51 @@ public Builder setBackoffPolicy(BackoffPolicy backoffPolicy) {
* Builds a new bulk processor.
*/
public BulkProcessor build() {
return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, bulkSize, flushInterval, threadPool);
return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, bulkSize, flushInterval,
scheduler, onClose);
}
}

public static Builder builder(Client client, Listener listener) {
Objects.requireNonNull(client, "client");
Objects.requireNonNull(listener, "listener");
return new Builder(client::bulk, listener, client.threadPool(), () -> {});
}

return new Builder(client::bulk, listener, client.threadPool());
public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) {
Objects.requireNonNull(consumer, "consumer");
Objects.requireNonNull(listener, "listener");
final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
return new Builder(consumer, listener,
(delay, executor, command) -> scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS),
() -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
}

private final int bulkActions;
private final long bulkSize;

private final ThreadPool.Cancellable cancellableFlushTask;
private final Scheduler.Cancellable cancellableFlushTask;

private final AtomicLong executionIdGen = new AtomicLong();

private BulkRequest bulkRequest;
private final BulkRequestHandler bulkRequestHandler;
private final Scheduler scheduler;
private final Runnable onClose;

private volatile boolean closed = false;

BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
ThreadPool threadPool) {
Scheduler scheduler, Runnable onClose) {
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.getBytes();
this.bulkRequest = new BulkRequest();
this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, threadPool, concurrentRequests);

this.scheduler = scheduler;
this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests);
// Start period flushing task after everything is setup
this.cancellableFlushTask = startFlushTask(flushInterval, threadPool);
this.cancellableFlushTask = startFlushTask(flushInterval, scheduler);
this.onClose = onClose;
}

/**
Expand All @@ -200,6 +213,7 @@ public void close() {
} catch (InterruptedException exc) {
Thread.currentThread().interrupt();
}
onClose.run();
}

/**
Expand Down Expand Up @@ -289,9 +303,9 @@ public synchronized BulkProcessor add(BytesReference data, @Nullable String defa
return this;
}

private ThreadPool.Cancellable startFlushTask(TimeValue flushInterval, ThreadPool threadPool) {
private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler scheduler) {
if (flushInterval == null) {
return new ThreadPool.Cancellable() {
return new Scheduler.Cancellable() {
@Override
public void cancel() {}

Expand All @@ -301,9 +315,8 @@ public boolean isCancelled() {
}
};
}

final Runnable flushRunnable = threadPool.getThreadContext().preserveContext(new Flush());
return threadPool.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
final Runnable flushRunnable = scheduler.preserveContext(new Flush());
return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
}

private void executeIfNeeded() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.Scheduler;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
Expand All @@ -44,14 +44,13 @@ public final class BulkRequestHandler {
private final int concurrentRequests;

BulkRequestHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy,
BulkProcessor.Listener listener, ThreadPool threadPool,
int concurrentRequests) {
BulkProcessor.Listener listener, Scheduler scheduler, int concurrentRequests) {
assert concurrentRequests >= 0;
this.logger = Loggers.getLogger(getClass());
this.consumer = consumer;
this.listener = listener;
this.concurrentRequests = concurrentRequests;
this.retry = new Retry(EsRejectedExecutionException.class, backoffPolicy, threadPool);
this.retry = new Retry(EsRejectedExecutionException.class, backoffPolicy, scheduler);
this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
}

Expand Down
Loading

0 comments on commit 8caf7d4

Please sign in to comment.