Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent deadlock by using separate schedulers #48697

Merged
merged 2 commits into from
Oct 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public static class Builder {

private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
private final Listener listener;
private final Scheduler scheduler;
private final Scheduler flushScheduler;
private final Scheduler retryScheduler;
private final Runnable onClose;
private int concurrentRequests = 1;
private int bulkActions = 1000;
Expand All @@ -96,10 +97,11 @@ public static class Builder {
private String globalPipeline;

private Builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener,
Scheduler scheduler, Runnable onClose) {
Scheduler flushScheduler, Scheduler retryScheduler, Runnable onClose) {
this.consumer = consumer;
this.listener = listener;
this.scheduler = scheduler;
this.flushScheduler = flushScheduler;
this.retryScheduler = retryScheduler;
this.onClose = onClose;
}

Expand Down Expand Up @@ -178,7 +180,7 @@ public Builder setBackoffPolicy(BackoffPolicy backoffPolicy) {
*/
public BulkProcessor build() {
return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions,
bulkSize, flushInterval, scheduler, onClose, createBulkRequestWithGlobalDefaults());
bulkSize, flushInterval, flushScheduler, retryScheduler, onClose, createBulkRequestWithGlobalDefaults());
}

private Supplier<BulkRequest> createBulkRequestWithGlobalDefaults() {
Expand All @@ -188,19 +190,55 @@ private Supplier<BulkRequest> createBulkRequestWithGlobalDefaults() {
}
}

/**
* @param client The client that executes the bulk operations
* @param listener The BulkProcessor listener that gets called on bulk events
* @param flushScheduler The scheduler that is used to flush
* @param retryScheduler The scheduler that is used for retries
* @param onClose The runnable instance that is executed on close. Consumers are required to clean up the schedulers.
* @return the builder for BulkProcessor
*/
public static Builder builder(Client client, Listener listener, Scheduler flushScheduler, Scheduler retryScheduler, Runnable onClose) {
Objects.requireNonNull(client, "client");
Objects.requireNonNull(listener, "listener");
return new Builder(client::bulk, listener, flushScheduler, retryScheduler, onClose);
}


/**
* @param client The client that executes the bulk operations
* @param listener The BulkProcessor listener that gets called on bulk events
* @return the builder for BulkProcessor
* @deprecated Use {@link #builder(java.util.function.BiConsumer, org.elasticsearch.action.bulk.BulkProcessor.Listener)}
* with client::bulk as the first argument, or {@link #builder(org.elasticsearch.client.Client,
* org.elasticsearch.action.bulk.BulkProcessor.Listener, org.elasticsearch.threadpool.Scheduler,
* org.elasticsearch.threadpool.Scheduler, java.lang.Runnable)} and manage the flush and retry schedulers explicitly
*/
@Deprecated
public static Builder builder(Client client, Listener listener) {
Objects.requireNonNull(client, "client");
Objects.requireNonNull(listener, "listener");
return new Builder(client::bulk, listener, client.threadPool(), () -> {});
return new Builder(client::bulk, listener, client.threadPool(), client.threadPool(), () -> {});
}

/**
* @param consumer The consumer that is called to fulfil bulk operations
* @param listener The BulkProcessor listener that gets called on bulk events
* @return the builder for BulkProcessor
*/
public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) {
Objects.requireNonNull(consumer, "consumer");
Objects.requireNonNull(listener, "listener");
final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
final ScheduledThreadPoolExecutor flushScheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
final ScheduledThreadPoolExecutor retryScheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
return new Builder(consumer, listener,
buildScheduler(scheduledThreadPoolExecutor),
() -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
buildScheduler(flushScheduledThreadPoolExecutor),
buildScheduler(retryScheduledThreadPoolExecutor),
() ->
{
Scheduler.terminate(flushScheduledThreadPoolExecutor, 10, TimeUnit.SECONDS);
Scheduler.terminate(retryScheduledThreadPoolExecutor, 10, TimeUnit.SECONDS);
});
}

private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) {
Expand All @@ -225,17 +263,28 @@ private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThr

BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
Scheduler scheduler, Runnable onClose, Supplier<BulkRequest> bulkRequestSupplier) {
Scheduler flushScheduler, Scheduler retryScheduler, Runnable onClose, Supplier<BulkRequest> bulkRequestSupplier) {
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.getBytes();
this.bulkRequest = bulkRequestSupplier.get();
this.bulkRequestSupplier = bulkRequestSupplier;
this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests);
this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, retryScheduler, concurrentRequests);
// Start period flushing task after everything is setup
this.cancellableFlushTask = startFlushTask(flushInterval, scheduler);
this.cancellableFlushTask = startFlushTask(flushInterval, flushScheduler);
this.onClose = onClose;
}

