Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't assign persistent tasks to nodes shutting down #72260

Merged
merged 5 commits into from
Apr 28, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.NamedDiff;
Expand All @@ -27,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -66,6 +68,12 @@ public static NamedDiff<Metadata.Custom> readDiffFrom(StreamInput in) throws IOE
return new NodeShutdownMetadataDiff(in);
}

public static Optional<NodesShutdownMetadata> getShutdowns(final ClusterState state) {
return Optional.ofNullable(state)
dakrone marked this conversation as resolved.
Show resolved Hide resolved
.map(ClusterState::metadata)
.map(m -> m.custom(TYPE));
}

private final Map<String, SingleNodeShutdownMetadata> nodes;

public NodesShutdownMetadata(Map<String, SingleNodeShutdownMetadata> nodes) {
Expand All @@ -84,7 +92,7 @@ public void writeTo(StreamOutput out) throws IOException {
/**
* @return A map of NodeID to shutdown metadata.
*/
public Map<String, SingleNodeShutdownMetadata> getAllNodeMetdataMap() {
public Map<String, SingleNodeShutdownMetadata> getAllNodeMetadataMap() {
return Collections.unmodifiableMap(nodes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand All @@ -33,6 +34,7 @@
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -153,6 +155,14 @@ public ImmutableOpenMap<String, DiscoveryNode> getCoordinatingOnlyNodes() {
return nodes.build();
}

/**
 * Collects every node in the cluster into a single collection.
 *
 * @return an unmodifiable collection containing all known {@link DiscoveryNode}s
 */
public Collection<DiscoveryNode> getAllNodes() {
    final List<DiscoveryNode> allNodes = new ArrayList<>();
    for (DiscoveryNode node : this) {
        allNodes.add(node);
    }
    return Collections.unmodifiableList(allNodes);
}

/**
* Returns a stream of all nodes, with master nodes at the front
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -33,7 +35,9 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* Component that runs only on the master node and is responsible for assigning running tasks to nodes
Expand All @@ -48,15 +52,15 @@ public class PersistentTasksClusterService implements ClusterStateListener, Clos

private final ClusterService clusterService;
private final PersistentTasksExecutorRegistry registry;
private final EnableAssignmentDecider decider;
private final EnableAssignmentDecider enableDecider;
private final ThreadPool threadPool;
private final PeriodicRechecker periodicRechecker;

public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorRegistry registry, ClusterService clusterService,
ThreadPool threadPool) {
this.clusterService = clusterService;
this.registry = registry;
this.decider = new EnableAssignmentDecider(settings, clusterService.getClusterSettings());
this.enableDecider = new EnableAssignmentDecider(settings, clusterService.getClusterSettings());
this.threadPool = threadPool;
this.periodicRechecker = new PeriodicRechecker(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING.get(settings));
if (DiscoveryNode.isMasterNode(settings)) {
Expand Down Expand Up @@ -298,12 +302,37 @@ private <Params extends PersistentTaskParams> Assignment createAssignment(final
final ClusterState currentState) {
PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);

AssignmentDecision decision = decider.canAssign();
AssignmentDecision decision = enableDecider.canAssign();
if (decision.getType() == AssignmentDecision.Type.NO) {
return unassignedAssignment("persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]");
}

return persistentTasksExecutor.getAssignment(taskParams, currentState);
// Filter all nodes that are marked as shutting down, because we do not
// want to assign a persistent task to a node that will shortly be
// leaving the cluster
final List<DiscoveryNode> candidateNodes = currentState.nodes().mastersFirstStream()
dakrone marked this conversation as resolved.
Show resolved Hide resolved
.filter(dn -> isNodeShuttingDown(currentState, dn.getId()) == false)
.collect(Collectors.toList());
// Task assignment should not rely on node order
Randomness.shuffle(candidateNodes);

final Assignment assignment = persistentTasksExecutor.getAssignment(taskParams, candidateNodes, currentState);
assert (assignment == null || isNodeShuttingDown(currentState, assignment.getExecutorNode()) == false) :
"expected task [" + taskName + "] to be assigned to a node that is not marked as shutting down, but " +
assignment.getExecutorNode() + " is currently marked as shutting down";
return assignment;
}

/**
 * Checks whether the given node has been registered for shutdown in the cluster state,
 * regardless of the shutdown type.
 *
 * @param state  the cluster state to consult for shutdown metadata
 * @param nodeId the ID of the node to check
 * @return {@code true} if the node is marked as shutting down, {@code false} otherwise
 */
static boolean isNodeShuttingDown(final ClusterState state, final String nodeId) {
    // No distinction is made between shutdown types here; any registered shutdown counts.
    return NodesShutdownMetadata.getShutdowns(state)
        .map(shutdownMetadata -> shutdownMetadata.getAllNodeMetadataMap().get(nodeId))
        .isPresent();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
import org.elasticsearch.tasks.TaskId;

import java.util.Collection;
import java.util.Map;
import java.util.function.Predicate;

Expand All @@ -41,10 +42,10 @@ public String getTaskName() {
/**
* Returns the node id where the params has to be executed,
* <p>
* The default implementation returns the least loaded data node
* The default implementation returns the least loaded data node from amongst the collection of candidate nodes
*/
public Assignment getAssignment(Params params, ClusterState clusterState) {
DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, DiscoveryNode::canContainData);
public Assignment getAssignment(Params params, Collection<DiscoveryNode> candidateNodes, ClusterState clusterState) {
DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, candidateNodes, DiscoveryNode::canContainData);
if (discoveryNode == null) {
return NO_NODE_FOUND;
} else {
Expand All @@ -53,13 +54,16 @@ public Assignment getAssignment(Params params, ClusterState clusterState) {
}

/**
* Finds the least loaded node that satisfies the selector criteria
 * Finds the least loaded node from amongst the candidate node collection
* that satisfies the selector criteria
*/
protected DiscoveryNode selectLeastLoadedNode(ClusterState clusterState, Predicate<DiscoveryNode> selector) {
protected DiscoveryNode selectLeastLoadedNode(ClusterState clusterState,
Collection<DiscoveryNode> candidateNodes,
Predicate<DiscoveryNode> selector) {
long minLoad = Long.MAX_VALUE;
DiscoveryNode minLoadedNode = null;
PersistentTasksCustomMetadata persistentTasks = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
for (DiscoveryNode node : clusterState.getNodes()) {
for (DiscoveryNode node : candidateNodes) {
if (selector.test(node)) {
if (persistentTasks == null) {
// We don't have any task running yet, pick the first available node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ public void testInsertNewNodeShutdownMetadata() {

nodesShutdownMetadata = nodesShutdownMetadata.putSingleNodeMetadata(newNodeMetadata);

assertThat(nodesShutdownMetadata.getAllNodeMetdataMap().get(newNodeMetadata.getNodeId()), equalTo(newNodeMetadata));
assertThat(nodesShutdownMetadata.getAllNodeMetdataMap().values(), contains(newNodeMetadata));
assertThat(nodesShutdownMetadata.getAllNodeMetadataMap().get(newNodeMetadata.getNodeId()), equalTo(newNodeMetadata));
assertThat(nodesShutdownMetadata.getAllNodeMetadataMap().values(), contains(newNodeMetadata));
}

public void testRemoveShutdownMetadata() {
Expand All @@ -52,9 +52,9 @@ public void testRemoveShutdownMetadata() {
SingleNodeShutdownMetadata nodeToRemove = randomFrom(nodes);
nodesShutdownMetadata = nodesShutdownMetadata.removeSingleNodeMetadata(nodeToRemove.getNodeId());

assertThat(nodesShutdownMetadata.getAllNodeMetdataMap().get(nodeToRemove.getNodeId()), nullValue());
assertThat(nodesShutdownMetadata.getAllNodeMetdataMap().values(), hasSize(nodes.size() - 1));
assertThat(nodesShutdownMetadata.getAllNodeMetdataMap().values(), not(hasItem(nodeToRemove)));
assertThat(nodesShutdownMetadata.getAllNodeMetadataMap().get(nodeToRemove.getNodeId()), nullValue());
assertThat(nodesShutdownMetadata.getAllNodeMetadataMap().values(), hasSize(nodes.size() - 1));
assertThat(nodesShutdownMetadata.getAllNodeMetadataMap().values(), not(hasItem(nodeToRemove)));
}

@Override
Expand Down
Loading