Skip to content

Commit

Permalink
Remove 8.11 node version inference code (elastic#116019)
Browse files Browse the repository at this point in the history
  • Loading branch information
thecoop authored and alexey-ivanov-es committed Nov 28, 2024
1 parent 920c2e8 commit 25f5118
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 173 deletions.
94 changes: 15 additions & 79 deletions server/src/main/java/org/elasticsearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.UpdateForV9;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.xcontent.ToXContent;
Expand Down Expand Up @@ -1025,52 +1026,25 @@ public static ClusterState readFrom(StreamInput in, DiscoveryNode localNode) thr
builder.metadata = Metadata.readFrom(in);
builder.routingTable = RoutingTable.readFrom(in);
builder.nodes = DiscoveryNodes.readFrom(in, localNode);
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0)) {
builder.nodeIdsToCompatibilityVersions(in.readMap(CompatibilityVersions::readVersion));
} else {
// this clusterstate is from a pre-8.8.0 node
// infer the versions from discoverynodes for now
// leave mappings versions empty
builder.nodes()
.getNodes()
.values()
.forEach(n -> builder.putCompatibilityVersions(n.getId(), inferTransportVersion(n), Map.of()));
}
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
builder.nodeFeatures(ClusterFeatures.readFrom(in));
}
builder.nodeIdsToCompatibilityVersions(in.readMap(CompatibilityVersions::readVersion));
builder.nodeFeatures(ClusterFeatures.readFrom(in));
builder.blocks = ClusterBlocks.readFrom(in);
int customSize = in.readVInt();
for (int i = 0; i < customSize; i++) {
Custom customIndexMetadata = in.readNamedWriteable(Custom.class);
builder.putCustom(customIndexMetadata.getWriteableName(), customIndexMetadata);
}
if (in.getTransportVersion().before(TransportVersions.V_8_0_0)) {
in.readVInt(); // used to be minimumMasterNodesOnPublishingMaster, which was used in 7.x for BWC with 6.x
}
return builder.build();
}

/**
* If the cluster state does not contain transport version information, this is the version
* that is inferred for all nodes on version 8.8.0 or above.
*/
@UpdateForV9(owner = UpdateForV9.Owner.CORE_INFRA)
public static final TransportVersion INFERRED_TRANSPORT_VERSION = TransportVersions.V_8_8_0;

public static final Version VERSION_INTRODUCING_TRANSPORT_VERSIONS = Version.V_8_8_0;

private static TransportVersion inferTransportVersion(DiscoveryNode node) {
TransportVersion tv;
if (node.getVersion().before(VERSION_INTRODUCING_TRANSPORT_VERSIONS)) {
// 1-to-1 mapping between Version and TransportVersion
tv = TransportVersion.fromId(node.getPre811VersionId().getAsInt());
} else {
// use the lowest value it could be for now
tv = INFERRED_TRANSPORT_VERSION;
}
return tv;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
clusterName.writeTo(out);
Expand All @@ -1079,17 +1053,10 @@ public void writeTo(StreamOutput out) throws IOException {
metadata.writeTo(out);
routingTable.writeTo(out);
nodes.writeTo(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0)) {
out.writeMap(compatibilityVersions, StreamOutput::writeWriteable);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
clusterFeatures.writeTo(out);
}
out.writeMap(compatibilityVersions, StreamOutput::writeWriteable);
clusterFeatures.writeTo(out);
blocks.writeTo(out);
VersionedNamedWriteable.writeVersionedWritables(out, customs);
if (out.getTransportVersion().before(TransportVersions.V_8_0_0)) {
out.writeVInt(-1); // used to be minimumMasterNodesOnPublishingMaster, which was used in 7.x for BWC with 6.x
}
}

private static class ClusterStateDiff implements Diff<ClusterState> {
Expand All @@ -1106,7 +1073,6 @@ private static class ClusterStateDiff implements Diff<ClusterState> {

private final Diff<DiscoveryNodes> nodes;

@Nullable
private final Diff<Map<String, CompatibilityVersions>> versions;
private final Diff<ClusterFeatures> features;

Expand Down Expand Up @@ -1142,26 +1108,13 @@ private static class ClusterStateDiff implements Diff<ClusterState> {
toVersion = in.readLong();
routingTable = RoutingTable.readDiffFrom(in);
nodes = DiscoveryNodes.readDiffFrom(in, localNode);
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0) && in.readBoolean()) {
versions = DiffableUtils.readJdkMapDiff(
in,
DiffableUtils.getStringKeySerializer(),
COMPATIBILITY_VERSIONS_VALUE_SERIALIZER
);
} else {
versions = null; // infer at application time
}
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
features = ClusterFeatures.readDiffFrom(in);
} else {
features = null; // fill in when nodes re-register with a master that understands features
}
boolean versionPresent = in.readBoolean();
if (versionPresent == false) throw new IOException("ClusterStateDiff stream must have versions");
versions = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), COMPATIBILITY_VERSIONS_VALUE_SERIALIZER);
features = ClusterFeatures.readDiffFrom(in);
metadata = Metadata.readDiffFrom(in);
blocks = ClusterBlocks.readDiffFrom(in);
customs = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER);
if (in.getTransportVersion().before(TransportVersions.V_8_0_0)) {
in.readVInt(); // used to be minimumMasterNodesOnPublishingMaster, which was used in 7.x for BWC with 6.x
}
}

