diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c4c7c80e5410..40147837b015c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -91,6 +91,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Refactor Base Action class javadocs to OpenSearch.API ([#4732](https://github.com/opensearch-project/OpenSearch/pull/4732)) - Migrate client transports to Apache HttpClient / Core 5.x ([#4459](https://github.com/opensearch-project/OpenSearch/pull/4459)) - Refactored BalancedAllocator.Balancer to LocalShardsBalancer ([#4761](https://github.com/opensearch-project/OpenSearch/pull/4761)) +- Fail weight update when decommission ongoing and fail decommission when attribute not weighed away ([#4839](https://github.com/opensearch-project/OpenSearch/pull/4839)) ### Deprecated ### Removed - Remove deprecated code to add node name into log pattern of log4j property file ([#4568](https://github.com/opensearch-project/OpenSearch/pull/4568)) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java index a2270d63ba6fa..2dc964e3e8845 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java @@ -20,11 +20,15 @@ import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.decommission.DecommissionAttribute; import org.opensearch.cluster.decommission.DecommissionStatus; +import org.opensearch.cluster.decommission.DecommissioningFailedException; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.routing.WeightedRouting; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; @@ -37,6 +41,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import static org.opensearch.test.NodeRoles.onlyRole; @@ -102,6 +107,17 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx ensureStableCluster(6); + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .get(); + assertTrue(weightedRoutingResponse.isAcknowledged()); + logger.info("--> starting decommissioning nodes in zone {}", 'c'); DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c"); DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); @@ -162,4 +178,57 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx // as by then all nodes should have joined the cluster ensureStableCluster(6, TimeValue.timeValueMinutes(2)); } + + public void testDecommissionFailedWhenAttributeNotWeighedAway() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + // Start 3 cluster manager eligible nodes + internalCluster().startClusterManagerOnlyNodes(3, Settings.builder().put(commonSettings).build()); + // start 3 data nodes + internalCluster().startDataOnlyNodes(3, Settings.builder().put(commonSettings).build()); + ensureStableCluster(6); + ClusterHealthResponse health = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes(Integer.toString(6)) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c"); + DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + assertBusy(() -> { + DecommissioningFailedException ex = expectThrows( + DecommissioningFailedException.class, + () -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet() + ); + assertTrue( + ex.getMessage() + .contains("no weights are set to the attribute. Please set appropriate weights before triggering decommission action") + ); + }); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 1.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .get(); + assertTrue(weightedRoutingResponse.isAcknowledged()); + + assertBusy(() -> { + DecommissioningFailedException ex = expectThrows( + DecommissioningFailedException.class, + () -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet() + ); + assertTrue(ex.getMessage().contains("weight for decommissioned attribute is expected to be [0.0] but found [1.0]")); + }); + } } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index b2c8bfbc0cdc8..e6639ae058066 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -20,7 +20,9 @@ import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.NotClusterManagerException; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.WeightedRouting; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; @@ -129,6 +131,8 @@ public ClusterState execute(ClusterState currentState) { DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata(); // check that request is eligible to proceed ensureEligibleRequest(decommissionAttributeMetadata, decommissionAttribute); + // ensure attribute is weighed away + ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute); decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute); logger.info("registering decommission metadata [{}] to execute action", decommissionAttributeMetadata.toString()); return ClusterState.builder(currentState) @@ -413,6 +417,30 @@ private static void validateAwarenessAttribute( } } + private static void ensureToBeDecommissionedAttributeWeighedAway(ClusterState state, DecommissionAttribute decommissionAttribute) { + WeightedRoutingMetadata weightedRoutingMetadata = state.metadata().weightedRoutingMetadata(); + if (weightedRoutingMetadata == null) { + throw new DecommissioningFailedException( + decommissionAttribute, + "no weights are set to the attribute. Please set appropriate weights before triggering decommission action" + ); + } + WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting(); + if (weightedRouting.attributeName().equals(decommissionAttribute.attributeName()) == false) { + throw new DecommissioningFailedException( + decommissionAttribute, + "no weights are specified to attribute [" + decommissionAttribute.attributeName() + "]" + ); + } + Double attributeValueWeight = weightedRouting.weights().get(decommissionAttribute.attributeValue()); + if (attributeValueWeight == null || attributeValueWeight.equals(0.0) == false) { + throw new DecommissioningFailedException( + decommissionAttribute, + "weight for decommissioned attribute is expected to be [0.0] but found [" + attributeValueWeight + "]" + ); + } + } + private static void ensureEligibleRequest( DecommissionAttributeMetadata decommissionAttributeMetadata, DecommissionAttribute requestedDecommissionAttribute diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java index 54f2c81aea384..6acb4a1e832cb 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java @@ -19,6 +19,8 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.ack.ClusterStateUpdateResponse; +import org.opensearch.cluster.decommission.DecommissionAttributeMetadata; +import org.opensearch.cluster.decommission.DecommissionStatus; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; @@ -68,6 +70,8 @@ public void registerWeightedRoutingMetadata( clusterService.submitStateUpdateTask("update_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { + // verify currently no decommission action is ongoing + ensureNoOngoingDecommissionAction(currentState); Metadata metadata = currentState.metadata(); Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE); @@ -154,4 +158,15 @@ public void verifyAwarenessAttribute(String attributeName) { throw validationException; } } + + public void ensureNoOngoingDecommissionAction(ClusterState state) { + DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().decommissionAttributeMetadata(); + if (decommissionAttributeMetadata != null && decommissionAttributeMetadata.status().equals(DecommissionStatus.FAILED) == false) { + throw new IllegalStateException( + "a decommission action is ongoing with status [" + + decommissionAttributeMetadata.status().status() + + "], cannot update weight during this state" + ); + } + } } diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index 7dee51b7713f9..c4cf6c7cc6641 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -22,14 +22,17 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.WeightedRouting; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.MockTransport; import org.opensearch.threadpool.TestThreadPool; @@ -169,6 +172,56 @@ public void onFailure(Exception e) { assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } + public void testDecommissionNotStartedWithoutWeighingAwayAttribute_1() throws InterruptedException { + Map weights = Map.of("zone_1", 1.0, "zone_2", 1.0, "zone_3", 0.0); + setWeightedRoutingWeights(weights); + final CountDownLatch countDownLatch = new CountDownLatch(1); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone_1"); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(DecommissionResponse decommissionResponse) { + fail("on response shouldn't have been called"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof DecommissioningFailedException); + assertThat( + e.getMessage(), + Matchers.containsString("weight for decommissioned attribute is expected to be [0.0] but found [1.0]") + ); + countDownLatch.countDown(); + } + }; + decommissionService.startDecommissionAction(decommissionAttribute, listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + } + + public void testDecommissionNotStartedWithoutWeighingAwayAttribute_2() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone_1"); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(DecommissionResponse decommissionResponse) { + fail("on response shouldn't have been called"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof DecommissioningFailedException); + assertThat( + e.getMessage(), + Matchers.containsString( + "no weights are set to the attribute. Please set appropriate weights before triggering decommission action" + ) + ); + countDownLatch.countDown(); + } + }; + decommissionService.startDecommissionAction(decommissionAttribute, listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + } + @SuppressWarnings("unchecked") public void testDecommissioningFailedWhenAnotherAttributeDecommissioningSuccessful() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); @@ -286,6 +339,17 @@ public void onFailure(Exception e) { assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } + private void setWeightedRoutingWeights(Map weights) { + ClusterState clusterState = clusterService.state(); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); + clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); + ClusterState.Builder builder = ClusterState.builder(clusterState); + ClusterServiceUtils.setState(clusterService, builder); + } + private ClusterState addDataNodes(ClusterState clusterState, String zone, String... nodeIds) { DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); org.opensearch.common.collect.List.of(nodeIds).forEach(nodeId -> nodeBuilder.add(newDataNode(nodeId, singletonMap("zone", zone)))); diff --git a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java index fc5d46ef84c79..91b8703cacf5c 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java @@ -8,6 +8,7 @@ package org.opensearch.cluster.routing; +import org.hamcrest.MatcherAssert; import org.junit.After; import org.junit.Before; import org.opensearch.Version; @@ -21,6 +22,9 @@ import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ack.ClusterStateUpdateResponse; +import org.opensearch.cluster.decommission.DecommissionAttribute; +import org.opensearch.cluster.decommission.DecommissionAttributeMetadata; +import org.opensearch.cluster.decommission.DecommissionStatus; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.node.DiscoveryNode; @@ -43,6 +47,11 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; public class WeightedRoutingServiceTests extends OpenSearchTestCase { private ThreadPool threadPool; @@ -173,6 +182,15 @@ private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map weights = Map.of("zone_A", 1.0, "zone_B", 1.0, "zone_C", 1.0); ClusterState state = clusterService.state(); @@ -276,4 +294,72 @@ public void testVerifyAwarenessAttribute_ValidAttributeName() { fail("verify awareness attribute should not fail"); } } + + public void testAddWeightedRoutingFailsWhenDecommissionOngoing() throws InterruptedException { + Map weights = Map.of("zone_A", 1.0, "zone_B", 1.0, "zone_C", 1.0); + DecommissionStatus status = randomFrom(DecommissionStatus.INIT, DecommissionStatus.IN_PROGRESS, DecommissionStatus.SUCCESSFUL); + ClusterState state = clusterService.state(); + state = setWeightedRoutingWeights(state, weights); + state = setDecommissionAttribute(state, status); + ClusterState.Builder builder = ClusterState.builder(state); + ClusterServiceUtils.setState(clusterService, builder); + + ClusterPutWeightedRoutingRequestBuilder request = new ClusterPutWeightedRoutingRequestBuilder( + client, + ClusterAddWeightedRoutingAction.INSTANCE + ); + WeightedRouting updatedWeightedRouting = new WeightedRouting("zone", weights); + request.setWeightedRouting(updatedWeightedRouting); + final CountDownLatch countDownLatch = new CountDownLatch(1); + final AtomicReference exceptionReference = new AtomicReference<>(); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) { + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + exceptionReference.set(e); + countDownLatch.countDown(); + } + }; + weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + MatcherAssert.assertThat("Expected onFailure to be called", exceptionReference.get(), notNullValue()); + MatcherAssert.assertThat(exceptionReference.get(), instanceOf(IllegalStateException.class)); + MatcherAssert.assertThat(exceptionReference.get().getMessage(), containsString("a decommission action is ongoing with status")); + } + + public void testAddWeightedRoutingPassesWhenDecommissionFailed() throws InterruptedException { + Map weights = Map.of("zone_A", 1.0, "zone_B", 1.0, "zone_C", 1.0); + DecommissionStatus status = DecommissionStatus.FAILED; + ClusterState state = clusterService.state(); + state = setWeightedRoutingWeights(state, weights); + state = setDecommissionAttribute(state, status); + ClusterState.Builder builder = ClusterState.builder(state); + ClusterServiceUtils.setState(clusterService, builder); + + ClusterPutWeightedRoutingRequestBuilder request = new ClusterPutWeightedRoutingRequestBuilder( + client, + ClusterAddWeightedRoutingAction.INSTANCE + ); + WeightedRouting updatedWeightedRouting = new WeightedRouting("zone", weights); + request.setWeightedRouting(updatedWeightedRouting); + final CountDownLatch countDownLatch = new CountDownLatch(1); + final AtomicReference exceptionReference = new AtomicReference<>(); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) { + assertTrue(clusterStateUpdateResponse.isAcknowledged()); + assertEquals(updatedWeightedRouting, clusterService.state().metadata().weightedRoutingMetadata().getWeightedRouting()); + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) {} + }; + weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + } }