Skip to content

Commit

Permalink
Add noop detection to node shutdown actions (#85914)
Browse files Browse the repository at this point in the history
When node shutdown apis are called to add or remove a node to be
shutdown, it is possible the given node is already shutting down. In
this case, there is no need to submit a cluster state update. This
commit detects when this no-op case occurs and simply returns.

relates #84847
  • Loading branch information
rjernst authored Apr 18, 2022
1 parent b34eaf6 commit a672936
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public static Optional<NodesShutdownMetadata> getShutdowns(final ClusterState st
return Optional.of(state).map(ClusterState::metadata).map(m -> m.custom(TYPE));
}

public static NodesShutdownMetadata getShutdownsOrEmpty(final ClusterState state) {
return getShutdowns(state).orElse(EMPTY);
}

private final Map<String, SingleNodeShutdownMetadata> nodes;

public NodesShutdownMetadata(Map<String, SingleNodeShutdownMetadata> nodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import static org.elasticsearch.cluster.metadata.NodesShutdownMetadata.getShutdownsOrEmpty;

public class TransportDeleteShutdownNodeAction extends AcknowledgedTransportMasterNodeAction<DeleteShutdownNodeAction.Request> {
private static final Logger logger = LogManager.getLogger(TransportDeleteShutdownNodeAction.class);

Expand Down Expand Up @@ -71,7 +73,12 @@ protected void masterOperation(
clusterService.submitStateUpdateTask("delete-node-shutdown-" + request.getNodeId(), new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
NodesShutdownMetadata currentShutdownMetadata = currentState.metadata().custom(NodesShutdownMetadata.TYPE);
NodesShutdownMetadata currentShutdownMetadata = getShutdownsOrEmpty(currentState);
var existing = currentShutdownMetadata.getAllNodeMetadataMap().get(request.getNodeId());
if (existing == null) {
// noop, the node has already been removed by the time we got to this update task
return currentState;
}

logger.info("removing shutdown record for node [{}]", request.getNodeId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.Objects;

import static org.elasticsearch.cluster.metadata.NodesShutdownMetadata.getShutdownsOrEmpty;

public class TransportPutShutdownNodeAction extends AcknowledgedTransportMasterNodeAction<PutShutdownNodeAction.Request> {
private static final Logger logger = LogManager.getLogger(TransportPutShutdownNodeAction.class);

Expand Down Expand Up @@ -61,13 +65,18 @@ protected void masterOperation(
ClusterState state,
ActionListener<AcknowledgedResponse> listener
) throws Exception {
if (isNoop(state, request)) {
listener.onResponse(AcknowledgedResponse.TRUE);
return;
}
clusterService.submitStateUpdateTask("put-node-shutdown-" + request.getNodeId(), new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
var currentShutdownMetadata = currentState.metadata().custom(NodesShutdownMetadata.TYPE, NodesShutdownMetadata.EMPTY);
if (isNoop(currentState, request)) {
return currentState;
}

final boolean nodeSeen = currentState.getNodes().nodeExists(request.getNodeId());

SingleNodeShutdownMetadata newNodeMetadata = SingleNodeShutdownMetadata.builder()
.setNodeId(request.getNodeId())
.setType(request.getType())
Expand All @@ -78,7 +87,8 @@ public ClusterState execute(ClusterState currentState) {
.setTargetNodeName(request.getTargetNodeName())
.build();

// Verify that there's not already a shutdown metadata for this node
// log the update
var currentShutdownMetadata = getShutdownsOrEmpty(currentState);
SingleNodeShutdownMetadata existingRecord = currentShutdownMetadata.getAllNodeMetadataMap().get(request.getNodeId());
if (existingRecord != null) {
logger.info("updating existing shutdown record {} with new record {}", existingRecord, newNodeMetadata);
Expand Down Expand Up @@ -132,6 +142,16 @@ public void onFailure(Exception e) {
}, newExecutor());
}

private static boolean isNoop(ClusterState state, PutShutdownNodeAction.Request request) {
var currentShutdownMetadata = getShutdownsOrEmpty(state);
var existing = currentShutdownMetadata.getAllNodeMetadataMap().get(request.getNodeId());
return existing != null
&& existing.getType().equals(request.getType())
&& existing.getReason().equals(request.getReason())
&& Objects.equals(existing.getAllocationDelay(), request.getAllocationDelay())
&& Objects.equals(existing.getTargetNodeName(), request.getTargetNodeName());
}

@Override
protected ClusterBlockException checkBlock(PutShutdownNodeAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.shutdown;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import org.mockito.ArgumentCaptor;

import java.util.Map;

import static org.elasticsearch.cluster.metadata.NodesShutdownMetadata.TYPE;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class TransportDeleteShutdownNodeActionTests extends ESTestCase {
private ClusterService clusterService;
private TransportDeleteShutdownNodeAction action;

@Before
public void init() {
// TODO: it takes almost 2 seconds to create these mocks....WHY?!?
var threadPool = mock(ThreadPool.class);
var transportService = mock(TransportService.class);
clusterService = mock(ClusterService.class);
var actionFilters = mock(ActionFilters.class);
var indexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
action = new TransportDeleteShutdownNodeAction(
transportService,
clusterService,
threadPool,
actionFilters,
indexNameExpressionResolver
);
}

public void testNoop() throws Exception {
var singleNodeMetadata = mock(SingleNodeShutdownMetadata.class);
var nodesShutdownMetadata = new NodesShutdownMetadata(Map.of("node1", singleNodeMetadata));
var metadata = Metadata.builder().putCustom(TYPE, nodesShutdownMetadata).build();
var clusterStateWithShutdown = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metadata).build();

var request = new DeleteShutdownNodeAction.Request("node1");
action.masterOperation(null, request, clusterStateWithShutdown, ActionListener.noop());
var updateTaskCapture = ArgumentCaptor.forClass(ClusterStateUpdateTask.class);
verify(clusterService).submitStateUpdateTask(any(), updateTaskCapture.capture(), any());
ClusterState gotState = updateTaskCapture.getValue().execute(ClusterState.EMPTY_STATE);
assertThat(gotState, sameInstance(ClusterState.EMPTY_STATE));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.shutdown;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import org.mockito.ArgumentCaptor;

import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;

public class TransportPutShutdownNodeActionTests extends ESTestCase {

private ClusterService clusterService;
private TransportPutShutdownNodeAction action;

@Before
public void init() {
// TODO: it takes almost 2 seconds to create these mocks....WHY?!?
var threadPool = mock(ThreadPool.class);
var transportService = mock(TransportService.class);
clusterService = mock(ClusterService.class);
var actionFilters = mock(ActionFilters.class);
var indexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
action = new TransportPutShutdownNodeAction(
transportService,
clusterService,
threadPool,
actionFilters,
indexNameExpressionResolver
);
}

public void testNoop() throws Exception {
var type = randomFrom(Type.REMOVE, Type.REPLACE, Type.RESTART);
var allocationDelay = type == Type.RESTART ? TimeValue.timeValueMinutes(randomIntBetween(1, 3)) : null;
var targetNodeName = type == Type.REPLACE ? randomAlphaOfLength(5) : null;
var request = new PutShutdownNodeAction.Request("node1", type, "sunsetting", allocationDelay, targetNodeName);
action.masterOperation(null, request, ClusterState.EMPTY_STATE, ActionListener.noop());
var updateTaskCapture = ArgumentCaptor.forClass(ClusterStateUpdateTask.class);
verify(clusterService).submitStateUpdateTask(any(), updateTaskCapture.capture(), any());
ClusterState stableState = updateTaskCapture.getValue().execute(ClusterState.EMPTY_STATE);

// run the request again, there should be no call to submit an update task
clearInvocations(clusterService);
action.masterOperation(null, request, stableState, ActionListener.noop());
verifyNoInteractions(clusterService);

// run the request again with empty state, the update task should return the same state
action.masterOperation(null, request, ClusterState.EMPTY_STATE, ActionListener.noop());
verify(clusterService).submitStateUpdateTask(any(), updateTaskCapture.capture(), any());
ClusterState gotState = updateTaskCapture.getValue().execute(stableState);
assertThat(gotState, sameInstance(stableState));
}
}

0 comments on commit a672936

Please sign in to comment.