Skip to content

Commit

Permalink
Enforce external id uniqueness during DesiredNode construction (#84227)
Browse files Browse the repository at this point in the history
This commit introduces some small refactorings to improve the desired
nodes codebase.

- A DesiredNode must contain a valid external id; otherwise it cannot
  be built.
- DesiredNodes now stores the desired nodes in a map keyed by each
  node's external id. This fixes a small bug around idempotent
  updates: previously the nodes were kept in a list and compared
  through that list, so the same set of nodes submitted in a
  different order was not recognized as an identical update.
  • Loading branch information
fcofdez authored Apr 13, 2022
1 parent 9af6856 commit a372b54
Show file tree
Hide file tree
Showing 13 changed files with 379 additions and 151 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/84227.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 84227
summary: Enforce external id uniqueness during `DesiredNode` construction
area: Distributed
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ teardown:
_internal.delete_desired_nodes: {}
---
"Test update desired nodes":
- skip:
reason: "contains is a newly added assertion"
features: contains
- do:
cluster.state: {}

Expand Down Expand Up @@ -49,15 +52,17 @@ teardown:

- do:
_internal.get_desired_nodes: {}
- match:
$body:
history_id: "test"
version: 2
nodes:
- { settings: { node: { name: "instance-000187" } }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version }
- { settings: { node: { name: "instance-000188" } }, processors: 16, memory: "128gb", storage: "1tb", node_version: $es_version }

- match: { history_id: "test" }
- match: { version: 2 }
- length: { nodes: 2 }
- contains: { nodes: { settings: { node: { name: "instance-000187" } }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version } }
- contains: { nodes: { settings: { node: { name: "instance-000188" } }, processors: 16, memory: "128gb", storage: "1tb", node_version: $es_version } }
---
"Test update move to a new history id":
- skip:
reason: "contains is a newly added assertion"
features: contains
- do:
cluster.state: {}

Expand Down Expand Up @@ -97,13 +102,11 @@ teardown:

- do:
_internal.get_desired_nodes: {}
- match:
$body:
history_id: "new_history"
version: 1
nodes:
- { settings: { node: { external_id: "instance-000187" } }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version }
- { settings: { node: { external_id: "instance-000188" } }, processors: 16, memory: "128gb", storage: "1tb", node_version: $es_version }
- match: { history_id: "new_history" }
- match: { version: 1 }
- length: { nodes: 2 }
- contains: { nodes: { settings: { node: { external_id: "instance-000187" } }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version } }
- contains: { nodes: { settings: { node: { external_id: "instance-000188" } }, processors: 16, memory: "128gb", storage: "1tb", node_version: $es_version } }
---
"Test delete desired nodes":
- do:
Expand Down Expand Up @@ -142,6 +145,9 @@ teardown:
- match: { status: 404 }
---
"Test update desired nodes is idempotent":
- skip:
reason: "contains is a newly added assertion"
features: contains
- do:
cluster.state: {}

Expand All @@ -158,16 +164,51 @@ teardown:
body:
nodes:
- { settings: { "node.external_id": "instance-000187" }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version }
- { settings: { "node.external_id": "instance-000188" }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version }
- match: { replaced_existing_history_id: false }

- do:
_internal.get_desired_nodes: {}
- match:
$body:

- match: { history_id: "test" }
- match: { version: 1 }
- length: { nodes: 2 }
- contains: { nodes: { settings: { node: { external_id: "instance-000187" } }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version } }
- contains: { nodes: { settings: { node: { external_id: "instance-000188" } }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version } }

- do:
_internal.update_desired_nodes:
history_id: "test"
version: 1
nodes:
- { settings: { node: { external_id: "instance-000187" } }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version }
body:
nodes:
- { settings: { "node.external_id": "instance-000187" }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version }
- { settings: { "node.external_id": "instance-000188" }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version }

- match: { replaced_existing_history_id: false }

- do:
_internal.get_desired_nodes: {}

- match: { history_id: "test" }
- match: { version: 1 }
- length: { nodes: 2 }
- contains: { nodes: { settings: { node: { external_id: "instance-000187" } }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version } }
- contains: { nodes: { settings: { node: { external_id: "instance-000188" } }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version } }
---
"Test update desired nodes is idempotent with different order":
- skip:
version: " - 8.2.99"
features: contains
reason: "Bug fixed in 8.3.0 and uses contains feature"
- do:
cluster.state: {}

- set: { master_node: master }

- do:
nodes.info: {}
- set: { nodes.$master.version: es_version }

- do:
_internal.update_desired_nodes:
Expand All @@ -176,16 +217,37 @@ teardown:
body:
nodes:
- { settings: { "node.external_id": "instance-000187" }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version }
- { settings: { "node.external_id": "instance-000188" }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version }
- match: { replaced_existing_history_id: false }

- do:
_internal.get_desired_nodes: {}
- match:
$body:

- match: { history_id: "test" }
- match: { version: 1 }
- length: { nodes: 2 }
- contains: { nodes: { settings: { node: { external_id: "instance-000187" } }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version } }
- contains: { nodes: { settings: { node: { external_id: "instance-000188" } }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version } }

- do:
_internal.update_desired_nodes:
history_id: "test"
version: 1
nodes:
- { settings: { node: { external_id: "instance-000187" } }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version }
body:
nodes:
- { settings: { "node.external_id": "instance-000188" }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version }
- { settings: { "node.external_id": "instance-000187" }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version }

- match: { replaced_existing_history_id: false }

- do:
_internal.get_desired_nodes: {}

- match: { history_id: "test" }
- match: { version: 1 }
- length: { nodes: 2 }
- contains: { nodes: { settings: { node: { external_id: "instance-000187" } }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version } }
- contains: { nodes: { settings: { node: { external_id: "instance-000188" } }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version } }
---
"Test going backwards within the same history is forbidden":
- do:
Expand Down Expand Up @@ -343,6 +405,9 @@ teardown:
- match: { replaced_existing_history_id: false }
---
"Test external_id or node.name is required":
- skip:
version: " - 8.2.99"
reason: "Change error code in 8.3"
- do:
cluster.state: {}

Expand All @@ -361,11 +426,13 @@ teardown:
nodes:
- { settings: { }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version }
- match: { status: 400 }
- match: { error.type: illegal_argument_exception }
- match: { error.reason: "Nodes with ids [<missing>] in positions [0] contain invalid settings" }
- match: { error.suppressed.0.reason: "[node.name] or [node.external_id] is missing or empty" }
- match: { error.type: x_content_parse_exception }
- match: { error.caused_by.caused_by.caused_by.reason: "[node.name] or [node.external_id] is missing or empty" }
---
"Test external_id must have content":
- skip:
version: " - 8.2.99"
reason: "Change error code in 8.3"
- do:
cluster.state: {}

Expand All @@ -384,9 +451,8 @@ teardown:
nodes:
- { settings: { "node.external_id": " " }, processors: 8, memory: "64gb", storage: "128gb", node_version: $es_version }
- match: { status: 400 }
- match: { error.type: illegal_argument_exception }
- match: { error.reason: "Nodes with ids [<missing>] in positions [0] contain invalid settings" }
- match: { error.suppressed.0.reason: "[node.name] or [node.external_id] is missing or empty" }
- match: { error.type: x_content_parse_exception }
- match: { error.caused_by.caused_by.caused_by.reason: "[node.name] or [node.external_id] is missing or empty" }
---
"Test duplicated external ids are not allowed":
- do:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.desirednodes.VersionConflictException;
import org.elasticsearch.cluster.metadata.DesiredNode;
import org.elasticsearch.cluster.metadata.DesiredNodes;
import org.elasticsearch.cluster.metadata.DesiredNodesMetadata;
import org.elasticsearch.cluster.metadata.DesiredNodesTestCase;
Expand All @@ -27,6 +28,7 @@
import org.junit.After;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
Expand All @@ -37,7 +39,6 @@
import static org.elasticsearch.cluster.metadata.DesiredNodesTestCase.randomDesiredNodes;
import static org.elasticsearch.common.util.concurrent.EsExecutors.NODE_PROCESSORS_SETTING;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_KEEP_IDLE;
import static org.elasticsearch.node.Node.NODE_EXTERNAL_ID_SETTING;
import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand All @@ -64,10 +65,16 @@ public void testUpdateDesiredNodes() {

public void testUpdateDesiredNodesIsIdempotent() {
final DesiredNodes desiredNodes = putRandomDesiredNodes();
updateDesiredNodes(desiredNodes);

final List<DesiredNode> desiredNodesList = new ArrayList<>(desiredNodes.nodes());
if (randomBoolean()) {
Collections.shuffle(desiredNodesList, random());
}

updateDesiredNodes(new DesiredNodes(desiredNodes.historyID(), desiredNodes.version(), desiredNodesList));

final ClusterState state = client().admin().cluster().prepareState().get().getState();
final DesiredNodesMetadata metadata = state.metadata().custom(DesiredNodesMetadata.TYPE);
final DesiredNodesMetadata metadata = DesiredNodesMetadata.fromClusterState(state);
assertThat(metadata, is(notNullValue()));
final DesiredNodes latestDesiredNodes = metadata.getLatestDesiredNodes();
assertThat(latestDesiredNodes, is(equalTo(desiredNodes)));
Expand Down Expand Up @@ -247,10 +254,9 @@ public void testNodeProcessorsGetValidatedWithDesiredNodeProcessors() {
final DesiredNodes latestDesiredNodes = metadata.getLatestDesiredNodes();
assertThat(latestDesiredNodes, is(equalTo(desiredNodes)));
assertThat(latestDesiredNodes.nodes().isEmpty(), is(equalTo(false)));
assertThat(
latestDesiredNodes.nodes().get(0).settings().get(NODE_PROCESSORS_SETTING.getKey()),
is(equalTo(Integer.toString(numProcessors)))
);
for (DesiredNode desiredNode : latestDesiredNodes.nodes()) {
assertThat(desiredNode.settings().get(NODE_PROCESSORS_SETTING.getKey()), is(equalTo(Integer.toString(numProcessors))));
}
}
}

Expand All @@ -263,7 +269,7 @@ public void testUpdateDesiredNodesTasksAreBatchedCorrectly() throws Exception {
final UpdateDesiredNodesRequest request = new UpdateDesiredNodesRequest(
desiredNodes.historyID(),
desiredNodes.version(),
desiredNodes.nodes()
List.copyOf(desiredNodes.nodes())
);
// Use the master client to ensure the same updates ordering as in proposedDesiredNodesList
updateDesiredNodesFutures.add(internalCluster().masterClient().execute(UpdateDesiredNodesAction.INSTANCE, request));
Expand All @@ -280,7 +286,7 @@ public void testUpdateDesiredNodesTasksAreBatchedCorrectly() throws Exception {
}

final ClusterState state = client().admin().cluster().prepareState().get().getState();
final DesiredNodes latestDesiredNodes = DesiredNodesMetadata.latestFromClusterState(state);
final DesiredNodes latestDesiredNodes = DesiredNodes.latestFromClusterState(state);
final DesiredNodes latestProposedDesiredNodes = proposedDesiredNodes.get(proposedDesiredNodes.size() - 1);
assertThat(latestDesiredNodes, equalTo(latestProposedDesiredNodes));
}
Expand Down Expand Up @@ -308,7 +314,7 @@ public void testDeleteDesiredNodesTasksAreBatchedCorrectly() throws Exception {
}

final ClusterState state = client().admin().cluster().prepareState().get().getState();
final DesiredNodes latestDesiredNodes = DesiredNodesMetadata.latestFromClusterState(state);
final DesiredNodes latestDesiredNodes = DesiredNodes.latestFromClusterState(state);
assertThat(latestDesiredNodes, is(nullValue()));
}

Expand All @@ -328,21 +334,6 @@ public void testDeleteDesiredNodes() {
expectThrows(ResourceNotFoundException.class, this::getLatestDesiredNodes);
}

public void testEmptyExternalIDIsInvalid() {
final Consumer<Settings.Builder> settingsConsumer = (settings) -> settings.put(NODE_EXTERNAL_ID_SETTING.getKey(), " ");
final DesiredNodes desiredNodes = new DesiredNodes(
UUIDs.randomBase64UUID(),
randomIntBetween(1, 20),
randomList(1, 20, () -> randomDesiredNode(Version.CURRENT, settingsConsumer))
);

final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> updateDesiredNodes(desiredNodes));
assertThat(exception.getMessage(), containsString("Nodes with ids"));
assertThat(exception.getMessage(), containsString("contain invalid settings"));
assertThat(exception.getSuppressed().length > 0, is(equalTo(true)));
assertThat(exception.getSuppressed()[0].getMessage(), containsString("[node.external_id] is missing or empty"));
}

