diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index fbb345ea3a441..fd52f48c7b5f8 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -105,7 +105,7 @@ import java.util.stream.StreamSupport; import static org.opensearch.cluster.coordination.NoClusterManagerBlockService.NO_CLUSTER_MANAGER_BLOCK_ID; -import static org.opensearch.cluster.decommission.DecommissionService.nodeCommissioned; +import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned; import static org.opensearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered; import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY; diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index 4cb6c7b255449..b9c64a66d73b0 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -60,7 +60,7 @@ import java.util.function.BiConsumer; import java.util.stream.Collectors; -import static org.opensearch.cluster.decommission.DecommissionService.nodeCommissioned; +import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned; import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; /** diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java new file mode 100644 index 0000000000000..818c7d5c074f3 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java @@ -0,0 +1,108 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.decommission; + +import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.Strings; +import org.opensearch.common.unit.TimeValue; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import static org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsHelper.resolveVotingConfigExclusionsAndCheckMaximum; +import static org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsHelper.updateExclusionAndGetState; + +/** + * Static helper utilities to execute decommission + */ +public class DecommissionHelper { + + static ClusterState registerDecommissionAttributeInClusterState(ClusterState currentState, DecommissionAttribute decommissionAttribute) { + DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute); + return ClusterState.builder(currentState) + .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) + .build(); + } + + static ClusterState addVotingConfigExclusionsForToBeDecommissionedClusterManagerNodes( + ClusterState currentState, + Set nodeIdsToBeExcluded, + TimeValue decommissionActionTimeout + ) { + AddVotingConfigExclusionsRequest request = new AddVotingConfigExclusionsRequest( + Strings.EMPTY_ARRAY, + nodeIdsToBeExcluded.toArray(String[]::new), + Strings.EMPTY_ARRAY, + decommissionActionTimeout + ); + // TODO - update max count + Set resolvedExclusion = resolveVotingConfigExclusionsAndCheckMaximum(request, currentState, 10); + return updateExclusionAndGetState(currentState, resolvedExclusion, 10); + } + + static Set filterNodesWithDecommissionAttribute( + ClusterState clusterState, + DecommissionAttribute decommissionAttribute, + boolean onlyClusterManagerNodes + ) { + Set nodesWithDecommissionAttribute = new HashSet<>(); + Iterator nodesIter = onlyClusterManagerNodes + ? clusterState.nodes().getClusterManagerNodes().valuesIt() + : clusterState.nodes().getNodes().valuesIt(); + + while (nodesIter.hasNext()) { + final DiscoveryNode node = nodesIter.next(); + if (nodeHasDecommissionedAttribute(node, decommissionAttribute)) { + nodesWithDecommissionAttribute.add(node); + } + } + return nodesWithDecommissionAttribute; + } + + /** + * Utility method to check if the node has decommissioned attribute + * + * @param discoveryNode node to check on + * @param decommissionAttribute attribute to be checked with + * @return true or false based on whether node has decommissioned attribute + */ + public static boolean nodeHasDecommissionedAttribute(DiscoveryNode discoveryNode, DecommissionAttribute decommissionAttribute) { + String nodeAttributeValue = discoveryNode.getAttributes().get(decommissionAttribute.attributeName()); + return nodeAttributeValue != null && nodeAttributeValue.equals(decommissionAttribute.attributeValue()); + } + + /** + * Utility method to check if the node is commissioned or not + * + * @param discoveryNode node to check on + * @param metadata metadata present current which will be used to check the commissioning status of the node + * @return if the node is commissioned or not + */ + public static boolean nodeCommissioned(DiscoveryNode discoveryNode, Metadata metadata) { + DecommissionAttributeMetadata decommissionAttributeMetadata = metadata.decommissionAttributeMetadata(); + if (decommissionAttributeMetadata != null) { + DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute(); + DecommissionStatus status = decommissionAttributeMetadata.status(); + if (decommissionAttribute != null && status != null) { + if (nodeHasDecommissionedAttribute(discoveryNode, decommissionAttribute) + && (status.equals(DecommissionStatus.IN_PROGRESS) + || status.equals(DecommissionStatus.SUCCESSFUL) + || status.equals(DecommissionStatus.DRAINING))) { + return false; + } + } + } + return true; + } +} diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 85030a1e902db..ed57bb19105aa 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -18,6 +18,7 @@ import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; +import org.opensearch.cluster.ClusterStateObserver.Listener; import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.NotClusterManagerException; import org.opensearch.cluster.metadata.Metadata; @@ -35,14 +36,16 @@ import org.opensearch.transport.TransportService; import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.opensearch.cluster.decommission.DecommissionHelper.addVotingConfigExclusionsForToBeDecommissionedClusterManagerNodes; +import static org.opensearch.cluster.decommission.DecommissionHelper.filterNodesWithDecommissionAttribute; +import static org.opensearch.cluster.decommission.DecommissionHelper.nodeHasDecommissionedAttribute; +import static org.opensearch.cluster.decommission.DecommissionHelper.registerDecommissionAttributeInClusterState; import static org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING; import static org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING; @@ -126,6 +129,9 @@ public void startDecommissionAction( final DecommissionAttribute decommissionAttribute = decommissionRequest.getDecommissionAttribute(); // register the metadata with status as INIT as first step clusterService.submitStateUpdateTask("decommission [" + decommissionAttribute + "]", new ClusterStateUpdateTask(Priority.URGENT) { + + private Set nodeIdsToBeExcluded; + @Override public ClusterState execute(ClusterState currentState) { // validates if correct awareness attributes and forced awareness attribute set to the cluster before starting action @@ -135,11 +141,19 @@ public ClusterState execute(ClusterState currentState) { ensureEligibleRequest(decommissionAttributeMetadata, decommissionAttribute); // ensure attribute is weighed away ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute); - decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute); - logger.info("registering decommission metadata [{}] to execute action", decommissionAttributeMetadata.toString()); - return ClusterState.builder(currentState) - .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) - .build(); + ClusterState newState = registerDecommissionAttributeInClusterState(currentState, decommissionAttribute); + Set clusterManagerNodesToBeDecommissioned = filterNodesWithDecommissionAttribute( + currentState, decommissionAttribute, true + ); + logger.info( + "resolved cluster manager eligible nodes [{}] that should be added to Voting Config Exclusion", + clusterManagerNodesToBeDecommissioned.toString() + ); + // add all 'to-be-decommissioned' cluster manager eligible nodes to voting config exclusion + nodeIdsToBeExcluded = clusterManagerNodesToBeDecommissioned.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); + newState = addVotingConfigExclusionsForToBeDecommissionedClusterManagerNodes(newState, nodeIdsToBeExcluded, TimeValue.timeValueSeconds(30)); + logger.info("registering decommission metadata [{}] to execute action", newState.metadata().decommissionAttributeMetadata().toString()); + return newState; } @Override @@ -163,155 +177,103 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS decommissionAttributeMetadata.decommissionAttribute(), decommissionAttributeMetadata.status() ); - decommissionClusterManagerNodes(decommissionRequest, listener); - } - }); - } - - private synchronized void decommissionClusterManagerNodes( - final DecommissionRequest decommissionRequest, - ActionListener listener - ) { - final DecommissionAttribute decommissionAttribute = decommissionRequest.getDecommissionAttribute(); - ClusterState state = clusterService.getClusterApplierService().state(); - // since here metadata is already registered with INIT, we can guarantee that no new node with decommission attribute can further - // join the cluster - // and hence in further request lifecycle we are sure that no new to-be-decommission leader will join the cluster - Set clusterManagerNodesToBeDecommissioned = filterNodesWithDecommissionAttribute(state, decommissionAttribute, true); - logger.info( - "resolved cluster manager eligible nodes [{}] that should be removed from Voting Configuration", - clusterManagerNodesToBeDecommissioned.toString() - ); - - // remove all 'to-be-decommissioned' cluster manager eligible nodes from voting config - Set nodeIdsToBeExcluded = clusterManagerNodesToBeDecommissioned.stream() - .map(DiscoveryNode::getId) - .collect(Collectors.toSet()); - - final Predicate allNodesRemovedAndAbdicated = clusterState -> { - final Set votingConfigNodeIds = clusterState.getLastCommittedConfiguration().getNodeIds(); - return nodeIdsToBeExcluded.stream().noneMatch(votingConfigNodeIds::contains) - && nodeIdsToBeExcluded.contains(clusterState.nodes().getClusterManagerNodeId()) == false - && clusterState.nodes().getClusterManagerNodeId() != null; - }; - ActionListener exclusionListener = new ActionListener() { - @Override - public void onResponse(Void unused) { - if (clusterService.getClusterApplierService().state().nodes().isLocalNodeElectedClusterManager()) { - if (nodeHasDecommissionedAttribute(clusterService.localNode(), decommissionAttribute)) { - // this is an unexpected state, as after exclusion of nodes having decommission attribute, - // this local node shouldn't have had the decommission attribute. Will send the failure response to the user - String errorMsg = - "unexpected state encountered [local node is to-be-decommissioned leader] while executing decommission request"; - logger.error(errorMsg); - // will go ahead and clear the voting config and mark the status as false - clearVotingConfigExclusionAndUpdateStatus(false, false); - // we can send the failure response to the user here - listener.onFailure(new IllegalStateException(errorMsg)); - } else { - logger.info("will attempt to fail decommissioned nodes as local node is eligible to process the request"); - // we are good here to send the response now as the request is processed by an eligible active leader - // and to-be-decommissioned cluster manager is no more part of Voting Configuration and no more to-be-decommission - // nodes can be part of Voting Config - listener.onResponse(new DecommissionResponse(true)); - drainNodesWithDecommissionedAttribute(decommissionRequest); - } - } else { - // explicitly calling listener.onFailure with NotClusterManagerException as the local node is not the cluster manager - // this will ensures that request is retried until cluster manager times out - logger.info( - "local node is not eligible to process the request, " - + "throwing NotClusterManagerException to attempt a retry on an eligible node" - ); - listener.onFailure( - new NotClusterManagerException( - "node [" - + transportService.getLocalNode().toString() - + "] not eligible to execute decommission request. Will retry until timeout." - ) - ); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - // attempting to mark the status as FAILED - clearVotingConfigExclusionAndUpdateStatus(false, false); - } - }; + final ClusterStateObserver observer = new ClusterStateObserver( + clusterService, + TimeValue.timeValueSeconds(30), // TODO update + logger, + threadPool.getThreadContext() + ); - if (allNodesRemovedAndAbdicated.test(state)) { - exclusionListener.onResponse(null); - } else { - logger.debug("sending transport request to remove nodes [{}] from voting config", nodeIdsToBeExcluded.toString()); - // send a transport request to exclude to-be-decommissioned cluster manager eligible nodes from voting config - decommissionController.excludeDecommissionedNodesFromVotingConfig(nodeIdsToBeExcluded, new ActionListener() { - @Override - public void onResponse(Void unused) { - logger.info( - "successfully removed decommissioned cluster manager eligible nodes [{}] from voting config ", - clusterManagerNodesToBeDecommissioned.toString() - ); - final ClusterStateObserver abdicationObserver = new ClusterStateObserver( - clusterService, - TimeValue.timeValueSeconds(60L), - logger, - threadPool.getThreadContext() - ); - final ClusterStateObserver.Listener abdicationListener = new ClusterStateObserver.Listener() { - @Override - public void onNewClusterState(ClusterState state) { - logger.debug("to-be-decommissioned node is no more the active leader"); - exclusionListener.onResponse(null); - } + final Predicate allNodesRemovedAndAbdicated = clusterState -> { + final Set votingConfigNodeIds = clusterState.getLastCommittedConfiguration().getNodeIds(); + return nodeIdsToBeExcluded.stream().noneMatch(votingConfigNodeIds::contains) // nodes are excluded from voting config + && clusterState.nodes().getClusterManagerNodeId() != null // a master is elected + && nodeIdsToBeExcluded.contains(clusterState.nodes().getClusterManagerNodeId()) == false; // none of excluded node is elected master - @Override - public void onClusterServiceClose() { - String errorMsg = "cluster service closed while waiting for abdication of to-be-decommissioned leader"; - logger.warn(errorMsg); - listener.onFailure(new DecommissioningFailedException(decommissionAttribute, errorMsg)); - } + }; - @Override - public void onTimeout(TimeValue timeout) { - logger.info("timed out while waiting for abdication of to-be-decommissioned leader"); - clearVotingConfigExclusionAndUpdateStatus(false, false); + final Listener clusterStateListener = new Listener() { + @Override + public void onNewClusterState(ClusterState state) { + logger.info( + "successfully removed decommissioned cluster manager eligible nodes [{}] from voting config ", + nodeIdsToBeExcluded.toString() + ); + if (state.nodes().isLocalNodeElectedClusterManager()) { + if (nodeHasDecommissionedAttribute(clusterService.localNode(), decommissionAttribute)) { + // this is an unexpected state, as after exclusion of nodes having decommission attribute, + // this local node shouldn't have had the decommission attribute. Will send the failure response to the user + String errorMsg = + "unexpected state encountered [local node is to-be-decommissioned leader] while executing decommission request"; + logger.error(errorMsg); + // will go ahead and clear the voting config and mark the status as false + clearVotingConfigExclusionAndUpdateStatus(false, false); + // we can send the failure response to the user here + listener.onFailure(new IllegalStateException(errorMsg)); + } else { + logger.info("will attempt to fail decommissioned nodes as local node is eligible to process the request"); + // we are good here to send the response now as the request is processed by an eligible active leader + // and to-be-decommissioned cluster manager is no more part of Voting Configuration and no more to-be-decommission + // nodes can be part of Voting Config + listener.onResponse(new DecommissionResponse(true)); + drainNodesWithDecommissionedAttribute(decommissionRequest); + } + } else { + // explicitly calling listener.onFailure with NotClusterManagerException as the local node is not the cluster manager + // this will ensures that request is retried until cluster manager times out + logger.info( + "local node is not eligible to process the request, " + + "throwing NotClusterManagerException to attempt a retry on an eligible node" + ); listener.onFailure( - new OpenSearchTimeoutException( - "timed out [{}] while waiting for abdication of to-be-decommissioned leader", - timeout.toString() + new NotClusterManagerException( + "node [" + + transportService.getLocalNode().toString() + + "] not eligible to execute decommission request. Will retry until timeout." ) ); } - }; - // In case the cluster state is already processed even before this code is executed - // therefore testing first before attaching the listener - ClusterState currentState = clusterService.getClusterApplierService().state(); - if (allNodesRemovedAndAbdicated.test(currentState)) { - abdicationListener.onNewClusterState(currentState); - } else { - logger.debug("waiting to abdicate to-be-decommissioned leader"); - abdicationObserver.waitForNextChange(abdicationListener, allNodesRemovedAndAbdicated); } - } - @Override - public void onFailure(Exception e) { - logger.error( - new ParameterizedMessage( - "failure in removing to-be-decommissioned cluster manager eligible nodes [{}] from voting config", + @Override + public void onClusterServiceClose() { + String errorMsg = "cluster service closed while waiting for abdication of to-be-decommissioned leader"; + logger.warn(errorMsg); + listener.onFailure(new DecommissioningFailedException(decommissionAttribute, errorMsg)); + } + + @Override + public void onTimeout(TimeValue timeout) { + logger.error( + "timed out [{}] while removing to-be-decommissioned cluster manager eligible nodes [{}] from voting config", + timeout.toString(), nodeIdsToBeExcluded.toString() - ), - e - ); - exclusionListener.onFailure(e); + ); + listener.onFailure( + new OpenSearchTimeoutException( + "timed out [{}] while removing to-be-decommissioned cluster manager eligible nodes [{}] from voting config", + timeout.toString(), + nodeIdsToBeExcluded.toString() + ) + ); + clearVotingConfigExclusionAndUpdateStatus(false, false); + } + }; + + // In case the cluster state is already processed even before this code is executed + // therefore testing first before attaching the listener + if (allNodesRemovedAndAbdicated.test(newState)) { + clusterStateListener.onNewClusterState(newState); + } else { + logger.debug("waiting to abdicate to-be-decommissioned leader"); + observer.waitForNextChange(clusterStateListener, allNodesRemovedAndAbdicated); } - }); - } + } + }); } + // TODO - after registering the new status check if any node which is not excluded still present in decommissioned zone. If yes, start the action again (retry) void drainNodesWithDecommissionedAttribute(DecommissionRequest decommissionRequest) { ClusterState state = clusterService.getClusterApplierService().state(); Set decommissionedNodes = filterNodesWithDecommissionAttribute( @@ -434,25 +396,6 @@ public void onFailure(Exception e) { }, waitForRemoval); } - private Set filterNodesWithDecommissionAttribute( - ClusterState clusterState, - DecommissionAttribute decommissionAttribute, - boolean onlyClusterManagerNodes - ) { - Set nodesWithDecommissionAttribute = new HashSet<>(); - Iterator nodesIter = onlyClusterManagerNodes - ? clusterState.nodes().getClusterManagerNodes().valuesIt() - : clusterState.nodes().getNodes().valuesIt(); - - while (nodesIter.hasNext()) { - final DiscoveryNode node = nodesIter.next(); - if (nodeHasDecommissionedAttribute(node, decommissionAttribute)) { - nodesWithDecommissionAttribute.add(node); - } - } - return nodesWithDecommissionAttribute; - } - private static void validateAwarenessAttribute( final DecommissionAttribute decommissionAttribute, List awarenessAttributes, @@ -617,40 +560,4 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } }); } - - /** - * Utility method to check if the node has decommissioned attribute - * - * @param discoveryNode node to check on - * @param decommissionAttribute attribute to be checked with - * @return true or false based on whether node has decommissioned attribute - */ - public static boolean nodeHasDecommissionedAttribute(DiscoveryNode discoveryNode, DecommissionAttribute decommissionAttribute) { - String nodeAttributeValue = discoveryNode.getAttributes().get(decommissionAttribute.attributeName()); - return nodeAttributeValue != null && nodeAttributeValue.equals(decommissionAttribute.attributeValue()); - } - - /** - * Utility method to check if the node is commissioned or not - * - * @param discoveryNode node to check on - * @param metadata metadata present current which will be used to check the commissioning status of the node - * @return if the node is commissioned or not - */ - public static boolean nodeCommissioned(DiscoveryNode discoveryNode, Metadata metadata) { - DecommissionAttributeMetadata decommissionAttributeMetadata = metadata.decommissionAttributeMetadata(); - if (decommissionAttributeMetadata != null) { - DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute(); - DecommissionStatus status = decommissionAttributeMetadata.status(); - if (decommissionAttribute != null && status != null) { - if (nodeHasDecommissionedAttribute(discoveryNode, decommissionAttribute) - && (status.equals(DecommissionStatus.IN_PROGRESS) - || status.equals(DecommissionStatus.SUCCESSFUL) - || status.equals(DecommissionStatus.DRAINING))) { - return false; - } - } - } - return true; - } }