Skip to content

Commit

Permalink
Add a cluster listener to fix missing cluster features after upgrade (#…
Browse files Browse the repository at this point in the history
…110710) (#110924)

Non-master-eligible nodes that are already part of a cluster when the master is upgraded don't re-join the cluster, so their cluster features never get updated. This adds a cluster listener that spots this occurring, and manually gets the node's features with a new transport action and updates the cluster state after the fact.
  • Loading branch information
thecoop authored Jul 16, 2024
1 parent 281d529 commit ef93c95
Show file tree
Hide file tree
Showing 11 changed files with 663 additions and 0 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/110710.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 110710
summary: Add a cluster listener to fix missing node features after upgrading from a version prior to 8.13
area: Infra/Core
type: bug
issues:
- 109254
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction;
import org.elasticsearch.action.admin.cluster.migration.TransportPostFeatureUpgradeAction;
import org.elasticsearch.action.admin.cluster.node.capabilities.TransportNodesCapabilitiesAction;
import org.elasticsearch.action.admin.cluster.node.features.TransportNodesFeaturesAction;
import org.elasticsearch.action.admin.cluster.node.hotthreads.TransportNodesHotThreadsAction;
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.reload.TransportNodesReloadSecureSettingsAction;
Expand Down Expand Up @@ -621,6 +622,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(TransportNodesInfoAction.TYPE, TransportNodesInfoAction.class);
actions.register(TransportRemoteInfoAction.TYPE, TransportRemoteInfoAction.class);
actions.register(TransportNodesCapabilitiesAction.TYPE, TransportNodesCapabilitiesAction.class);
actions.register(TransportNodesFeaturesAction.TYPE, TransportNodesFeaturesAction.class);
actions.register(RemoteClusterNodesAction.TYPE, RemoteClusterNodesAction.TransportAction.class);
actions.register(TransportNodesStatsAction.TYPE, TransportNodesStatsAction.class);
actions.register(TransportNodesUsageAction.TYPE, TransportNodesUsageAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.node.features;

import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Set;

public class NodeFeatures extends BaseNodeResponse {

private final Set<String> features;

public NodeFeatures(StreamInput in) throws IOException {
super(in);
features = in.readCollectionAsImmutableSet(StreamInput::readString);
}

public NodeFeatures(Set<String> features, DiscoveryNode node) {
super(node);
this.features = Set.copyOf(features);
}

public Set<String> nodeFeatures() {
return features;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeCollection(features, StreamOutput::writeString);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.node.features;

import org.elasticsearch.action.support.nodes.BaseNodesRequest;

public class NodesFeaturesRequest extends BaseNodesRequest<NodesFeaturesRequest> {
public NodesFeaturesRequest(String... nodes) {
super(nodes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.node.features;

import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.List;

public class NodesFeaturesResponse extends BaseNodesResponse<NodeFeatures> {
public NodesFeaturesResponse(ClusterName clusterName, List<NodeFeatures> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}

@Override
protected List<NodeFeatures> readNodesFrom(StreamInput in) throws IOException {
return TransportAction.localOnly();
}

@Override
protected void writeNodesTo(StreamOutput out, List<NodeFeatures> nodes) throws IOException {
TransportAction.localOnly();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.node.features;

import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.core.UpdateForV9;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.List;

@UpdateForV9
// @UpdateForV10 // this can be removed in v10. It may be called by v8 nodes to v9 nodes.
public class TransportNodesFeaturesAction extends TransportNodesAction<
NodesFeaturesRequest,
NodesFeaturesResponse,
TransportNodesFeaturesAction.NodeFeaturesRequest,
NodeFeatures> {

public static final ActionType<NodesFeaturesResponse> TYPE = new ActionType<>("cluster:monitor/nodes/features");

private final FeatureService featureService;

@Inject
public TransportNodesFeaturesAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
FeatureService featureService
) {
super(
TYPE.name(),
clusterService,
transportService,
actionFilters,
NodeFeaturesRequest::new,
threadPool.executor(ThreadPool.Names.MANAGEMENT)
);
this.featureService = featureService;
}

@Override
protected NodesFeaturesResponse newResponse(
NodesFeaturesRequest request,
List<NodeFeatures> responses,
List<FailedNodeException> failures
) {
return new NodesFeaturesResponse(clusterService.getClusterName(), responses, failures);
}

@Override
protected NodeFeaturesRequest newNodeRequest(NodesFeaturesRequest request) {
return new NodeFeaturesRequest();
}

@Override
protected NodeFeatures newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeFeatures(in);
}

@Override
protected NodeFeatures nodeOperation(NodeFeaturesRequest request, Task task) {
return new NodeFeatures(featureService.getNodeFeatures().keySet(), transportService.getLocalNode());
}

public static class NodeFeaturesRequest extends TransportRequest {
public NodeFeaturesRequest(StreamInput in) throws IOException {
super(in);
}

public NodeFeaturesRequest() {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,11 @@ public Map<String, Set<String>> nodeFeatures() {
return Collections.unmodifiableMap(this.nodeFeatures);
}

public Builder putNodeFeatures(String node, Set<String> features) {
this.nodeFeatures.put(node, features);
return this;
}

public Builder routingTable(RoutingTable.Builder routingTableBuilder) {
return routingTable(routingTableBuilder.build());
}
Expand Down
Loading

0 comments on commit ef93c95

Please sign in to comment.