/**
* @deprecated use the {@link BulkProcessor} constructor which uses separate schedulers for flush and retry
*/
@Deprecated
BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
Scheduler scheduler, Runnable onClose, Supplier<BulkRequest> bulkRequestSupplier) {
this(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, bulkSize, flushInterval,
scheduler, scheduler, onClose, bulkRequestSupplier );
}

/**
* Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void testThatBulkProcessorCountIsCorrect() throws Exception {
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);

int numDocs = randomIntBetween(10, 100);
try (BulkProcessor processor = BulkProcessor.builder(client(), listener)
try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener)
//let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
Expand All @@ -81,7 +81,7 @@ public void testBulkProcessorFlush() throws Exception {

int numDocs = randomIntBetween(10, 100);

try (BulkProcessor processor = BulkProcessor.builder(client(), listener)
try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener)
//let's make sure that this bulk won't be automatically flushed
.setConcurrentRequests(randomIntBetween(0, 10)).setBulkActions(numDocs + randomIntBetween(1, 100))
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {
Expand Down Expand Up @@ -116,7 +116,7 @@ public void testBulkProcessorConcurrentRequests() throws Exception {

MultiGetRequestBuilder multiGetRequestBuilder;

try (BulkProcessor processor = BulkProcessor.builder(client(), listener)
try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener)
.setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions)
//set interval and size to high values
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {
Expand Down Expand Up @@ -155,7 +155,7 @@ public void testBulkProcessorWaitOnClose() throws Exception {
BulkProcessorTestListener listener = new BulkProcessorTestListener();

int numDocs = randomIntBetween(10, 100);
BulkProcessor processor = BulkProcessor.builder(client(), listener)
BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener)
//let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(randomIntBetween(1, 10),
Expand Down Expand Up @@ -202,7 +202,7 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception
MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet();
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch);

try (BulkProcessor processor = BulkProcessor.builder(client(), listener)
try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener)
.setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions)
//set interval and size to high values
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private void executeBulkRejectionLoad(BackoffPolicy backoffPolicy, boolean rejec
assertAcked(prepareCreate(INDEX_NAME));
ensureGreen();

BulkProcessor bulkProcessor = BulkProcessor.builder(client(), new BulkProcessor.Listener() {
BulkProcessor bulkProcessor = BulkProcessor.builder(client()::bulk, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
// no op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
};
int bulkSize = between(1, 20);
BulkProcessor bulkProcessor = BulkProcessor.builder(leaderClient(), listener)
BulkProcessor bulkProcessor = BulkProcessor.builder(leaderClient()::bulk, listener)
.setBulkActions(bulkSize)
.setConcurrentRequests(4)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.bootstrap.BootstrapCheck;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -53,7 +54,6 @@
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
Expand Down Expand Up @@ -324,7 +324,7 @@ ScriptTransform.TYPE, new ScriptTransformFactory(scriptService),
final InputRegistry inputRegistry = new InputRegistry(inputFactories);
inputFactories.put(ChainInput.TYPE, new ChainInputFactory(inputRegistry));

bulkProcessor = BulkProcessor.builder(ClientHelper.clientWithOrigin(client, WATCHER_ORIGIN), new BulkProcessor.Listener() {
bulkProcessor = BulkProcessor.builder(new OriginSettingClient(client, WATCHER_ORIGIN)::bulk, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ public void init() {
when(client.settings()).thenReturn(settings);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
parser = mock(TriggeredWatch.Parser.class);
BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener).setConcurrentRequests(0).setBulkActions(1).build();
BulkProcessor bulkProcessor = BulkProcessor.
builder(client::bulk, listener).setConcurrentRequests(0).setBulkActions(1).build();
triggeredWatchStore = new TriggeredWatchStore(settings, client, parser, bulkProcessor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void init() {
when(client.settings()).thenReturn(settings);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(settings));
BulkProcessor.Listener listener = mock(BulkProcessor.Listener.class);
BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener).setConcurrentRequests(0).setBulkActions(1).build();
BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulk, listener).setConcurrentRequests(0).setBulkActions(1).build();
historyStore = new HistoryStore(bulkProcessor);
}

Expand Down