Fix decommission status update to non leader nodes #4800
AwarenessAttributeDecommissionIT.java (new file):

@@ -0,0 +1,164 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.cluster.coordination;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.After;
import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateAction;
import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateRequest;
import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateResponse;
import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateAction;
import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateRequest;
import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateResponse;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.cluster.decommission.DecommissionStatus;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.opensearch.test.NodeRoles.onlyRole;
import static org.opensearch.test.OpenSearchIntegTestCase.client;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoTimeout;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class AwarenessAttributeDecommissionIT extends OpenSearchIntegTestCase {
    private final Logger logger = LogManager.getLogger(AwarenessAttributeDecommissionIT.class);

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Collections.singletonList(MockTransportService.TestPlugin.class);
    }

    @After
    public void cleanup() throws Exception {
        assertNoTimeout(client().admin().cluster().prepareHealth().get());
    }

    public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionException, InterruptedException {
        Settings commonSettings = Settings.builder()
            .put("cluster.routing.allocation.awareness.attributes", "zone")
            .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
            .build();

        logger.info("--> start 3 cluster manager nodes on zones 'a' & 'b' & 'c'");
        List<String> clusterManagerNodes = internalCluster().startNodes(
            Settings.builder()
                .put(commonSettings)
                .put("node.attr.zone", "a")
                .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
                .build(),
            Settings.builder()
                .put(commonSettings)
                .put("node.attr.zone", "b")
                .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
                .build(),
            Settings.builder()
                .put(commonSettings)
                .put("node.attr.zone", "c")
                .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
                .build()
        );

        logger.info("--> start 3 data nodes on zones 'a' & 'b' & 'c'");
        List<String> dataNodes = internalCluster().startNodes(
            Settings.builder()
                .put(commonSettings)
                .put("node.attr.zone", "a")
                .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
                .build(),
            Settings.builder()
                .put(commonSettings)
                .put("node.attr.zone", "b")
                .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
                .build(),
            Settings.builder()
                .put(commonSettings)
                .put("node.attr.zone", "c")
                .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
                .build()
        );

        ensureStableCluster(6);

        logger.info("--> starting decommissioning nodes in zone {}", 'c');
        DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c");
        DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute);
        DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest).get();
        assertTrue(decommissionResponse.isAcknowledged());

        // Will wait for all events to complete
        client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();

        // assert that decommission status is successful
        GetDecommissionStateResponse response = client().execute(GetDecommissionStateAction.INSTANCE, new GetDecommissionStateRequest())
            .get();
        assertEquals(response.getDecommissionedAttribute(), decommissionAttribute);
        assertEquals(response.getDecommissionStatus(), DecommissionStatus.SUCCESSFUL);

        ClusterState clusterState = client(clusterManagerNodes.get(0)).admin().cluster().prepareState().execute().actionGet().getState();
        assertEquals(4, clusterState.nodes().getSize());

        // assert status on nodes that are part of cluster currently
        Iterator<DiscoveryNode> discoveryNodeIterator = clusterState.nodes().getNodes().valuesIt();
        while (discoveryNodeIterator.hasNext()) {
            // assert no node has decommissioned attribute
            DiscoveryNode node = discoveryNodeIterator.next();
            assertNotEquals(node.getAttributes().get("zone"), "c");

            // assert all the nodes have the status SUCCESSFUL
            ClusterService localNodeClusterService = internalCluster().getInstance(ClusterService.class, node.getName());
            assertEquals(
                localNodeClusterService.state().metadata().decommissionAttributeMetadata().status(),
                DecommissionStatus.SUCCESSFUL
            );
        }

        // assert status on the decommissioned node
        // Here we verify that, until it got kicked out, it received the appropriate status updates.
        // A decommissioned node will therefore have the status IN_PROGRESS: it is kicked out after that update
        // and never receives the status update to SUCCESSFUL.
        String randomDecommissionedNode = randomFrom(clusterManagerNodes.get(2), dataNodes.get(2));
        ClusterService decommissionedNodeClusterService = internalCluster().getInstance(ClusterService.class, randomDecommissionedNode);
        assertEquals(
            decommissionedNodeClusterService.state().metadata().decommissionAttributeMetadata().status(),
            DecommissionStatus.IN_PROGRESS
        );

Review thread on the assertion above:

Reviewer: Is this assert done with the assumption that decommission can take some time, and that during this check the status would mostly still be IN_PROGRESS?

Author: No. When the decommissioned nodes were kicked out, all the nodes would have had the status IN_PROGRESS (the removed nodes never receive the later update to SUCCESSFUL).

(A timing-tolerant variant of this check is sketched after the end of this file.)

        // Will wait for all events to complete
        client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();

        // Recommission the zone so that the test completes gracefully once the assertions above have passed
        DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(clusterManagerNodes.get(0)).execute(
            DeleteDecommissionStateAction.INSTANCE,
            new DeleteDecommissionStateRequest()
        ).get();
        assertTrue(deleteDecommissionStateResponse.isAcknowledged());

        // Wait for the cluster to stabilise with a timeout of 2 min (the findPeerInterval for decommissioned nodes),
        // as by then all nodes should have joined the cluster
        ensureStableCluster(6, TimeValue.timeValueMinutes(2));
    }
}
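For the timing question in the review thread above, here is a minimal sketch (not part of this PR) of how the decommissioned-node check could be made timing-tolerant with assertBusy, which OpenSearchIntegTestCase inherits from OpenSearchTestCase. The test's own reasoning is that the removed node deterministically stays at IN_PROGRESS, so the plain assert suffices; this variant would only matter if publication raced with the assertion. It reuses randomDecommissionedNode from the test, needs an extra import of java.util.concurrent.TimeUnit, and requires the test method to declare throws Exception.

        // Hypothetical timing-tolerant variant: retry the status check until it passes or 30s elapse.
        assertBusy(() -> {
            ClusterService cs = internalCluster().getInstance(ClusterService.class, randomDecommissionedNode);
            assertEquals(
                DecommissionStatus.IN_PROGRESS,
                cs.state().metadata().decommissionAttributeMetadata().status()
            );
        }, 30, TimeUnit.SECONDS);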
DecommissionAttributeMetadata.java:

@@ -79,35 +79,29 @@ public DecommissionStatus status() {
     /**
      * Returns instance of the metadata with updated status
      * @param newStatus status to be updated with
-     * @return instance with valid status
      */
     // synchronized is strictly speaking not needed (this is called by a single thread), but just to be safe
-    public synchronized DecommissionAttributeMetadata setUpdatedStatus(DecommissionStatus newStatus) {
-        // if the current status is the expected status already, we return the same instance
-        if (newStatus.equals(status)) {
-            return this;
+    public synchronized void validateNewStatus(DecommissionStatus newStatus) {
+        // if the current status is the expected status already or new status is FAILED, we let the check pass
+        if (newStatus.equals(status) || newStatus.equals(DecommissionStatus.FAILED)) {
+            return;
         }
         // We don't expect that INIT will be new status, as it is registered only when starting the decommission action
         switch (newStatus) {
             case IN_PROGRESS:
-                validateAndSetStatus(DecommissionStatus.INIT, newStatus);
+                validateStatus(DecommissionStatus.INIT, newStatus);
                 break;
             case SUCCESSFUL:
-                validateAndSetStatus(DecommissionStatus.IN_PROGRESS, newStatus);
-                break;
-            case FAILED:
-                // we don't need to validate here and directly update status to FAILED
-                this.status = newStatus;
+                validateStatus(DecommissionStatus.IN_PROGRESS, newStatus);
                 break;
             default:
                 throw new IllegalArgumentException(
                     "illegal decommission status [" + newStatus.status() + "] requested for updating metadata"
                 );
         }
-        return this;
     }

-    private void validateAndSetStatus(DecommissionStatus expected, DecommissionStatus next) {
+    private void validateStatus(DecommissionStatus expected, DecommissionStatus next) {
         if (status.equals(expected) == false) {
             assert false : "can't move decommission status to ["
                 + next
@@ -120,7 +114,6 @@ private void validateAndSetStatus(DecommissionStatu
                 "can't move decommission status to [" + next + "]. current status: [" + status + "] (expected [" + expected + "])"
             );
         }
-        status = next;
     }

     @Override

Review thread on lines +85 to +87 (the new early-exit check):

Reviewer: This is assuming that all steps can have a self-loop for state transitions.

Author: Didn't get this. This check is added to ensure that if any step wants to mark the status as FAILED, the check lets it pass.

Author: And this behaviour is the same as before. I had to refactor this method a bit because we were updating the same instance, which led to a relative diff of 0 for the decommission state update.

Author: The self-loops that might come up during multiple concurrent requests are handled separately as part of PR #4684. Today we need the status to be able to move into the same state, as per the current service implementation.

Reviewer: The other condition makes it look paranoid.
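To make the effect of the refactor concrete, here is a minimal caller-side sketch (not taken from this PR's diff) of applying a status change now that validateNewStatus only validates the transition and no longer mutates the shared instance. Building a fresh DecommissionAttributeMetadata is what gives the publication a non-zero diff, so non-leader nodes actually receive the status update, which is the bug this PR fixes. The DecommissionAttributeMetadata(DecommissionAttribute, DecommissionStatus) constructor, the decommissionAttribute() accessor, and the TYPE custom-metadata key are assumptions about the surrounding class rather than something shown in this diff.

    // Sketch of a cluster-state update task body applying a decommission status change.
    // (Fragment; imports from org.opensearch.cluster.* are omitted.)
    ClusterState applyStatusUpdate(ClusterState currentState, DecommissionStatus newStatus) {
        DecommissionAttributeMetadata current = currentState.metadata().decommissionAttributeMetadata();
        current.validateNewStatus(newStatus); // fails fast on an illegal transition
        // Build a new metadata instance instead of mutating 'current' in place:
        // a changed object means a non-zero cluster-state diff, which gets published to every node.
        DecommissionAttributeMetadata updated = new DecommissionAttributeMetadata(current.decommissionAttribute(), newStatus); // assumed constructor
        return ClusterState.builder(currentState)
            .metadata(Metadata.builder(currentState.metadata()).putCustom(DecommissionAttributeMetadata.TYPE, updated).build())
            .build();
    }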
Author comment on the placement of the new test file: Added it in this package as all the integ tests for decommissioning the zone need access to package-private methods of Coordinator. PR for the integ tests: #4715