diff --git a/CHANGELOG.md b/CHANGELOG.md index 37e16aecff69a..4bd7e65f77c18 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -78,6 +78,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Update to Apache Lucene 9.4.0 ([#4661](https://github.com/opensearch-project/OpenSearch/pull/4661)) - Controlling discovery for decommissioned nodes ([#4590](https://github.com/opensearch-project/OpenSearch/pull/4590)) - Backport Apache Lucene version change for 2.4.0 ([#4677](https://github.com/opensearch-project/OpenSearch/pull/4677)) +- Control concurrency and add retry action in decommission flow ([#4684](https://github.com/opensearch-project/OpenSearch/pull/4684)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index 79a6688dc6049..26ea8891d1486 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -14,6 +14,7 @@ import org.opensearch.common.Strings; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.unit.TimeValue; import java.io.IOException; @@ -29,22 +30,34 @@ public class DecommissionRequest extends ClusterManagerNodeRequest { private DecommissionAttribute decommissionAttribute; + private boolean retryOnClusterManagerChange; + private TimeValue retryTimeout; public DecommissionRequest() {} - public DecommissionRequest(DecommissionAttribute decommissionAttribute) { + public DecommissionRequest(DecommissionAttribute decommissionAttribute, boolean retryOnClusterManagerChange, TimeValue retryTimeout) { this.decommissionAttribute = decommissionAttribute; + this.retryOnClusterManagerChange = 
retryOnClusterManagerChange; + this.retryTimeout = retryTimeout; + } + + public DecommissionRequest(DecommissionAttribute decommissionAttribute, TimeValue retryTimeout) { + this(decommissionAttribute, false, retryTimeout); } public DecommissionRequest(StreamInput in) throws IOException { super(in); decommissionAttribute = new DecommissionAttribute(in); + retryOnClusterManagerChange = in.readBoolean(); + retryTimeout = in.readTimeValue(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); decommissionAttribute.writeTo(out); + out.writeBoolean(retryOnClusterManagerChange); + out.writeTimeValue(retryTimeout); } /** @@ -58,6 +71,28 @@ public DecommissionRequest setDecommissionAttribute(DecommissionAttribute decomm return this; } + /** + * Sets retryOnClusterManagerChange for decommission request + * + * @param retryOnClusterManagerChange boolean for request to retry decommission action on cluster manager change + * @return this request + */ + public DecommissionRequest setRetryOnClusterManagerChange(boolean retryOnClusterManagerChange) { + this.retryOnClusterManagerChange = retryOnClusterManagerChange; + return this; + } + + /** + * Sets the retry timeout for the request + * + * @param retryTimeout retry time out for the request + * @return this request + */ + public DecommissionRequest setRetryTimeout(TimeValue retryTimeout) { + this.retryTimeout = retryTimeout; + return this; + } + /** * @return Returns the decommission attribute key-value */ @@ -65,6 +100,20 @@ public DecommissionAttribute getDecommissionAttribute() { return this.decommissionAttribute; } + /** + * @return Returns whether decommission is retry eligible on cluster manager change + */ + public boolean retryOnClusterManagerChange() { + return this.retryOnClusterManagerChange; + } + + /** + * @return retry timeout + */ + public TimeValue getRetryTimeout() { + return this.retryTimeout; + } + @Override public ActionRequestValidationException validate() { 
ActionRequestValidationException validationException = null; @@ -79,6 +128,13 @@ public ActionRequestValidationException validate() { @Override public String toString() { - return "DecommissionRequest{" + "decommissionAttribute=" + decommissionAttribute + '}'; + return "DecommissionRequest{" + + "decommissionAttribute=" + + decommissionAttribute + + ", retryOnClusterManagerChange=" + + retryOnClusterManagerChange + + ", retryTimeout=" + + retryTimeout + + '}'; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestBuilder.java index 47af3b952c895..8228ca27ec7b3 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestBuilder.java @@ -12,6 +12,7 @@ import org.opensearch.action.support.clustermanager.ClusterManagerNodeOperationRequestBuilder; import org.opensearch.client.OpenSearchClient; import org.opensearch.cluster.decommission.DecommissionAttribute; +import org.opensearch.common.unit.TimeValue; /** * Register decommission request builder @@ -35,4 +36,26 @@ public DecommissionRequestBuilder setDecommissionedAttribute(DecommissionAttribu request.setDecommissionAttribute(decommissionAttribute); return this; } + + /** + * Sets retryOnClusterManagerChange for decommission request + * + * @param retryOnClusterManagerChange boolean for request to retry decommission action on cluster manager change + * @return current object + */ + public DecommissionRequestBuilder setRetryOnClusterManagerChange(boolean retryOnClusterManagerChange) { + request.setRetryOnClusterManagerChange(retryOnClusterManagerChange); + return this; + } + + /** + * Sets the retry timeout for the decommission request + * + * @param retryTimeout 
retry timeout for the request + * @return current object + */ + public DecommissionRequestBuilder setRetryTimeout(TimeValue retryTimeout) { + request.setRetryTimeout(retryTimeout); + return this; + } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/TransportDecommissionAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/TransportDecommissionAction.java index 3a067d2f110b9..6f4e3cf82d2ce 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/TransportDecommissionAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/TransportDecommissionAction.java @@ -76,6 +76,6 @@ protected ClusterBlockException checkBlock(DecommissionRequest request, ClusterS protected void clusterManagerOperation(DecommissionRequest request, ClusterState state, ActionListener listener) throws Exception { logger.info("starting awareness attribute [{}] decommissioning", request.getDecommissionAttribute().toString()); - decommissionService.startDecommissionAction(request.getDecommissionAttribute(), listener); + decommissionService.startDecommissionAction(request, listener); } } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index 7719012f2f3d7..d6574a87911ae 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -18,6 +18,9 @@ import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction; import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest; import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsResponse; +import
org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction; +import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; +import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.ClusterStateTaskConfig; @@ -72,6 +75,66 @@ public class DecommissionController { this.threadPool = threadPool; } + /** + * Sends a transport call to the local node to retry the decommission action, provided that - + * 1. The retry timeout of the request has not elapsed yet + * 2. The retry is being executed because of a cluster manager change + * + * @param decommissionRequest decommission request object + * @param startTime start time of previous request + * @param listener callback for the retry action + */ + public void retryDecommissionAction( + DecommissionRequest decommissionRequest, + long startTime, + ActionListener listener + ) { + final long remainingTimeoutMS = decommissionRequest.getRetryTimeout().millis() - (threadPool.relativeTimeInMillis() - startTime); + if (remainingTimeoutMS <= 0) { + logger.debug( + "timed out before retrying [{}] for attribute [{}] after cluster manager change", + DecommissionAction.NAME, + decommissionRequest.getDecommissionAttribute() + ); + listener.onFailure( + new OpenSearchTimeoutException( + "timed out before retrying [{}] for attribute [{}] after cluster manager change", + DecommissionAction.NAME, + decommissionRequest.getDecommissionAttribute() + ) + ); + return; + } + decommissionRequest.setRetryOnClusterManagerChange(true); + decommissionRequest.setRetryTimeout(TimeValue.timeValueMillis(remainingTimeoutMS)); + transportService.sendRequest( + transportService.getLocalNode(), + DecommissionAction.NAME, + decommissionRequest, + new TransportResponseHandler() { + @Override + public void handleResponse(DecommissionResponse response) { + listener.onResponse(response); + } + +
@Override + public void handleException(TransportException exp) { + listener.onFailure(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public DecommissionResponse read(StreamInput in) throws IOException { + return new DecommissionResponse(in); + } + } + ); + } + /** * Transport call to add nodes to voting config exclusion * diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 5def2733b5ded..557e8fab860d2 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -13,12 +13,12 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchTimeoutException; import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.ClusterStateUpdateTask; -import org.opensearch.cluster.NotClusterManagerException; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.allocation.AllocationService; @@ -67,6 +67,7 @@ public class DecommissionService { private final TransportService transportService; private final ThreadPool threadPool; private final DecommissionController decommissionController; + private final long startTime; private volatile List awarenessAttributes; private volatile Map> forcedAwarenessAttributes; @@ -83,6 +84,7 @@ public DecommissionService( this.transportService = transportService; 
this.threadPool = threadPool; this.decommissionController = new DecommissionController(clusterService, transportService, allocationService, threadPool); + this.startTime = threadPool.relativeTimeInMillis(); this.awarenessAttributes = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, this::setAwarenessAttributes); @@ -113,13 +115,14 @@ private void setForcedAwarenessAttributes(Settings forceSettings) { * Starts the new decommission request and registers the metadata with status as {@link DecommissionStatus#INIT} * Once the status is updated, it tries to exclude to-be-decommissioned cluster manager eligible nodes from Voting Configuration * - * @param decommissionAttribute register decommission attribute in the metadata request + * @param decommissionRequest request for decommission action * @param listener register decommission listener */ public void startDecommissionAction( - final DecommissionAttribute decommissionAttribute, + final DecommissionRequest decommissionRequest, final ActionListener listener ) { + final DecommissionAttribute decommissionAttribute = decommissionRequest.getDecommissionAttribute(); // register the metadata with status as INIT as first step clusterService.submitStateUpdateTask("decommission [" + decommissionAttribute + "]", new ClusterStateUpdateTask(Priority.URGENT) { @Override @@ -129,6 +132,7 @@ public ClusterState execute(ClusterState currentState) { DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata(); // check that request is eligible to proceed ensureEligibleRequest(decommissionAttributeMetadata, decommissionAttribute); + ensureEligibleRetry(decommissionRequest, decommissionAttributeMetadata); decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute); logger.info("registering decommission metadata [{}] to execute action", 
decommissionAttributeMetadata.toString()); return ClusterState.builder(currentState) @@ -157,15 +161,17 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS decommissionAttributeMetadata.decommissionAttribute(), decommissionAttributeMetadata.status() ); - decommissionClusterManagerNodes(decommissionAttributeMetadata.decommissionAttribute(), listener); + assert decommissionAttributeMetadata.decommissionAttribute().equals(decommissionRequest.getDecommissionAttribute()); + decommissionClusterManagerNodes(decommissionRequest, listener); } }); } private synchronized void decommissionClusterManagerNodes( - final DecommissionAttribute decommissionAttribute, + final DecommissionRequest decommissionRequest, ActionListener listener ) { + final DecommissionAttribute decommissionAttribute = decommissionRequest.getDecommissionAttribute(); ClusterState state = clusterService.getClusterApplierService().state(); // since here metadata is already registered with INIT, we can guarantee that no new node with decommission attribute can further // join the cluster @@ -211,18 +217,22 @@ public void onResponse(Void unused) { failDecommissionedNodes(clusterService.getClusterApplierService().state()); } } else { - // explicitly calling listener.onFailure with NotClusterManagerException as the local node is not the cluster manager - // this will ensures that request is retried until cluster manager times out - logger.info( - "local node is not eligible to process the request, " - + "throwing NotClusterManagerException to attempt a retry on an eligible node" - ); - listener.onFailure( - new NotClusterManagerException( - "node [" - + transportService.getLocalNode().toString() - + "] not eligible to execute decommission request. Will retry until timeout." 
- ) + // the local node is no longer the cluster manager, which can happen after leader abdication, + // so retry the decommission action until it times out + logger.info("local node is not eligible to process the request, " + "retrying the transport action until it times out"); + decommissionController.retryDecommissionAction( + decommissionRequest, + startTime, + ActionListener.delegateResponse(listener, (delegatedListener, t) -> { + logger.debug( + () -> new ParameterizedMessage( + "failed to retry decommission action for attribute [{}]", + decommissionRequest.getDecommissionAttribute() + ), + t + ); + delegatedListener.onFailure(t); + }) ); } } @@ -465,6 +475,21 @@ private static void ensureEligibleRequest( } } + private static void ensureEligibleRetry( + DecommissionRequest decommissionRequest, + DecommissionAttributeMetadata decommissionAttributeMetadata + ) { + if (decommissionAttributeMetadata != null) { + if (decommissionAttributeMetadata.status().equals(DecommissionStatus.INIT) + && decommissionRequest.retryOnClusterManagerChange() == false) { + throw new DecommissioningFailedException( + decommissionRequest.getDecommissionAttribute(), + "concurrent request received to decommission attribute" + ); + } + } + } + private ActionListener statusUpdateListener() { return new ActionListener<>() { @Override diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java index 979bf05f537b7..446e3b9ca6136 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java @@ -12,6 +12,7 @@ import org.opensearch.client.Requests; import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.decommission.DecommissionAttribute; +import org.opensearch.common.unit.TimeValue; import
org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.RestRequest; import org.opensearch.rest.action.RestToXContentListener; @@ -29,6 +30,8 @@ */ public class RestDecommissionAction extends BaseRestHandler { + private static final TimeValue DEFAULT_RETRY_TIMEOUT = TimeValue.timeValueMinutes(5L); + @Override public List routes() { return singletonList(new Route(PUT, "/_cluster/decommission/awareness/{awareness_attribute_name}/{awareness_attribute_value}")); @@ -49,6 +52,11 @@ DecommissionRequest createRequest(RestRequest request) throws IOException { DecommissionRequest decommissionRequest = Requests.decommissionRequest(); String attributeName = request.param("awareness_attribute_name"); String attributeValue = request.param("awareness_attribute_value"); - return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue)); + // for REST request, we will set the retry flag to false. User won't have the option to execute retry on REST + return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue)) + .setRetryOnClusterManagerChange(false) + .setRetryTimeout( + TimeValue.parseTimeValue(request.param("timeout"), DEFAULT_RETRY_TIMEOUT, getClass().getSimpleName() + ".timeout") + ); } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java index c189b5702dea0..658cbf58ff786 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java @@ -10,6 +10,7 @@ import org.opensearch.action.ActionRequestValidationException; import org.opensearch.cluster.decommission.DecommissionAttribute; +import 
org.opensearch.common.unit.TimeValue; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -19,8 +20,9 @@ public class DecommissionRequestTests extends OpenSearchTestCase { public void testSerialization() throws IOException { String attributeName = "zone"; String attributeValue = "zone-1"; + TimeValue retryTimeout = TimeValue.timeValueMinutes(between(0, 10)); DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); - final DecommissionRequest originalRequest = new DecommissionRequest(decommissionAttribute); + final DecommissionRequest originalRequest = new DecommissionRequest(decommissionAttribute, retryTimeout); final DecommissionRequest deserialized = copyWriteable(originalRequest, writableRegistry(), DecommissionRequest::new); @@ -31,9 +33,10 @@ public void testValidation() { { String attributeName = null; String attributeValue = "test"; + TimeValue retryTimeout = TimeValue.timeValueMinutes(between(0, 10)); DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); - final DecommissionRequest request = new DecommissionRequest(decommissionAttribute); + final DecommissionRequest request = new DecommissionRequest(decommissionAttribute, retryTimeout); ActionRequestValidationException e = request.validate(); assertNotNull(e); assertTrue(e.getMessage().contains("attribute name is missing")); @@ -41,9 +44,10 @@ public void testValidation() { { String attributeName = "zone"; String attributeValue = ""; + TimeValue retryTimeout = TimeValue.timeValueMinutes(between(0, 10)); DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); - final DecommissionRequest request = new DecommissionRequest(decommissionAttribute); + final DecommissionRequest request = new DecommissionRequest(decommissionAttribute, retryTimeout); ActionRequestValidationException e = request.validate(); assertNotNull(e); 
assertTrue(e.getMessage().contains("attribute value is missing")); @@ -51,9 +55,10 @@ public void testValidation() { { String attributeName = "zone"; String attributeValue = "test"; + TimeValue retryTimeout = TimeValue.timeValueMinutes(between(0, 10)); DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); - final DecommissionRequest request = new DecommissionRequest(decommissionAttribute); + final DecommissionRequest request = new DecommissionRequest(decommissionAttribute, retryTimeout); ActionRequestValidationException e = request.validate(); assertNull(e); } diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index 840ce1634a68e..f87b0d50cbb7e 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -15,6 +15,7 @@ import org.mockito.Mockito; import org.opensearch.Version; import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest; import org.opensearch.action.support.master.AcknowledgedResponse; @@ -30,6 +31,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.MockTransport; import org.opensearch.threadpool.TestThreadPool; @@ -138,7 +140,8 @@ public void onFailure(Exception e) { countDownLatch.countDown(); } }; - 
decommissionService.startDecommissionAction(decommissionAttribute, listener); + TimeValue retryTimeout = TimeValue.timeValueMinutes(between(0, 10)); + decommissionService.startDecommissionAction(new DecommissionRequest(decommissionAttribute, retryTimeout), listener); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } @@ -165,7 +168,8 @@ public void onFailure(Exception e) { countDownLatch.countDown(); } }; - decommissionService.startDecommissionAction(decommissionAttribute, listener); + TimeValue retryTimeout = TimeValue.timeValueMinutes(between(0, 10)); + decommissionService.startDecommissionAction(new DecommissionRequest(decommissionAttribute, retryTimeout), listener); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } @@ -202,7 +206,9 @@ public void onFailure(Exception e) { countDownLatch.countDown(); } }; - decommissionService.startDecommissionAction(new DecommissionAttribute("zone", "zone_2"), listener); + TimeValue retryTimeout = TimeValue.timeValueMinutes(between(0, 10)); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone_2"); + decommissionService.startDecommissionAction(new DecommissionRequest(decommissionAttribute, retryTimeout), listener); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); }