From 70754b38c2ad7e51c966960efa61e112d887d366 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 12 Jun 2024 07:26:27 +0100 Subject: [PATCH 1/2] Skip `BaseNodesRequest` header without instantiation As described in #100878 we sometimes send an unnecessary wrapped-up `BaseNodesRequest` to individual nodes for legacy reasons, and today we do that by calling its constructor and then discarding the resulting object. This commit introduces utilities for skipping and synthesizing the unnecessary data without involving `BaseNodesRequest` itself. --- .../hotthreads/NodesHotThreadsRequest.java | 13 +----- .../TransportNodesHotThreadsAction.java | 15 ++----- .../cluster/node/info/NodesInfoRequest.java | 19 +------- .../node/info/TransportNodesInfoAction.java | 14 ++---- .../cluster/node/stats/NodesStatsRequest.java | 13 +----- .../node/stats/TransportNodesStatsAction.java | 24 +++++------ .../cluster/node/usage/NodesUsageRequest.java | 15 +------ .../node/usage/TransportNodesUsageAction.java | 20 +++------ .../cluster/stats/ClusterStatsRequest.java | 12 +----- .../stats/TransportClusterStatsAction.java | 8 +--- .../support/nodes/BaseNodesRequest.java | 18 -------- .../support/nodes/TransportNodesAction.java | 43 +++++++++++++++++++ .../NodesHotThreadsRequestTests.java | 18 +++++--- .../node/stats/NodesStatsRequestTests.java | 4 +- .../action/TrainedModelCacheInfoAction.java | 10 +---- .../TrainedModelCacheInfoRequestTests.java | 39 ----------------- .../NodesDeprecationCheckAction.java | 11 +++-- .../NodesDeprecationCheckRequest.java | 14 ++---- .../NodesDeprecationCheckRequestTests.java | 21 ++++----- .../TransportTrainedModelCacheInfoAction.java | 8 +--- 20 files changed, 116 insertions(+), 223 deletions(-) rename server/src/test/java/org/elasticsearch/action/admin/cluster/{ => node}/hotthreads/NodesHotThreadsRequestTests.java (86%) delete mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/TrainedModelCacheInfoRequestTests.java diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsRequest.java index 55b5e7cf0a257..51a5ce265114d 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsRequest.java @@ -8,12 +8,11 @@ package org.elasticsearch.action.admin.cluster.node.hotthreads; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.core.UpdateForV9; import org.elasticsearch.monitor.jvm.HotThreads; import java.io.IOException; @@ -22,12 +21,6 @@ public class NodesHotThreadsRequest extends BaseNodesRequest { private final NodesInfoMetrics nodesInfoMetrics; - /** - * Create a new NodeInfoRequest from a {@link StreamInput} object. - * - * @param in A stream input object. - * @throws IOException if the stream cannot be deserialized. - */ - @UpdateForV9 // this constructor is unused in v9 - public NodesInfoRequest(StreamInput in) throws IOException { - super(in); - nodesInfoMetrics = new NodesInfoMetrics(in); - } - /** * Get information from nodes based on the nodes ids specified. If none are passed, information * for all nodes will be returned. @@ -113,11 +100,9 @@ public NodesInfoRequest removeMetric(String metric) { return this; } - @UpdateForV9 // this method can just call localOnly() in v9 @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - nodesInfoMetrics.writeTo(out); + TransportAction.localOnly(); } public NodesInfoMetrics getNodesInfoMetrics() { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java index 826d74935f556..ce962fb454a88 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java @@ -101,11 +101,8 @@ public static class NodeInfoRequest extends TransportRequest { public NodeInfoRequest(StreamInput in) throws IOException { super(in); - if (in.getTransportVersion().onOrAfter(V_8_11_X)) { - this.nodesInfoMetrics = new NodesInfoMetrics(in); - } else { - this.nodesInfoMetrics = new NodesInfoRequest(in).getNodesInfoMetrics(); - } + skipLegacyNodesRequestHeader(V_8_11_X, in); + this.nodesInfoMetrics = new NodesInfoMetrics(in); } public NodeInfoRequest(NodesInfoRequest request) { @@ -115,11 +112,8 @@ public NodeInfoRequest(NodesInfoRequest request) { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - if (out.getTransportVersion().onOrAfter(V_8_11_X)) { - this.nodesInfoMetrics.writeTo(out); - } else { - new NodesInfoRequest().clear().addMetrics(nodesInfoMetrics.requestedMetrics()).writeTo(out); - } + sendLegacyNodesRequestHeader(V_8_11_X, out); + nodesInfoMetrics.writeTo(out); } public Set requestedMetrics() { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java index ff88bc5fcf464..5f83c641a3f82 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -9,11 +9,10 @@ package org.elasticsearch.action.admin.cluster.node.stats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.core.UpdateForV9; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; @@ -37,12 +36,6 @@ public NodesStatsRequest() { nodesStatsRequestParameters = new NodesStatsRequestParameters(); } - @UpdateForV9 // this constructor is unused in v9 - public NodesStatsRequest(StreamInput in) throws IOException { - super(in); - nodesStatsRequestParameters = new NodesStatsRequestParameters(in); - } - /** * Get stats from nodes based on the nodes ids specified. If none are passed, stats * for all nodes will be returned. @@ -179,11 +172,9 @@ public void setIncludeShardsStats(boolean includeShardsStats) { nodesStatsRequestParameters.setIncludeShardsStats(includeShardsStats); } - @UpdateForV9 // this method can just call localOnly() in v9 @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - nodesStatsRequestParameters.writeTo(out); + TransportAction.localOnly(); } public NodesStatsRequestParameters getNodesStatsRequestParameters() { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 8147da415b7e6..1b7ce13333891 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -161,13 +161,11 @@ public static class NodeStatsRequest extends TransportRequest { public NodeStatsRequest(StreamInput in) throws IOException { super(in); - if (in.getTransportVersion().before(TransportVersions.V_8_13_0)) { - this.nodesStatsRequestParameters = new NodesStatsRequest(in).getNodesStatsRequestParameters(); - } else { - this.nodesStatsRequestParameters = new NodesStatsRequestParameters(in); - if (in.getTransportVersion().before(TransportVersions.DROP_UNUSED_NODES_IDS)) { - in.readStringArray(); // formerly nodeIds, now unused - } + skipLegacyNodesRequestHeader(TransportVersions.V_8_13_0, in); + this.nodesStatsRequestParameters = new NodesStatsRequestParameters(in); + if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0) + && in.getTransportVersion().before(TransportVersions.DROP_UNUSED_NODES_IDS)) { + in.readStringArray(); // formerly nodeIds, now unused } } @@ -192,13 +190,11 @@ public String getDescription() { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - if (out.getTransportVersion().before(TransportVersions.V_8_13_0)) { - new NodesStatsRequest(nodesStatsRequestParameters, Strings.EMPTY_ARRAY).writeTo(out); - } else { - nodesStatsRequestParameters.writeTo(out); - if (out.getTransportVersion().before(TransportVersions.DROP_UNUSED_NODES_IDS)) { - out.writeStringArray(Strings.EMPTY_ARRAY); // formerly nodeIds, now unused - } + sendLegacyNodesRequestHeader(TransportVersions.V_8_13_0, out); + nodesStatsRequestParameters.writeTo(out); + if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0) + && out.getTransportVersion().before(TransportVersions.DROP_UNUSED_NODES_IDS)) { + out.writeStringArray(Strings.EMPTY_ARRAY); // formerly nodeIds, now unused } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodesUsageRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodesUsageRequest.java index 232d32047274e..5fcea26b1b286 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodesUsageRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodesUsageRequest.java @@ -8,10 +8,9 @@ package org.elasticsearch.action.admin.cluster.node.usage; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesRequest; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.core.UpdateForV9; import java.io.IOException; @@ -20,13 +19,6 @@ public class NodesUsageRequest extends BaseNodesRequest { private boolean restActions; private boolean aggregations; - @UpdateForV9 // will be unused in v9 - public NodesUsageRequest(StreamInput in) throws IOException { - super(in); - this.restActions = in.readBoolean(); - this.aggregations = in.readBoolean(); - } - /** * Get usage from nodes based on the nodes ids specified. If none are * passed, usage for all nodes will be returned. @@ -82,11 +74,8 @@ public NodesUsageRequest aggregations(boolean aggregations) { return this; } - @UpdateForV9 // can become localOnly() in v9 @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(restActions); - out.writeBoolean(aggregations); + TransportAction.localOnly(); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodesUsageAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodesUsageAction.java index 3e8b519bf0381..72bbe2683d157 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodesUsageAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodesUsageAction.java @@ -91,14 +91,9 @@ public static class NodeUsageRequest extends TransportRequest { public NodeUsageRequest(StreamInput in) throws IOException { super(in); - if (in.getTransportVersion().onOrAfter(TransportVersions.MORE_LIGHTER_NODES_REQUESTS)) { - restActions = in.readBoolean(); - aggregations = in.readBoolean(); - } else { - final var request = new NodesUsageRequest(in); - restActions = request.restActions(); - aggregations = request.aggregations(); - } + skipLegacyNodesRequestHeader(TransportVersions.MORE_LIGHTER_NODES_REQUESTS, in); + restActions = in.readBoolean(); + aggregations = in.readBoolean(); } NodeUsageRequest(NodesUsageRequest request) { @@ -109,12 +104,9 @@ public NodeUsageRequest(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - if (out.getTransportVersion().onOrAfter(TransportVersions.MORE_LIGHTER_NODES_REQUESTS)) { - out.writeBoolean(restActions); - out.writeBoolean(aggregations); - } else { - new NodesUsageRequest().restActions(restActions).aggregations(aggregations).writeTo(out); - } + sendLegacyNodesRequestHeader(TransportVersions.MORE_LIGHTER_NODES_REQUESTS, out); + out.writeBoolean(restActions); + out.writeBoolean(aggregations); } } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java index bba669e07a70c..377e931604548 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java @@ -8,10 +8,9 @@ package org.elasticsearch.action.admin.cluster.stats; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesRequest; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.core.UpdateForV9; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; @@ -23,12 +22,6 @@ * A request to get cluster level stats. */ public class ClusterStatsRequest extends BaseNodesRequest { - - @UpdateForV9 // this constructor is unused in v9 - public ClusterStatsRequest(StreamInput in) throws IOException { - super(in); - } - /** * Get stats from nodes based on the nodes ids specified. If none are passed, stats * based on all nodes will be returned. @@ -42,10 +35,9 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, return new CancellableTask(id, type, action, "", parentTaskId, headers); } - @UpdateForV9 // this method can just call localOnly() in v9 @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + TransportAction.localOnly(); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 2a8fecde7ee9e..2c530d8dbbe55 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -260,9 +260,7 @@ public static class ClusterStatsNodeRequest extends TransportRequest { public ClusterStatsNodeRequest(StreamInput in) throws IOException { super(in); - if (in.getTransportVersion().before(TransportVersions.DROP_UNUSED_NODES_REQUESTS)) { - new ClusterStatsRequest(in); - } + skipLegacyNodesRequestHeader(TransportVersions.DROP_UNUSED_NODES_REQUESTS, in); } @Override @@ -273,9 +271,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - if (out.getTransportVersion().before(TransportVersions.DROP_UNUSED_NODES_REQUESTS)) { - new ClusterStatsRequest().writeTo(out); - } + sendLegacyNodesRequestHeader(TransportVersions.DROP_UNUSED_NODES_REQUESTS, out); } } diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesRequest.java b/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesRequest.java index 626cdb8046f53..1461be729dd29 100644 --- a/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesRequest.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.TimeValue; @@ -39,23 +38,6 @@ public abstract class BaseNodesRequest private TimeValue timeout; - /** - * @deprecated {@link BaseNodesRequest} derivatives are quite heavyweight and should never need sending over the wire. Do not include - * the full top-level request directly in the node-level requests. Instead, copy the needed fields over to a dedicated node-level - * request. - * - * @see #100878 - */ - @Deprecated(forRemoval = true) - protected BaseNodesRequest(StreamInput in) throws IOException { - // A bare `BaseNodesRequest` is never sent over the wire, but several implementations send the full top-level request to each node - // (wrapped up in another request). They shouldn't, but until we fix that we must keep this. See #100878. - super(in); - nodesIds = in.readStringArray(); - concreteNodes = in.readOptionalArray(DiscoveryNode::new, DiscoveryNode[]::new); - timeout = in.readOptionalTimeValue(); - } - protected BaseNodesRequest(String... nodesIds) { this.nodesIds = nodesIds; } diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index daf3334dcaf65..1e26ba33cf0db 100644 --- a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.FailedNodeException; @@ -21,14 +22,18 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Releasables; +import org.elasticsearch.core.UpdateForV9; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestHandler; @@ -251,4 +256,42 @@ public void messageReceived(NodeRequest request, TransportChannel channel, Task } } + /** + * Some {@link TransportNodesAction} implementations send the whole top-level request out to each individual node. However, the + * top-level request contains a lot of unnecessary junk, particularly the heavyweight {@link DiscoveryNode} instances, so we are + * migrating away from this practice. This method allows to skip over the unnecessary data received from an older node. + * + * @see #100878 + * @param fixVersion The {@link TransportVersion} in which the request representation was fixed, so no skipping is needed. + * @param in The {@link StreamInput} in which to skip the unneeded data. + */ + @UpdateForV9 // no longer necessary in v9 + public static void skipLegacyNodesRequestHeader(TransportVersion fixVersion, StreamInput in) throws IOException { + if (in.getTransportVersion().before(fixVersion)) { + TaskId.readFromStream(in); + in.readStringArray(); + in.readOptionalArray(DiscoveryNode::new, DiscoveryNode[]::new); + in.readOptionalTimeValue(); + } + } + + /** + * Some {@link TransportNodesAction} implementations send the whole top-level request out to each individual node. However, the + * top-level request contains a lot of unnecessary junk, particularly the heavyweight {@link DiscoveryNode} instances, so we are + * migrating away from this practice. This method allows to send a well-formed, but empty, header to older nodes that require it. + * + * @see #100878 + * @param fixVersion The {@link TransportVersion} in which the request representation was fixed, so no skipping is needed. + * @param out The {@link StreamOutput} to which to send the dummy data. + */ + @UpdateForV9 // no longer necessary in v9 + public static void sendLegacyNodesRequestHeader(TransportVersion fixVersion, StreamOutput out) throws IOException { + if (out.getTransportVersion().before(fixVersion)) { + TaskId.EMPTY_TASK_ID.writeTo(out); + out.writeStringArray(Strings.EMPTY_ARRAY); + out.writeOptionalArray(null); + out.writeOptionalTimeValue(null); + } + } + } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/hotthreads/NodesHotThreadsRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsRequestTests.java similarity index 86% rename from server/src/test/java/org/elasticsearch/action/admin/cluster/hotthreads/NodesHotThreadsRequestTests.java rename to server/src/test/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsRequestTests.java index bbfdd05879e36..4b02ddf5b4b94 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/hotthreads/NodesHotThreadsRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsRequestTests.java @@ -6,10 +6,10 @@ * Side Public License, v 1. */ -package org.elasticsearch.action.admin.cluster.hotthreads; +package org.elasticsearch.action.admin.cluster.node.hotthreads; import org.elasticsearch.TransportVersion; -import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.core.TimeValue; @@ -46,10 +46,13 @@ public void testBWCSerialization() throws IOException { try (BytesStreamOutput out = new BytesStreamOutput()) { out.setTransportVersion(latest); - request.writeTo(out); + request.requestOptions.writeTo(out); try (StreamInput in = out.bytes().streamInput()) { in.setTransportVersion(previous); - NodesHotThreadsRequest deserialized = new NodesHotThreadsRequest(in); + NodesHotThreadsRequest deserialized = new NodesHotThreadsRequest( + Strings.EMPTY_ARRAY, + HotThreads.RequestOptions.readFrom(in) + ); assertEquals(request.threads(), deserialized.threads()); assertEquals(request.ignoreIdleThreads(), deserialized.ignoreIdleThreads()); assertEquals(request.type(), deserialized.type()); @@ -61,10 +64,13 @@ public void testBWCSerialization() throws IOException { try (BytesStreamOutput out = new BytesStreamOutput()) { out.setTransportVersion(previous); - request.writeTo(out); + request.requestOptions.writeTo(out); try (StreamInput in = out.bytes().streamInput()) { in.setTransportVersion(previous); - NodesHotThreadsRequest deserialized = new NodesHotThreadsRequest(in); + NodesHotThreadsRequest deserialized = new NodesHotThreadsRequest( + Strings.EMPTY_ARRAY, + HotThreads.RequestOptions.readFrom(in) + ); assertEquals(request.threads(), deserialized.threads()); assertEquals(request.ignoreIdleThreads(), deserialized.ignoreIdleThreads()); assertEquals(request.type(), deserialized.type()); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequestTests.java index ad5f1e5034dd6..000f99b270df2 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequestTests.java @@ -130,9 +130,9 @@ public void testUnknownMetricsRejected() { */ private static NodesStatsRequest roundTripRequest(NodesStatsRequest request) throws Exception { try (BytesStreamOutput out = new BytesStreamOutput()) { - request.writeTo(out); + request.getNodesStatsRequestParameters().writeTo(out); try (StreamInput in = out.bytes().streamInput()) { - return new NodesStatsRequest(in); + return new NodesStatsRequest(new NodesStatsRequestParameters(in), request.nodesIds()); } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/TrainedModelCacheInfoAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/TrainedModelCacheInfoAction.java index 2cbf577790b37..8ee928ba641de 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/TrainedModelCacheInfoAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/TrainedModelCacheInfoAction.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.action.support.nodes.BaseNodesResponse; @@ -18,7 +19,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.core.UpdateForV9; import java.io.IOException; import java.util.Arrays; @@ -40,15 +40,9 @@ public Request(DiscoveryNode... concreteNodes) { super(concreteNodes); } - @UpdateForV9 // this constructor is unused in v9 - public Request(StreamInput in) throws IOException { - super(in); - } - - @UpdateForV9 // this method can just call localOnly() in v9 @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + TransportAction.localOnly(); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/TrainedModelCacheInfoRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/TrainedModelCacheInfoRequestTests.java deleted file mode 100644 index 8620b8d77755c..0000000000000 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/TrainedModelCacheInfoRequestTests.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.core.ml.action; - -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodeUtils; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.test.AbstractWireSerializingTestCase; - -import java.net.InetAddress; - -public class TrainedModelCacheInfoRequestTests extends AbstractWireSerializingTestCase { - - @Override - protected Writeable.Reader instanceReader() { - return TrainedModelCacheInfoAction.Request::new; - } - - @Override - protected TrainedModelCacheInfoAction.Request createTestInstance() { - int numNodes = randomIntBetween(1, 20); - DiscoveryNode[] nodes = new DiscoveryNode[numNodes]; - for (int i = 0; i < numNodes; ++i) { - nodes[i] = DiscoveryNodeUtils.create(randomAlphaOfLength(20), new TransportAddress(InetAddress.getLoopbackAddress(), 9200 + i)); - } - return new TrainedModelCacheInfoAction.Request(nodes); - } - - @Override - protected TrainedModelCacheInfoAction.Request mutateInstance(TrainedModelCacheInfoAction.Request instance) { - return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929 - } -} diff --git a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/NodesDeprecationCheckAction.java b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/NodesDeprecationCheckAction.java index c867ef671811c..1d9fb86998b9b 100644 --- a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/NodesDeprecationCheckAction.java +++ b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/NodesDeprecationCheckAction.java @@ -21,6 +21,9 @@ import java.util.List; import java.util.Objects; +import static org.elasticsearch.action.support.nodes.TransportNodesAction.sendLegacyNodesRequestHeader; +import static org.elasticsearch.action.support.nodes.TransportNodesAction.skipLegacyNodesRequestHeader; + /** * Runs deprecation checks on each node. Deprecation checks are performed locally so that filtered settings * can be accessed in the deprecation checks. @@ -40,17 +43,13 @@ public NodeRequest() {} public NodeRequest(StreamInput in) throws IOException { super(in); - if (in.getTransportVersion().before(TransportVersions.DROP_UNUSED_NODES_REQUESTS)) { - new NodesDeprecationCheckRequest(in); - } + skipLegacyNodesRequestHeader(TransportVersions.DROP_UNUSED_NODES_REQUESTS, in); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - if (out.getTransportVersion().before(TransportVersions.DROP_UNUSED_NODES_REQUESTS)) { - new NodesDeprecationCheckRequest().writeTo(out); - } + sendLegacyNodesRequestHeader(TransportVersions.DROP_UNUSED_NODES_REQUESTS, out); } } diff --git a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/NodesDeprecationCheckRequest.java b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/NodesDeprecationCheckRequest.java index 2e5f77ee52778..63966985ece6f 100644 --- a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/NodesDeprecationCheckRequest.java +++ b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/NodesDeprecationCheckRequest.java @@ -7,30 +7,22 @@ package org.elasticsearch.xpack.deprecation; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesRequest; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.core.UpdateForV9; -import java.io.IOException; import java.util.Arrays; import java.util.Objects; public class NodesDeprecationCheckRequest extends BaseNodesRequest { - @UpdateForV9 // this constructor is unused in v9 - public NodesDeprecationCheckRequest(StreamInput in) throws IOException { - super(in); - } - public NodesDeprecationCheckRequest(String... nodesIds) { super(nodesIds); } - @UpdateForV9 // this method can just call localOnly() in v9 @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + public void writeTo(StreamOutput out) { + TransportAction.localOnly(); } @Override diff --git a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/NodesDeprecationCheckRequestTests.java b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/NodesDeprecationCheckRequestTests.java index a9e6e80df5040..24e2e9f76e125 100644 --- a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/NodesDeprecationCheckRequestTests.java +++ b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/NodesDeprecationCheckRequestTests.java @@ -7,25 +7,26 @@ package org.elasticsearch.xpack.deprecation; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.EqualsHashCodeTestUtils; -public class NodesDeprecationCheckRequestTests extends AbstractWireSerializingTestCase { +public class NodesDeprecationCheckRequestTests extends ESTestCase { - @Override - protected Writeable.Reader instanceReader() { - return NodesDeprecationCheckRequest::new; + public void testEqualsAndHashCode() { + EqualsHashCodeTestUtils.checkEqualsAndHashCode( + createTestInstance(), + i -> new NodesDeprecationCheckRequest(i.nodesIds()), + this::mutateInstance + ); } - @Override - protected NodesDeprecationCheckRequest mutateInstance(NodesDeprecationCheckRequest instance) { + private NodesDeprecationCheckRequest mutateInstance(NodesDeprecationCheckRequest instance) { int newSize = randomValueOtherThan(instance.nodesIds().length, () -> randomIntBetween(0, 10)); String[] newNodeIds = randomArray(newSize, newSize, String[]::new, () -> randomAlphaOfLengthBetween(5, 10)); return new NodesDeprecationCheckRequest(newNodeIds); } - @Override - protected NodesDeprecationCheckRequest createTestInstance() { + private NodesDeprecationCheckRequest createTestInstance() { return new NodesDeprecationCheckRequest(randomArray(0, 10, String[]::new, () -> randomAlphaOfLengthBetween(5, 10))); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportTrainedModelCacheInfoAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportTrainedModelCacheInfoAction.java index f2c2b6de0e19d..0dda155043556 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportTrainedModelCacheInfoAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportTrainedModelCacheInfoAction.java @@ -94,9 +94,7 @@ public static class NodeModelCacheInfoRequest extends TransportRequest { public NodeModelCacheInfoRequest(StreamInput in) throws IOException { super(in); - if (in.getTransportVersion().before(TransportVersions.DROP_UNUSED_NODES_REQUESTS)) { - new TrainedModelCacheInfoAction.Request(in); - } + skipLegacyNodesRequestHeader(TransportVersions.DROP_UNUSED_NODES_REQUESTS, in); } @Override @@ -107,9 +105,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - if (out.getTransportVersion().before(TransportVersions.DROP_UNUSED_NODES_REQUESTS)) { - new TrainedModelCacheInfoAction.Request().writeTo(out); - } + sendLegacyNodesRequestHeader(TransportVersions.DROP_UNUSED_NODES_REQUESTS, out); } } } From 9f88c128ad0e4d96a4e726c203b4ea6477f5bdfc Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 13 Jun 2024 16:59:50 +0100 Subject: [PATCH 2/2] Fix TransportLoggerTests --- .../transport/TransportLoggerTests.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java b/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java index 679432f8b60a0..18ec4851707b1 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java @@ -9,8 +9,6 @@ import org.apache.logging.log4j.Level; import org.elasticsearch.TransportVersion; -import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequest; -import org.elasticsearch.action.admin.cluster.stats.TransportClusterStatsAction; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; import org.elasticsearch.common.settings.Settings; @@ -33,7 +31,7 @@ public void testLoggingHandler() throws IOException { + ", type: request" + ", version: .*" + ", header size: \\d+B" - + ", action: cluster:monitor/stats]" + + ", action: internal:test]" + " WRITE: \\d+B"; final MockLog.LoggingExpectation writeExpectation = new MockLog.PatternSeenEventExpectation( "hot threads request", @@ -47,11 +45,11 @@ public void testLoggingHandler() throws IOException { + ", type: request" + ", version: .*" + ", header size: \\d+B" - + ", action: cluster:monitor/stats]" + + ", action: internal:test]" + " READ: \\d+B"; final MockLog.LoggingExpectation readExpectation = new MockLog.PatternSeenEventExpectation( - "cluster monitor request", + "cluster state request", TransportLogger.class.getCanonicalName(), Level.TRACE, readPattern @@ -73,9 +71,9 @@ private BytesReference buildRequest() throws IOException { try (RecyclerBytesStreamOutput bytesStreamOutput = new RecyclerBytesStreamOutput(recycler)) { OutboundMessage.Request request = new OutboundMessage.Request( new ThreadContext(Settings.EMPTY), - new ClusterStatsRequest(), + new TransportRequest.Empty(), TransportVersion.current(), - TransportClusterStatsAction.TYPE.name(), + "internal:test", randomInt(30), false, compress