Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reindex Throttle Checkpointing #46763

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ private void execute(ReindexJob reindexJob) {
String taskId = getPersistentTaskId();
long allocationId = getAllocationId();
Consumer<BulkByScrollTask.Status> committedCallback = childTask::setCommittedStatus;
ReindexTaskStateUpdater taskUpdater = new ReindexTaskStateUpdater(reindexIndexClient, taskId, allocationId, committedCallback);
ReindexTaskStateUpdater taskUpdater = new ReindexTaskStateUpdater(reindexIndexClient, client.threadPool(), taskId, allocationId,
committedCallback);
taskUpdater.assign(new ActionListener<>() {
@Override
public void onResponse(ReindexTaskStateDoc stateDoc) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class ReindexTaskStateUpdater implements Reindexer.CheckpointListener {
Expand All @@ -36,18 +38,20 @@ public class ReindexTaskStateUpdater implements Reindexer.CheckpointListener {
private static final Logger logger = LogManager.getLogger(ReindexTask.class);

private final ReindexIndexClient reindexIndexClient;
private final ThreadPool threadPool;
private final String persistentTaskId;
private final long allocationId;
private final Consumer<BulkByScrollTask.Status> committedCallback;
private final Semaphore semaphore = new Semaphore(1);
private ThrottlingConsumer<Tuple<ScrollableHitSource.Checkpoint, BulkByScrollTask.Status>> checkpointThrottler;

private int assignmentAttempts = 0;
private ReindexTaskState lastState;
private boolean isDone = false;
private AtomicBoolean isDone = new AtomicBoolean();

public ReindexTaskStateUpdater(ReindexIndexClient reindexIndexClient, String persistentTaskId, long allocationId,
public ReindexTaskStateUpdater(ReindexIndexClient reindexIndexClient, ThreadPool threadPool, String persistentTaskId, long allocationId,
Consumer<BulkByScrollTask.Status> committedCallback) {
this.reindexIndexClient = reindexIndexClient;
this.threadPool = threadPool;
this.persistentTaskId = persistentTaskId;
this.allocationId = allocationId;
// TODO: At some point I think we would like to replace a single universal callback to a listener that
Expand All @@ -70,7 +74,12 @@ public void onResponse(ReindexTaskState taskState) {
reindexIndexClient.updateReindexTaskDoc(persistentTaskId, newDoc, term, seqNo, new ActionListener<>() {
@Override
public void onResponse(ReindexTaskState newTaskState) {
assert checkpointThrottler == null;
lastState = newTaskState;
checkpointThrottler = new ThrottlingConsumer<>(
(t, whenDone) -> updateCheckpoint(t.v1(), t.v2(), whenDone),
newTaskState.getStateDoc().getReindexRequest().getCheckpointInterval(), System::nanoTime, threadPool
);
listener.onResponse(newTaskState.getStateDoc());
}

Expand Down Expand Up @@ -114,61 +123,58 @@ public void onFailure(Exception ex) {

@Override
public void onCheckpoint(ScrollableHitSource.Checkpoint checkpoint, BulkByScrollTask.Status status) {
// TODO: Need some kind of throttling here, no need to do this all the time.
// only do one checkpoint at a time, in case checkpointing is too slow.
if (semaphore.tryAcquire()) {
if (isDone) {
semaphore.release();
} else {
ReindexTaskStateDoc nextState = lastState.getStateDoc().withCheckpoint(checkpoint, status);
// TODO: This can fail due to conditional update. Need to hook into ability to cancel reindex process
long term = lastState.getPrimaryTerm();
long seqNo = lastState.getSeqNo();
reindexIndexClient.updateReindexTaskDoc(persistentTaskId, nextState, term, seqNo, new ActionListener<>() {
@Override
public void onResponse(ReindexTaskState taskState) {
lastState = taskState;
committedCallback.accept(status);
semaphore.release();
}

@Override
public void onFailure(Exception e) {
semaphore.release();
}
});
assert checkpointThrottler != null;
checkpointThrottler.accept(Tuple.tuple(checkpoint, status));
}

private void updateCheckpoint(ScrollableHitSource.Checkpoint checkpoint, BulkByScrollTask.Status status, Runnable whenDone) {
ReindexTaskStateDoc nextState = lastState.getStateDoc().withCheckpoint(checkpoint, status);
// TODO: This can fail due to conditional update. Need to hook into ability to cancel reindex process
long term = lastState.getPrimaryTerm();
long seqNo = lastState.getSeqNo();
reindexIndexClient.updateReindexTaskDoc(persistentTaskId, nextState, term, seqNo, new ActionListener<>() {
@Override
public void onResponse(ReindexTaskState taskState) {
lastState = taskState;
committedCallback.accept(status);
whenDone.run();
}
}

@Override
public void onFailure(Exception e) {
whenDone.run();
}
});
}

public void finish(@Nullable BulkByScrollResponse reindexResponse, @Nullable ElasticsearchException exception,
ActionListener<ReindexTaskStateDoc> listener) {
// TODO: Move to try acquire and a scheduled retry if there is currently contention
semaphore.acquireUninterruptibly();
if (isDone) {
semaphore.release();
assert checkpointThrottler != null;
if (isDone.compareAndSet(false, true) == false) {
listener.onFailure(new ElasticsearchException("Reindex task already finished locally"));
} else {
ReindexTaskStateDoc state = lastState.getStateDoc().withFinishedState(reindexResponse, exception);
isDone = true;
long term = lastState.getPrimaryTerm();
long seqNo = lastState.getSeqNo();
reindexIndexClient.updateReindexTaskDoc(persistentTaskId, state, term, seqNo, new ActionListener<>() {
@Override
public void onResponse(ReindexTaskState taskState) {
lastState = null;
semaphore.release();
listener.onResponse(taskState.getStateDoc());
checkpointThrottler.close(() -> writeFinishedState(reindexResponse, exception, listener));
}
}

}
private void writeFinishedState(@Nullable BulkByScrollResponse reindexResponse, @Nullable ElasticsearchException exception,
ActionListener<ReindexTaskStateDoc> listener) {
ReindexTaskStateDoc state = lastState.getStateDoc().withFinishedState(reindexResponse, exception);
long term = lastState.getPrimaryTerm();
long seqNo = lastState.getSeqNo();
reindexIndexClient.updateReindexTaskDoc(persistentTaskId, state, term, seqNo, new ActionListener<>() {
@Override
public void onResponse(ReindexTaskState taskState) {
lastState = null;
listener.onResponse(taskState.getStateDoc());

@Override
public void onFailure(Exception e) {
lastState = null;
semaphore.release();
listener.onFailure(e);
}
});
}
}

@Override
public void onFailure(Exception e) {
lastState = null;
listener.onFailure(e);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.reindex;

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
* A throttling consumer that forwards input to an outbound consumer, but discarding any that arrive too quickly (but eventually sending the
* last state after the minimumInterval).
* The outbound consumer is only called by a single thread at a time. The outbound consumer should be non-blocking.
* TODO: could be moved to more generic place for reuse?
* @param <T> type of object passed through.
*/
public class ThrottlingConsumer<T> implements Consumer<T> {
private final ThreadPool threadPool;
private final BiConsumer<T, Runnable> outbound;
private final TimeValue minimumInterval;
private final Object lock = new Object();
private final LongSupplier nanoTimeSource;

// state protected by lock.
private long lastWriteTimeNanos;
private T value;
private Scheduler.ScheduledCancellable scheduledWrite;
private boolean outboundActive;
private boolean closed;
private Runnable onClosed;

public ThrottlingConsumer(BiConsumer<T, Runnable> outbound, TimeValue minimumInterval,
LongSupplier nanoTimeSource, ThreadPool threadPool) {
Supplier<ThreadContext.StoredContext> restorableContext = threadPool.getThreadContext().newRestorableContext(false);
this.outbound = (value, whenDone) -> {
try (ThreadContext.StoredContext ignored = restorableContext.get()) {
outbound.accept(value, whenDone);
}
};
this.minimumInterval = minimumInterval;
this.threadPool = threadPool;
this.nanoTimeSource = nanoTimeSource;
this.lastWriteTimeNanos = nanoTimeSource.getAsLong();
}

@Override
public void accept(T newValue) {
long now = nanoTimeSource.getAsLong();
synchronized (lock) {
if (closed) {
return;
}
this.value = newValue;
if (scheduledWrite == null) {
// schedule is non-blocking
scheduledWrite = threadPool.schedule(this::onScheduleTimeout, getDelay(now), ThreadPool.Names.SAME);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

schedule loses the thread context. Given that we are afterwards writing to an index (possibly running with security enabled), I wonder if that thread context is relevant.

}
}
}

private TimeValue getDelay(long now) {
long nanos = lastWriteTimeNanos + minimumInterval.nanos() - now;
return nanos < 0 ? TimeValue.ZERO : TimeValue.timeValueNanos(nanos);
}

private void onScheduleTimeout() {
T value;
long now = nanoTimeSource.getAsLong();
synchronized (lock) {
if (closed) {
return;
}
value = this.value;
lastWriteTimeNanos = now;
outboundActive = true;
}


outbound.accept(value, () -> {
synchronized (this) {
outboundActive = false;
if (closed == false) {
if (value != this.value) {
scheduledWrite = threadPool.schedule(this::onScheduleTimeout, minimumInterval,
ThreadPool.Names.SAME);
} else {
scheduledWrite = null;
}
}
}

// safe since onScheduleTimeout is only called single threaded
if (onClosed != null) {
onClosed.run();
}
});
}

/**
* Async close this. Any state submitted since last outbound call will be discarded (as well as any new inbound accept calls).
* @param onClosed called when closed, which guarantees no more calls on outbound consumer. Must be non-blocking.
*/
public void close(Runnable onClosed) {
synchronized (lock) {
assert closed == false : "multiple closes not supported";
closed = true;
if (scheduledWrite != null) {
if (outboundActive) {
this.onClosed = onClosed;
} else {
scheduledWrite.cancel();
}
scheduledWrite = null;
}
}
if (this.onClosed == null) {
onClosed.run();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public void testReindexFailover() throws Throwable {
ReindexRequestBuilder copy = reindex().source("source").destination("dest").refresh(true);
ReindexRequest reindexRequest = copy.request();
reindexRequest.setScroll(TimeValue.timeValueSeconds(scrollTimeout));
reindexRequest.setCheckpointInterval(TimeValue.timeValueMillis(100));
StartReindexJobAction.Request request = new StartReindexJobAction.Request(reindexRequest, false);

copy.source().setSize(10);
Expand Down Expand Up @@ -173,7 +174,8 @@ public void testReindexFailover() throws Throwable {
assertThat(seqNos.length(), greaterThan(docCount));
}
// The first 9 should not be replayed, we restart from at least seqNo 9.
assertThat(seqNos.length(), lessThan(Math.toIntExact(docCount + hitsAfterRestart - 9)));
assertThat("docCount: " + docCount + " hitsAfterRestart " + hitsAfterRestart, seqNos.length()-1,
lessThan(Math.toIntExact(docCount + hitsAfterRestart - 9)));

});

Expand Down
Loading