@Override
Expand All @@ -1172,18 +1125,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(toVersion);
routingTable.writeTo(out);
nodes.writeTo(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0)) {
out.writeOptionalWriteable(versions);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
features.writeTo(out);
}
out.writeBoolean(true);
versions.writeTo(out);
features.writeTo(out);
metadata.writeTo(out);
blocks.writeTo(out);
customs.writeTo(out);
if (out.getTransportVersion().before(TransportVersions.V_8_0_0)) {
out.writeVInt(-1); // used to be minimumMasterNodesOnPublishingMaster, which was used in 7.x for BWC with 6.x
}
}

@Override
Expand All @@ -1200,19 +1147,8 @@ public ClusterState apply(ClusterState state) {
builder.version(toVersion);
builder.routingTable(routingTable.apply(state.routingTable));
builder.nodes(nodes.apply(state.nodes));
if (versions != null) {
builder.nodeIdsToCompatibilityVersions(this.versions.apply(state.compatibilityVersions));
} else {
// infer the versions from discoverynodes for now
// leave mappings versions empty
builder.nodes()
.getNodes()
.values()
.forEach(n -> builder.putCompatibilityVersions(n.getId(), inferTransportVersion(n), Map.of()));
}
if (features != null) {
builder.nodeFeatures(this.features.apply(state.clusterFeatures));
}
builder.nodeIdsToCompatibilityVersions(this.versions.apply(state.compatibilityVersions));
builder.nodeFeatures(this.features.apply(state.clusterFeatures));
builder.metadata(metadata.apply(state.metadata));
builder.blocks(blocks.apply(state.blocks));
builder.customs(customs.apply(state.customs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,13 @@
*/
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.version.CompatibilityVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -72,21 +69,8 @@ public JoinRequest(
public JoinRequest(StreamInput in) throws IOException {
super(in);
sourceNode = new DiscoveryNode(in);
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0)) {
compatibilityVersions = CompatibilityVersions.readVersion(in);
} else {
// there's a 1-1 mapping from Version to TransportVersion before 8.8.0
// no known mapping versions here
compatibilityVersions = new CompatibilityVersions(
TransportVersion.fromId(sourceNode.getPre811VersionId().getAsInt()),
Map.of()
);
}
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
features = in.readCollectionAsSet(StreamInput::readString);
} else {
features = Set.of();
}
compatibilityVersions = CompatibilityVersions.readVersion(in);
features = in.readCollectionAsSet(StreamInput::readString);
minimumTerm = in.readLong();
optionalJoin = Optional.ofNullable(in.readOptionalWriteable(Join::new));
}
Expand All @@ -95,12 +79,8 @@ public JoinRequest(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sourceNode.writeTo(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0)) {
compatibilityVersions.writeTo(out);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
out.writeCollection(features, StreamOutput::writeString);
}
compatibilityVersions.writeTo(out);
out.writeCollection(features, StreamOutput::writeString);
out.writeLong(minimumTerm);
out.writeOptionalWriteable(optionalJoin.orElse(null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,8 @@ public static Tuple<MlConfigVersion, MlConfigVersion> getMinMaxMlConfigVersion(D

public static MlConfigVersion getMlConfigVersionForNode(DiscoveryNode node) {
String mlConfigVerStr = node.getAttributes().get(ML_CONFIG_VERSION_NODE_ATTR);
if (mlConfigVerStr != null) {
return fromString(mlConfigVerStr);
}
return fromId(node.getPre811VersionId().orElseThrow(() -> new IllegalStateException("getting legacy version id not possible")));
if (mlConfigVerStr == null) throw new IllegalStateException(ML_CONFIG_VERSION_NODE_ATTR + " not present on node");
return fromString(mlConfigVerStr);
}

// Parse an MlConfigVersion from a string.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,8 @@ public static Tuple<TransformConfigVersion, TransformConfigVersion> getMinMaxTra

public static TransformConfigVersion getTransformConfigVersionForNode(DiscoveryNode node) {
String transformConfigVerStr = node.getAttributes().get(TRANSFORM_CONFIG_VERSION_NODE_ATTR);
if (transformConfigVerStr != null) {
return fromString(transformConfigVerStr);
}
return fromId(node.getPre811VersionId().orElseThrow(() -> new IllegalStateException("getting legacy version id not possible")));
if (transformConfigVerStr == null) throw new IllegalStateException(TRANSFORM_CONFIG_VERSION_NODE_ATTR + " not present on node");
return fromString(transformConfigVerStr);
}

// Parse an TransformConfigVersion from a string.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,6 @@ public void testGetMinMaxMlConfigVersionWhenMlConfigVersionAttrIsMissing() {
}

public void testGetMlConfigVersionForNode() {
DiscoveryNode node = DiscoveryNodeUtils.builder("_node_id4")
.name("_node_name4")
.address(new TransportAddress(InetAddress.getLoopbackAddress(), 9303))
.roles(ROLES_WITH_ML)
.version(VersionInformation.inferVersions(Version.fromString("8.7.0")))
.build();
MlConfigVersion mlConfigVersion = MlConfigVersion.getMlConfigVersionForNode(node);
assertEquals(MlConfigVersion.V_8_7_0, mlConfigVersion);

DiscoveryNode node1 = DiscoveryNodeUtils.builder("_node_id5")
.name("_node_name5")
.address(new TransportAddress(InetAddress.getLoopbackAddress(), 9304))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,6 @@ public void testGetMinMaxTransformConfigVersion() {
}

public void testGetTransformConfigVersionForNode() {
DiscoveryNode node = DiscoveryNodeUtils.builder("_node_id4")
.name("_node_name4")
.address(new TransportAddress(InetAddress.getLoopbackAddress(), 9303))
.roles(ROLES_WITH_TRANSFORM)
.version(VersionInformation.inferVersions(Version.fromString("8.7.0")))
.build();
TransformConfigVersion transformConfigVersion = TransformConfigVersion.getTransformConfigVersionForNode(node);
assertEquals(TransformConfigVersion.V_8_7_0, transformConfigVersion);

DiscoveryNode node1 = DiscoveryNodeUtils.builder("_node_id5")
.name("_node_name5")
.address(new TransportAddress(InetAddress.getLoopbackAddress(), 9304))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,37 +64,6 @@ public void testValidateMinimumVersion() {
{
ClusterState state = mock(ClusterState.class);

final ModelPackageConfig packageConfigCurrent = new ModelPackageConfig.Builder(
ModelPackageConfigTests.randomModulePackageConfig()
).setMinimumVersion(MlConfigVersion.CURRENT.toString()).build();

DiscoveryNode node = DiscoveryNodeUtils.create(
"node1",
new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
Version.V_8_7_0
);

DiscoveryNodes nodes = DiscoveryNodes.builder().add(node).build();

when(state.nodes()).thenReturn(nodes);

Exception e = expectThrows(
ActionRequestValidationException.class,
() -> TrainedModelValidator.validateMinimumVersion(packageConfigCurrent, state)
);

assertEquals(
"Validation Failed: 1: The model ["
+ packageConfigCurrent.getPackagedModelId()
+ "] requires that all nodes have ML config version ["
+ MlConfigVersion.CURRENT
+ "] or higher;",
e.getMessage()
);
}
{
ClusterState state = mock(ClusterState.class);

final ModelPackageConfig packageConfigBroken = new ModelPackageConfig.Builder(
ModelPackageConfigTests.randomModulePackageConfig()
).setMinimumVersion("_broken_version_").build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,6 @@ public void testNodeNameAndVersionForRecentNode() {
assertEquals("{_node_name1}{ML config version=10.0.0}", JobNodeSelector.nodeNameAndVersion(node));
}

public void testNodeNameAndVersionForOldNode() {
TransportAddress ta = new TransportAddress(InetAddress.getLoopbackAddress(), 9300);
Map<String, String> attributes = Map.of("unrelated", "attribute");
DiscoveryNode node = DiscoveryNodeUtils.builder("_node_id2")
.name("_node_name2")
.address(ta)
.attributes(attributes)
.roles(ROLES_WITH_ML)
.version(VersionInformation.inferVersions(Version.V_8_7_0))
.build();
assertEquals("{_node_name2}{ML config version=8.7.0}", JobNodeSelector.nodeNameAndVersion(node));
}

public void testNodeNameAndMlAttributes() {
TransportAddress ta = new TransportAddress(InetAddress.getLoopbackAddress(), 9300);
SortedMap<String, String> attributes = new TreeMap<>();
Expand Down

0 comments on commit 25f5118

Please sign in to comment.