Only start re-assigning persistent tasks if they are not already being reassigned #76258

Merged
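
The change itself is small: reassignPersistentTasks is guarded with an AtomicBoolean so that only one "reassign persistent tasks" cluster-state update can be in flight at a time, and the flag is cleared again in the update task's failure and completion callbacks. A minimal sketch of that guard pattern, using illustrative names rather than the actual Elasticsearch classes:

import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch of the compare-and-set guard applied in this PR; names are made up.
class ReassignmentGuard {
    private final AtomicBoolean inFlight = new AtomicBoolean(false);

    void reassignIfNotAlreadyRunning(Runnable submitReassignment) {
        // Only the first caller flips false -> true; concurrent callers return immediately.
        if (inFlight.compareAndSet(false, true) == false) {
            return;
        }
        try {
            submitReassignment.run();
        } finally {
            // The PR clears the flag in the update task's onFailure / clusterStateProcessed
            // callbacks; a finally block is the simplest stand-in for that here.
            inFlight.set(false);
        }
    }
}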
@@ -37,6 +37,7 @@
import java.io.Closeable;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
@@ -55,6 +56,7 @@ public class PersistentTasksClusterService implements ClusterStateListener, Clos
private final EnableAssignmentDecider enableDecider;
private final ThreadPool threadPool;
private final PeriodicRechecker periodicRechecker;
private final AtomicBoolean reassigningTasks = new AtomicBoolean(false);

public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorRegistry registry, ClusterService clusterService,
ThreadPool threadPool) {
@@ -353,7 +355,10 @@ public void clusterChanged(ClusterChangedEvent event) {
/**
* Submit a cluster state update to reassign any persistent tasks that need reassigning
*/
private void reassignPersistentTasks() {
void reassignPersistentTasks() {
if (this.reassigningTasks.compareAndSet(false, true) == false) {
return;
}
clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
@@ -362,6 +367,7 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(String source, Exception e) {
reassigningTasks.set(false);
logger.warn("failed to reassign persistent tasks", e);
if (e instanceof NotMasterException == false) {
// There must be a task that's worth rechecking because there was one
@@ -373,6 +379,7 @@ public void onFailure(String source, Exception e) {

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
reassigningTasks.set(false);
if (isAnyTaskUnassigned(newState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE))) {
periodicRechecker.rescheduleIfNecessary();
}
@@ -50,6 +50,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -69,8 +70,13 @@
import static org.hamcrest.core.Is.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

public class PersistentTasksClusterServiceTests extends ESTestCase {
@@ -429,7 +435,7 @@ public void testPeriodicRecheck() throws Exception {
nonClusterStateCondition = false;

boolean shouldSimulateFailure = randomBoolean();
ClusterService recheckTestClusterService = createRecheckTestClusterService(clusterState, shouldSimulateFailure);
ClusterService recheckTestClusterService = createStateUpdateClusterState(clusterState, shouldSimulateFailure);
PersistentTasksClusterService service = createService(recheckTestClusterService,
(params, candidateNodes, currentState) -> assignBasedOnNonClusterStateCondition(candidateNodes));

Expand Down Expand Up @@ -477,7 +483,7 @@ public void testPeriodicRecheckOffMaster() {
ClusterState clusterState = builder.metadata(metadata).nodes(nodes).build();
nonClusterStateCondition = false;

ClusterService recheckTestClusterService = createRecheckTestClusterService(clusterState, false);
ClusterService recheckTestClusterService = createStateUpdateClusterState(clusterState, false);
PersistentTasksClusterService service = createService(recheckTestClusterService,
(params, candidateNodes, currentState) -> assignBasedOnNonClusterStateCondition(candidateNodes));

@@ -639,7 +645,51 @@ public void testTasksNotAssignedToShuttingDownNodes() {
Sets.haveEmptyIntersection(shutdownNodes, nodesWithTasks));
}

private ClusterService createRecheckTestClusterService(ClusterState initialState, boolean shouldSimulateFailure) {
public void testReassignOnlyOnce() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
ClusterState initialState = initialState();
ClusterState.Builder builder = ClusterState.builder(initialState);
PersistentTasksCustomMetadata.Builder tasks = PersistentTasksCustomMetadata.builder(
initialState.metadata().custom(PersistentTasksCustomMetadata.TYPE)
);
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(initialState.nodes());
addTestNodes(nodes, randomIntBetween(1, 3));
addTask(tasks, "assign_based_on_non_cluster_state_condition", null);
Metadata.Builder metadata = Metadata.builder(initialState.metadata()).putCustom(PersistentTasksCustomMetadata.TYPE, tasks.build());
ClusterState clusterState = builder.metadata(metadata).nodes(nodes).build();

boolean shouldSimulateFailure = randomBoolean();
ClusterService recheckTestClusterService = createStateUpdateClusterState(clusterState, shouldSimulateFailure, latch);
PersistentTasksClusterService service = createService(
recheckTestClusterService,
(params, candidateNodes, currentState) -> assignBasedOnNonClusterStateCondition(candidateNodes)
);
verify(recheckTestClusterService, atLeastOnce()).getClusterSettings();
verify(recheckTestClusterService, atLeastOnce()).addListener(any());
Thread t1 = new Thread(service::reassignPersistentTasks);
Thread t2 = new Thread(service::reassignPersistentTasks);
try {
t1.start();
t2.start();
// Make sure we have at least one reassign check before we count down the latch
assertBusy(() -> verify(recheckTestClusterService, atLeastOnce()).submitStateUpdateTask(eq("reassign persistent tasks"), any()));
} finally {
latch.countDown();
t1.join();
t2.join();
service.reassignPersistentTasks();
// Verify that reassignment is possible again: one submission came from the reassignment in the `try` block,
// and a second comes from the call above once the other threads have joined.
assertBusy(() -> verify(recheckTestClusterService, times(2)).submitStateUpdateTask(eq("reassign persistent tasks"), any()));
verifyNoMoreInteractions(recheckTestClusterService);
}
}

private ClusterService createStateUpdateClusterState(ClusterState initialState, boolean shouldSimulateFailure) {
return createStateUpdateClusterState(initialState, shouldSimulateFailure, null);
}

private ClusterService createStateUpdateClusterState(ClusterState initialState, boolean shouldSimulateFailure, CountDownLatch await) {
AtomicBoolean testFailureNextTime = new AtomicBoolean(shouldSimulateFailure);
AtomicReference<ClusterState> state = new AtomicReference<>(initialState);
ClusterService recheckTestClusterService = mock(ClusterService.class);
@@ -651,6 +701,9 @@ private ClusterService createRecheckTestClusterService(ClusterState initialState
ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocationOnMock.getArguments()[1];
ClusterState before = state.get();
ClusterState after = task.execute(before);
if (await != null) {
await.await();
}
if (testFailureNextTime.compareAndSet(true, false)) {
task.onFailure("testing failure", new RuntimeException("foo"));
} else {
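
The new testReassignOnlyOnce holds the first reassignment open with a CountDownLatch inside the mocked submitStateUpdateTask, so a concurrent second call has to hit the AtomicBoolean guard and bail out; only one submission should be recorded until the latch is released and the threads join. A framework-free sketch of the same idea, reusing the illustrative ReassignmentGuard from above (the sleep stands in for the test's assertBusy):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: holds the first "reassignment" open with a latch and checks
// that a concurrent second call is rejected by the guard.
class ReassignmentGuardDemo {
    public static void main(String[] args) throws InterruptedException {
        ReassignmentGuard guard = new ReassignmentGuard();
        CountDownLatch hold = new CountDownLatch(1);
        AtomicInteger submissions = new AtomicInteger();

        Runnable slowSubmit = () -> {
            submissions.incrementAndGet();
            try {
                hold.await();              // keep the first submission in flight
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        Thread t1 = new Thread(() -> guard.reassignIfNotAlreadyRunning(slowSubmit));
        Thread t2 = new Thread(() -> guard.reassignIfNotAlreadyRunning(slowSubmit));
        t1.start();
        t2.start();
        Thread.sleep(200);                 // crude wait; the real test uses assertBusy
        hold.countDown();
        t1.join();
        t2.join();

        // Exactly one of the two concurrent calls should have submitted.
        System.out.println("submissions = " + submissions.get()); // expected: 1
    }
}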
@@ -23,11 +23,11 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

@@ -156,7 +156,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(Long estimatedMemoryF
logger.debug(reason);
return new PersistentTasksCustomMetadata.Assignment(null, reason);
}
List<String> reasons = new LinkedList<>();
Map<String, String> reasons = new TreeMap<>();
long maxAvailableMemory = Long.MIN_VALUE;
DiscoveryNode minLoadedNodeByMemory = null;
long requiredMemoryForJob = estimatedMemoryFootprint;
@@ -166,7 +166,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(Long estimatedMemoryF
String reason = nodeFilter.apply(node);
if (reason != null) {
logger.trace(reason);
reasons.add(reason);
reasons.put(node.getName(), reason);
continue;
}
NodeLoad currentLoad = nodeLoadDetector.detectNodeLoad(
@@ -180,7 +180,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(Long estimatedMemoryF
if (currentLoad.getError() != null) {
reason = createReason(jobId, nodeNameAndMlAttributes(node), currentLoad.getError());
logger.trace(reason);
reasons.add(reason);
reasons.put(node.getName(), reason);
continue;
}
// Assuming the node is eligible at all, check loading
@@ -194,7 +194,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(Long estimatedMemoryF
currentLoad.getNumAllocatingJobs(),
maxConcurrentJobAllocations);
logger.trace(reason);
reasons.add(reason);
reasons.put(node.getName(), reason);
continue;
}

@@ -206,7 +206,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(Long estimatedMemoryF
MAX_OPEN_JOBS_PER_NODE.getKey(),
maxNumberOfOpenJobs);
logger.trace(reason);
reasons.add(reason);
reasons.put(node.getName(), reason);
continue;
}

@@ -215,7 +215,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(Long estimatedMemoryF
nodeNameAndMlAttributes(node),
"This node is not providing accurate information to determine is load by memory.");
logger.trace(reason);
reasons.add(reason);
reasons.put(node.getName(), reason);
continue;
}

@@ -224,7 +224,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(Long estimatedMemoryF
nodeNameAndMlAttributes(node),
"This node is indicating that it has no native memory for machine learning.");
logger.trace(reason);
reasons.add(reason);
reasons.put(node.getName(), reason);
continue;
}

@@ -247,7 +247,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(Long estimatedMemoryF
requiredMemoryForJob,
ByteSizeValue.ofBytes(requiredMemoryForJob).toString());
logger.trace(reason);
reasons.add(reason);
reasons.put(node.getName(), reason);
continue;
}

@@ -260,15 +260,15 @@ public PersistentTasksCustomMetadata.Assignment selectNode(Long estimatedMemoryF
return createAssignment(
estimatedMemoryFootprint,
minLoadedNodeByMemory,
reasons,
reasons.values(),
maxNodeSize > 0L ?
NativeMemoryCalculator.allowedBytesForMl(maxNodeSize, maxMachineMemoryPercent, useAutoMemoryPercentage) :
Long.MAX_VALUE);
}

PersistentTasksCustomMetadata.Assignment createAssignment(long estimatedMemoryUsage,
DiscoveryNode minLoadedNode,
List<String> reasons,
Collection<String> reasons,
long biggestPossibleJob) {
if (minLoadedNode == null) {
String explanation = String.join("|", reasons);
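
The third file replaces the LinkedList of rejection reasons with a TreeMap keyed by node name, so the collected reasons come out with at most one entry per node and in a stable, name-sorted order when joined into the assignment explanation. A small illustrative sketch of that behaviour (node names and reason strings are made up):

import java.util.Map;
import java.util.TreeMap;

// Illustrative only: one reason per node, iterated in node-name order.
class ReasonCollectionDemo {
    public static void main(String[] args) {
        Map<String, String> reasons = new TreeMap<>();
        reasons.put("node-c", "not enough memory");
        reasons.put("node-a", "node filter rejected the node");
        reasons.put("node-b", "too many allocating jobs");

        // Mirrors how createAssignment joins the collected reasons; values() follows key order.
        String explanation = String.join("|", reasons.values());
        System.out.println(explanation);
        // -> node filter rejected the node|too many allocating jobs|not enough memory
    }
}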