Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail weight update when decommission ongoing and fail decommission when attribute not weighed away #4839

Merged
merged 8 commits into from
Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Refactor Base Action class javadocs to OpenSearch.API ([#4732](https://github.com/opensearch-project/OpenSearch/pull/4732))
- Migrate client transports to Apache HttpClient / Core 5.x ([#4459](https://github.com/opensearch-project/OpenSearch/pull/4459))
- Refactored BalancedAllocator.Balancer to LocalShardsBalancer ([#4761](https://github.com/opensearch-project/OpenSearch/pull/4761))
- Fail weight update when decommission ongoing and fail decommission when attribute not weighed away ([#4839](https://github.com/opensearch-project/OpenSearch/pull/4839))
### Deprecated
### Removed
- Remove deprecated code to add node name into log pattern of log4j property file ([#4568](https://github.com/opensearch-project/OpenSearch/pull/4568))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.cluster.decommission.DecommissionStatus;
import org.opensearch.cluster.decommission.DecommissioningFailedException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
Expand All @@ -37,6 +41,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static org.opensearch.test.NodeRoles.onlyRole;
Expand Down Expand Up @@ -102,6 +107,17 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx

ensureStableCluster(6);

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

logger.info("--> starting decommissioning nodes in zone {}", 'c');
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c");
DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute);
Expand Down Expand Up @@ -162,4 +178,57 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx
// as by then all nodes should have joined the cluster
ensureStableCluster(6, TimeValue.timeValueMinutes(2));
}

public void testDecommissionFailedWhenAttributeNotWeighedAway() throws Exception {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();
// Start 3 cluster manager eligible nodes
internalCluster().startClusterManagerOnlyNodes(3, Settings.builder().put(commonSettings).build());
// start 3 data nodes
internalCluster().startDataOnlyNodes(3, Settings.builder().put(commonSettings).build());
ensureStableCluster(6);
ClusterHealthResponse health = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes(Integer.toString(6))
.execute()
.actionGet();
assertFalse(health.isTimedOut());

DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c");
DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute);
assertBusy(() -> {
DecommissioningFailedException ex = expectThrows(
DecommissioningFailedException.class,
() -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet()
);
assertTrue(
ex.getMessage()
.contains("no weights are set to the attribute. Please set appropriate weights before triggering decommission action")
);
});

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 1.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

assertBusy(() -> {
DecommissioningFailedException ex = expectThrows(
DecommissioningFailedException.class,
() -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet()
);
assertTrue(ex.getMessage().contains("weight for decommissioned attribute is expected to be [0.0] but found [1.0]"));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
Expand Down Expand Up @@ -129,6 +131,8 @@ public ClusterState execute(ClusterState currentState) {
DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata();
// check that request is eligible to proceed
ensureEligibleRequest(decommissionAttributeMetadata, decommissionAttribute);
// ensure attribute is weighed away
ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute);
decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute);
logger.info("registering decommission metadata [{}] to execute action", decommissionAttributeMetadata.toString());
return ClusterState.builder(currentState)
Expand Down Expand Up @@ -413,6 +417,30 @@ private static void validateAwarenessAttribute(
}
}

private static void ensureToBeDecommissionedAttributeWeighedAway(ClusterState state, DecommissionAttribute decommissionAttribute) {
WeightedRoutingMetadata weightedRoutingMetadata = state.metadata().weightedRoutingMetadata();
if (weightedRoutingMetadata == null) {
throw new DecommissioningFailedException(
decommissionAttribute,
"no weights are set to the attribute. Please set appropriate weights before triggering decommission action"
);
}
WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting();
if (weightedRouting.attributeName().equals(decommissionAttribute.attributeName()) == false) {
throw new DecommissioningFailedException(
decommissionAttribute,
"no weights are specified to attribute [" + decommissionAttribute.attributeName() + "]"
);
}
Double attributeValueWeight = weightedRouting.weights().get(decommissionAttribute.attributeValue());
if (attributeValueWeight == null || attributeValueWeight.equals(0.0) == false) {
throw new DecommissioningFailedException(
decommissionAttribute,
"weight for decommissioned attribute is expected to be [0.0] but found [" + attributeValueWeight + "]"
);
}
}

private static void ensureEligibleRequest(
DecommissionAttributeMetadata decommissionAttributeMetadata,
DecommissionAttribute requestedDecommissionAttribute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.ack.ClusterStateUpdateResponse;
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
import org.opensearch.cluster.decommission.DecommissionStatus;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
Expand Down Expand Up @@ -68,6 +70,8 @@ public void registerWeightedRoutingMetadata(
clusterService.submitStateUpdateTask("update_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public ClusterState execute(ClusterState currentState) {
// verify currently no decommission action is ongoing
ensureNoOngoingDecommissionAction(currentState);
Metadata metadata = currentState.metadata();
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE);
Expand Down Expand Up @@ -154,4 +158,15 @@ public void verifyAwarenessAttribute(String attributeName) {
throw validationException;
}
}

public void ensureNoOngoingDecommissionAction(ClusterState state) {
DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().decommissionAttributeMetadata();
if (decommissionAttributeMetadata != null && decommissionAttributeMetadata.status().equals(DecommissionStatus.FAILED) == false) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if status is SUCCESSFUL?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only when Decommission has failed, we can allow the weight update can go through. Not for any other status

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't SUCCESSFUL a final state? The error message of this if condition says: a decommission action is ongoing with status...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A succesful decommission state means the transport execution has completed. The nodes are decommissioned in this state (not able to start pre voting or join). Hence DecommissionAction ongoing means a zone is decommissioned and the cluster rejects pings from decommissioned node

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If decommission status is FAILED can certain nodes be out of service or decommissioned while others not as in partially failed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, if the decommission is FAILED, joins or prevoting is not blocked on those nodes and hence if some nodes are out of service (which can happen), the weights request can go through

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But then the zone isn't fully equipped to take traffic right like 90% nodes are out of service and we place weights 10:1:1 when 10 is for the zone only operating at 10% capacity?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No

throw new IllegalStateException(
"a decommission action is ongoing with status ["
+ decommissionAttributeMetadata.status().status()
+ "], cannot update weight during this state"
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.MockTransport;
import org.opensearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -169,6 +172,56 @@ public void onFailure(Exception e) {
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}

public void testDecommissionNotStartedWithoutWeighingAwayAttribute_1() throws InterruptedException {
Map<String, Double> weights = Map.of("zone_1", 1.0, "zone_2", 1.0, "zone_3", 0.0);
setWeightedRoutingWeights(weights);
final CountDownLatch countDownLatch = new CountDownLatch(1);
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone_1");
ActionListener<DecommissionResponse> listener = new ActionListener<DecommissionResponse>() {
@Override
public void onResponse(DecommissionResponse decommissionResponse) {
fail("on response shouldn't have been called");
}

@Override
public void onFailure(Exception e) {
assertTrue(e instanceof DecommissioningFailedException);
assertThat(
e.getMessage(),
Matchers.containsString("weight for decommissioned attribute is expected to be [0.0] but found [1.0]")
);
countDownLatch.countDown();
}
};
decommissionService.startDecommissionAction(decommissionAttribute, listener);
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}

public void testDecommissionNotStartedWithoutWeighingAwayAttribute_2() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone_1");
ActionListener<DecommissionResponse> listener = new ActionListener<DecommissionResponse>() {
@Override
public void onResponse(DecommissionResponse decommissionResponse) {
fail("on response shouldn't have been called");
}

@Override
public void onFailure(Exception e) {
assertTrue(e instanceof DecommissioningFailedException);
assertThat(
e.getMessage(),
Matchers.containsString(
"no weights are set to the attribute. Please set appropriate weights before triggering decommission action"
)
);
countDownLatch.countDown();
}
};
decommissionService.startDecommissionAction(decommissionAttribute, listener);
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}

@SuppressWarnings("unchecked")
public void testDecommissioningFailedWhenAnotherAttributeDecommissioningSuccessful() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -286,6 +339,17 @@ public void onFailure(Exception e) {
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}

private void setWeightedRoutingWeights(Map<String, Double> weights) {
ClusterState clusterState = clusterService.state();
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting);
Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata());
metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata);
clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build();
ClusterState.Builder builder = ClusterState.builder(clusterState);
ClusterServiceUtils.setState(clusterService, builder);
}

private ClusterState addDataNodes(ClusterState clusterState, String zone, String... nodeIds) {
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes());
org.opensearch.common.collect.List.of(nodeIds).forEach(nodeId -> nodeBuilder.add(newDataNode(nodeId, singletonMap("zone", zone))));
Expand Down
Loading