private void deleteDesiredNodes() {
final DeleteDesiredNodesAction.Request request = new DeleteDesiredNodesAction.Request();
client().execute(DeleteDesiredNodesAction.INSTANCE, request).actionGet();
Expand All @@ -364,7 +355,7 @@ private UpdateDesiredNodesResponse updateDesiredNodes(DesiredNodes desiredNodes)
final UpdateDesiredNodesRequest request = new UpdateDesiredNodesRequest(
desiredNodes.historyID(),
desiredNodes.version(),
desiredNodes.nodes()
List.copyOf(desiredNodes.nodes())
);
return client().execute(UpdateDesiredNodesAction.INSTANCE, request).actionGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DesiredNodes;
import org.elasticsearch.cluster.metadata.DesiredNodesMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -55,7 +54,7 @@ protected void masterOperation(
ClusterState state,
ActionListener<GetDesiredNodesAction.Response> listener
) throws Exception {
final DesiredNodes latestDesiredNodes = DesiredNodesMetadata.latestFromClusterState(state);
final DesiredNodes latestDesiredNodes = DesiredNodes.latestFromClusterState(state);
if (latestDesiredNodes == null) {
listener.onFailure(new ResourceNotFoundException("Desired nodes not found"));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ protected void masterOperation(
ActionListener<UpdateDesiredNodesResponse> listener
) throws Exception {
try {
DesiredNodes proposedDesiredNodes = new DesiredNodes(request.getHistoryID(), request.getVersion(), request.getNodes());
settingsValidator.validate(proposedDesiredNodes);
settingsValidator.validate(request.getNodes());

clusterService.submitStateUpdateTask(
"update-desired-nodes",
Expand All @@ -85,8 +84,8 @@ protected void masterOperation(
@Override
public ClusterState execute(ClusterState currentState) {
final ClusterState updatedState = updateDesiredNodes(currentState, request);
final DesiredNodes previousDesiredNodes = DesiredNodesMetadata.latestFromClusterState(currentState);
final DesiredNodes latestDesiredNodes = DesiredNodesMetadata.latestFromClusterState(updatedState);
final DesiredNodes previousDesiredNodes = DesiredNodes.latestFromClusterState(currentState);
final DesiredNodes latestDesiredNodes = DesiredNodes.latestFromClusterState(updatedState);
replacedExistingHistoryId = previousDesiredNodes != null
&& previousDesiredNodes.hasSameHistoryId(latestDesiredNodes) == false;
return updatedState;
Expand All @@ -110,9 +109,9 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
}

static ClusterState updateDesiredNodes(ClusterState currentState, UpdateDesiredNodesRequest request) {
DesiredNodesMetadata desiredNodesMetadata = currentState.metadata().custom(DesiredNodesMetadata.TYPE, DesiredNodesMetadata.EMPTY);
DesiredNodes latestDesiredNodes = desiredNodesMetadata.getLatestDesiredNodes();
DesiredNodes proposedDesiredNodes = new DesiredNodes(request.getHistoryID(), request.getVersion(), request.getNodes());
final DesiredNodesMetadata desiredNodesMetadata = DesiredNodesMetadata.fromClusterState(currentState);
final DesiredNodes latestDesiredNodes = desiredNodesMetadata.getLatestDesiredNodes();
final DesiredNodes proposedDesiredNodes = new DesiredNodes(request.getHistoryID(), request.getVersion(), request.getNodes());

if (latestDesiredNodes != null) {
if (latestDesiredNodes.equals(proposedDesiredNodes)) {
Expand Down
Loading

0 comments on commit a372b54

Please sign in to comment.