Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only start re-assigning persistent tasks if they are not already being reassigned #76258

Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.io.Closeable;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
Expand All @@ -55,6 +56,7 @@ public class PersistentTasksClusterService implements ClusterStateListener, Clos
private final EnableAssignmentDecider enableDecider;
private final ThreadPool threadPool;
private final PeriodicRechecker periodicRechecker;
private final AtomicBoolean reassigningTasks = new AtomicBoolean(false);

public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorRegistry registry, ClusterService clusterService,
ThreadPool threadPool) {
Expand Down Expand Up @@ -354,6 +356,9 @@ public void clusterChanged(ClusterChangedEvent event) {
* Submit a cluster state update to reassign any persistent tasks that need reassigning
*/
private void reassignPersistentTasks() {
if (this.reassigningTasks.compareAndSet(false, true) == false) {
return;
}
clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
Expand All @@ -362,6 +367,7 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(String source, Exception e) {
reassigningTasks.set(false);
logger.warn("failed to reassign persistent tasks", e);
if (e instanceof NotMasterException == false) {
// There must be a task that's worth rechecking because there was one
Expand All @@ -373,6 +379,7 @@ public void onFailure(String source, Exception e) {

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
reassigningTasks.set(false);
if (isAnyTaskUnassigned(newState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE))) {
periodicRechecker.rescheduleIfNecessary();
}
Expand Down