From a67293624072e6940e5e2b114b396eef594dc668 Mon Sep 17 00:00:00 2001 From: Ryan Ernst Date: Mon, 18 Apr 2022 06:06:58 -0700 Subject: [PATCH] Add noop detection to node shutdown actions (#85914) When node shutdown apis are called to add or remove a node to be shutdown, it is possible the given node is already shutting down. In this case, there is no need to submit a cluster state update. This commit detects when this no-op case occurs and simply returns. relates #84847 --- .../metadata/NodesShutdownMetadata.java | 4 + .../TransportDeleteShutdownNodeAction.java | 9 ++- .../TransportPutShutdownNodeAction.java | 26 ++++++- ...ransportDeleteShutdownNodeActionTests.java | 67 +++++++++++++++++ .../TransportPutShutdownNodeActionTests.java | 74 +++++++++++++++++++ 5 files changed, 176 insertions(+), 4 deletions(-) create mode 100644 x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportDeleteShutdownNodeActionTests.java create mode 100644 x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportPutShutdownNodeActionTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadata.java index d08b21242805d..553bdc1e25dc0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadata.java @@ -74,6 +74,10 @@ public static Optional getShutdowns(final ClusterState st return Optional.of(state).map(ClusterState::metadata).map(m -> m.custom(TYPE)); } + public static NodesShutdownMetadata getShutdownsOrEmpty(final ClusterState state) { + return getShutdowns(state).orElse(EMPTY); + } + private final Map nodes; public NodesShutdownMetadata(Map nodes) { diff --git a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportDeleteShutdownNodeAction.java b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportDeleteShutdownNodeAction.java index c1a4518d02637..8483b6ccc3e36 100644 --- a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportDeleteShutdownNodeAction.java +++ b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportDeleteShutdownNodeAction.java @@ -31,6 +31,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import static org.elasticsearch.cluster.metadata.NodesShutdownMetadata.getShutdownsOrEmpty; + public class TransportDeleteShutdownNodeAction extends AcknowledgedTransportMasterNodeAction { private static final Logger logger = LogManager.getLogger(TransportDeleteShutdownNodeAction.class); @@ -71,7 +73,12 @@ protected void masterOperation( clusterService.submitStateUpdateTask("delete-node-shutdown-" + request.getNodeId(), new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { - NodesShutdownMetadata currentShutdownMetadata = currentState.metadata().custom(NodesShutdownMetadata.TYPE); + NodesShutdownMetadata currentShutdownMetadata = getShutdownsOrEmpty(currentState); + var existing = currentShutdownMetadata.getAllNodeMetadataMap().get(request.getNodeId()); + if (existing == null) { + // noop, the node has already been removed by the time we got to this update task + return currentState; + } logger.info("removing shutdown record for node [{}]", request.getNodeId()); diff --git a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportPutShutdownNodeAction.java b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportPutShutdownNodeAction.java index 2a343d466970a..fc5ba1cca1e9c 100644 --- a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportPutShutdownNodeAction.java +++ b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportPutShutdownNodeAction.java @@ -31,6 +31,10 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.Objects; + +import static org.elasticsearch.cluster.metadata.NodesShutdownMetadata.getShutdownsOrEmpty; + public class TransportPutShutdownNodeAction extends AcknowledgedTransportMasterNodeAction { private static final Logger logger = LogManager.getLogger(TransportPutShutdownNodeAction.class); @@ -61,13 +65,18 @@ protected void masterOperation( ClusterState state, ActionListener listener ) throws Exception { + if (isNoop(state, request)) { + listener.onResponse(AcknowledgedResponse.TRUE); + return; + } clusterService.submitStateUpdateTask("put-node-shutdown-" + request.getNodeId(), new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - var currentShutdownMetadata = currentState.metadata().custom(NodesShutdownMetadata.TYPE, NodesShutdownMetadata.EMPTY); + if (isNoop(currentState, request)) { + return currentState; + } final boolean nodeSeen = currentState.getNodes().nodeExists(request.getNodeId()); - SingleNodeShutdownMetadata newNodeMetadata = SingleNodeShutdownMetadata.builder() .setNodeId(request.getNodeId()) .setType(request.getType()) @@ -78,7 +87,8 @@ public ClusterState execute(ClusterState currentState) { .setTargetNodeName(request.getTargetNodeName()) .build(); - // Verify that there's not already a shutdown metadata for this node + // log the update + var currentShutdownMetadata = getShutdownsOrEmpty(currentState); SingleNodeShutdownMetadata existingRecord = currentShutdownMetadata.getAllNodeMetadataMap().get(request.getNodeId()); if (existingRecord != null) { logger.info("updating existing shutdown record {} with new record {}", existingRecord, newNodeMetadata); @@ -132,6 +142,16 @@ public void onFailure(Exception e) { }, newExecutor()); } + private static boolean isNoop(ClusterState state, PutShutdownNodeAction.Request request) { + var currentShutdownMetadata = getShutdownsOrEmpty(state); + var existing = currentShutdownMetadata.getAllNodeMetadataMap().get(request.getNodeId()); + return existing != null + && existing.getType().equals(request.getType()) + && existing.getReason().equals(request.getReason()) + && Objects.equals(existing.getAllocationDelay(), request.getAllocationDelay()) + && Objects.equals(existing.getTargetNodeName(), request.getTargetNodeName()); + } + @Override protected ClusterBlockException checkBlock(PutShutdownNodeAction.Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); diff --git a/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportDeleteShutdownNodeActionTests.java b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportDeleteShutdownNodeActionTests.java new file mode 100644 index 0000000000000..65c765ef59a91 --- /dev/null +++ b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportDeleteShutdownNodeActionTests.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.shutdown; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.junit.Before; +import org.mockito.ArgumentCaptor; + +import java.util.Map; + +import static org.elasticsearch.cluster.metadata.NodesShutdownMetadata.TYPE; +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class TransportDeleteShutdownNodeActionTests extends ESTestCase { + private ClusterService clusterService; + private TransportDeleteShutdownNodeAction action; + + @Before + public void init() { + // TODO: it takes almost 2 seconds to create these mocks....WHY?!? + var threadPool = mock(ThreadPool.class); + var transportService = mock(TransportService.class); + clusterService = mock(ClusterService.class); + var actionFilters = mock(ActionFilters.class); + var indexNameExpressionResolver = mock(IndexNameExpressionResolver.class); + action = new TransportDeleteShutdownNodeAction( + transportService, + clusterService, + threadPool, + actionFilters, + indexNameExpressionResolver + ); + } + + public void testNoop() throws Exception { + var singleNodeMetadata = mock(SingleNodeShutdownMetadata.class); + var nodesShutdownMetadata = new NodesShutdownMetadata(Map.of("node1", singleNodeMetadata)); + var metadata = Metadata.builder().putCustom(TYPE, nodesShutdownMetadata).build(); + var clusterStateWithShutdown = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metadata).build(); + + var request = new DeleteShutdownNodeAction.Request("node1"); + action.masterOperation(null, request, clusterStateWithShutdown, ActionListener.noop()); + var updateTaskCapture = ArgumentCaptor.forClass(ClusterStateUpdateTask.class); + verify(clusterService).submitStateUpdateTask(any(), updateTaskCapture.capture(), any()); + ClusterState gotState = updateTaskCapture.getValue().execute(ClusterState.EMPTY_STATE); + assertThat(gotState, sameInstance(ClusterState.EMPTY_STATE)); + } +} diff --git a/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportPutShutdownNodeActionTests.java b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportPutShutdownNodeActionTests.java new file mode 100644 index 0000000000000..c4efd15d35143 --- /dev/null +++ b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportPutShutdownNodeActionTests.java @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.shutdown; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.junit.Before; +import org.mockito.ArgumentCaptor; + +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; + +public class TransportPutShutdownNodeActionTests extends ESTestCase { + + private ClusterService clusterService; + private TransportPutShutdownNodeAction action; + + @Before + public void init() { + // TODO: it takes almost 2 seconds to create these mocks....WHY?!? + var threadPool = mock(ThreadPool.class); + var transportService = mock(TransportService.class); + clusterService = mock(ClusterService.class); + var actionFilters = mock(ActionFilters.class); + var indexNameExpressionResolver = mock(IndexNameExpressionResolver.class); + action = new TransportPutShutdownNodeAction( + transportService, + clusterService, + threadPool, + actionFilters, + indexNameExpressionResolver + ); + } + + public void testNoop() throws Exception { + var type = randomFrom(Type.REMOVE, Type.REPLACE, Type.RESTART); + var allocationDelay = type == Type.RESTART ? TimeValue.timeValueMinutes(randomIntBetween(1, 3)) : null; + var targetNodeName = type == Type.REPLACE ? randomAlphaOfLength(5) : null; + var request = new PutShutdownNodeAction.Request("node1", type, "sunsetting", allocationDelay, targetNodeName); + action.masterOperation(null, request, ClusterState.EMPTY_STATE, ActionListener.noop()); + var updateTaskCapture = ArgumentCaptor.forClass(ClusterStateUpdateTask.class); + verify(clusterService).submitStateUpdateTask(any(), updateTaskCapture.capture(), any()); + ClusterState stableState = updateTaskCapture.getValue().execute(ClusterState.EMPTY_STATE); + + // run the request again, there should be no call to submit an update task + clearInvocations(clusterService); + action.masterOperation(null, request, stableState, ActionListener.noop()); + verifyNoInteractions(clusterService); + + // run the request again with empty state, the update task should return the same state + action.masterOperation(null, request, ClusterState.EMPTY_STATE, ActionListener.noop()); + verify(clusterService).submitStateUpdateTask(any(), updateTaskCapture.capture(), any()); + ClusterState gotState = updateTaskCapture.getValue().execute(stableState); + assertThat(gotState, sameInstance(stableState)); + } +}