diff --git a/docs/reference/inference/get-inference.asciidoc b/docs/reference/inference/get-inference.asciidoc index 7e32bd05b5f56..bbf1d59c56213 100644 --- a/docs/reference/inference/get-inference.asciidoc +++ b/docs/reference/inference/get-inference.asciidoc @@ -24,10 +24,10 @@ Retrieves {infer} model information. [[get-inference-api-desc]] ==== {api-description-title} -You can get information in a single API request for: +You can get information in a single API request for: * a single {infer} model by providing the task type and the model ID, -* all of the {infer} models for a certain task type by providing the task type +* all of the {infer} models for a certain task type by providing the task type and a wildcard expression, * all of the {infer} models by using a wildcard expression. @@ -50,7 +50,7 @@ The type of {infer} task that the model performs. [[get-inference-api-example]] ==== {api-examples-title} -The following API call retrives information about the `my-elser-model` {infer} +The following API call retrives information about the `my-elser-model` {infer} model that can perform `sparse_embedding` tasks. @@ -68,7 +68,7 @@ The API returns the following response: { "model_id": "my-elser-model", "task_type": "sparse_embedding", - "service": "elser_mlnode", + "service": "elser", "service_settings": { "num_allocations": 1, "num_threads": 1 @@ -76,4 +76,4 @@ The API returns the following response: "task_settings": {} } ------------------------------------------------------------ -// NOTCONSOLE \ No newline at end of file +// NOTCONSOLE diff --git a/docs/reference/inference/put-inference.asciidoc b/docs/reference/inference/put-inference.asciidoc index c5ccd6a57a8dd..f4737875971c7 100644 --- a/docs/reference/inference/put-inference.asciidoc +++ b/docs/reference/inference/put-inference.asciidoc @@ -22,8 +22,8 @@ Creates a model to perform an {infer} task. [[put-inference-api-desc]] ==== {api-description-title} -The create {infer} API enables you to create and configure an {infer} model to -perform a specific {infer} task. +The create {infer} API enables you to create and configure an {infer} model to +perform a specific {infer} task. [discrete] @@ -50,17 +50,16 @@ The type of the {infer} task that the model will perform. Available task types: (Required, string) The type of service supported for the specified task type. Available services: -* `elser`, -* `elser_mlnode`. +* `elser` `service_settings`:: (Required, object) -Settings used to install the {infer} model. These settings are specific to the +Settings used to install the {infer} model. These settings are specific to the `service` you specified. `task_settings`:: (Optional, object) -Settings to configure the {infer} task. These settings are specific to the +Settings to configure the {infer} task. These settings are specific to the `` you specified. @@ -68,14 +67,14 @@ Settings to configure the {infer} task. These settings are specific to the [[put-inference-api-example]] ==== {api-examples-title} -The following example shows how to create an {infer} model called +The following example shows how to create an {infer} model called `my-elser-model` to perform a `sparse_embedding` task type. [source,console] ------------------------------------------------------------ PUT _inference/sparse_embedding/my-elser-model { - "service": "elser_mlnode", + "service": "elser", "service_settings": { "num_allocations": 1, "num_threads": 1 @@ -93,7 +92,7 @@ Example response: { "model_id": "my-elser-model", "task_type": "sparse_embedding", - "service": "elser_mlnode", + "service": "elser", "service_settings": { "num_allocations": 1, "num_threads": 1 diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsAction.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsAction.java index 71995864b0233..f1f2923ccd053 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsAction.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsAction.java @@ -11,6 +11,7 @@ import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.action.support.nodes.BaseNodesResponse; @@ -44,10 +45,6 @@ public Request() { super((String[]) null); } - public Request(StreamInput in) throws IOException { - super(in); - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -71,6 +68,11 @@ public boolean equals(Object obj) { } return true; } + + @Override + public void writeTo(StreamOutput out) { + TransportAction.localOnly(); + } } public static class NodeRequest extends TransportRequest { @@ -78,9 +80,7 @@ public NodeRequest(StreamInput in) throws IOException { super(in); } - public NodeRequest(Request request) { - - } + public NodeRequest() {} } public static class Response extends BaseNodesResponse implements Writeable, ToXContentObject { diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsTransportAction.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsTransportAction.java index 57dea7b243d48..d0dcfb1ca966c 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsTransportAction.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsTransportAction.java @@ -46,11 +46,9 @@ public GeoIpDownloaderStatsTransportAction( ) { super( GeoIpDownloaderStatsAction.NAME, - threadPool, clusterService, transportService, actionFilters, - Request::new, NodeRequest::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); @@ -66,7 +64,7 @@ protected Response newResponse(Request request, List nodeResponses @Override protected NodeRequest newNodeRequest(Request request) { - return new NodeRequest(request); + return new NodeRequest(); } @Override diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search/140_pre_filter_search_shards.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search/140_pre_filter_search_shards.yml index ac8194cdff7dd..a778fceee9476 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search/140_pre_filter_search_shards.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search/140_pre_filter_search_shards.yml @@ -57,6 +57,9 @@ setup: --- "pre_filter_shard_size with shards that have no hit": + - skip: + version: all + reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/92058" - do: index: index: index_1 diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java index 6ebddcf879221..96a2fe69732df 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java @@ -1066,6 +1066,7 @@ public void testEquivalentDeletesAreDeduplicated() throws Exception { } } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99355") public void testMasterFailoverOnFinalizationLoop() throws Exception { internalCluster().startMasterOnlyNodes(3); final String dataNode = internalCluster().startDataOnlyNode(); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsAction.java index 4909785c11b5f..d683e6a042939 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsAction.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.admin.cluster.node.hotthreads; import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.Writeable; public class NodesHotThreadsAction extends ActionType { @@ -16,6 +17,6 @@ public class NodesHotThreadsAction extends ActionType { public static final String NAME = "cluster:monitor/nodes/hot_threads"; private NodesHotThreadsAction() { - super(NAME, NodesHotThreadsResponse::new); + super(NAME, Writeable.Reader.localOnly()); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsResponse.java index 333629d4f522b..59307009f785b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsResponse.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.admin.cluster.node.hotthreads; import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.collect.Iterators; @@ -26,10 +27,6 @@ public class NodesHotThreadsResponse extends BaseNodesResponse { - public NodesHotThreadsResponse(StreamInput in) throws IOException { - super(in); - } - public NodesHotThreadsResponse(ClusterName clusterName, List nodes, List failures) { super(clusterName, nodes, failures); } @@ -52,7 +49,7 @@ protected List readNodesFrom(StreamInput in) throws IOException @Override protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + TransportAction.localOnly(); } private static class LinesIterator implements Iterator { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/TransportNodesHotThreadsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/TransportNodesHotThreadsAction.java index 26a101270d972..3be934f93dd27 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/TransportNodesHotThreadsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/TransportNodesHotThreadsAction.java @@ -41,11 +41,9 @@ public TransportNodesHotThreadsAction( ) { super( NodesHotThreadsAction.NAME, - threadPool, clusterService, transportService, actionFilters, - NodesHotThreadsRequest::new, NodeRequest::new, threadPool.executor(ThreadPool.Names.GENERIC) ); @@ -87,6 +85,7 @@ protected NodeHotThreads nodeOperation(NodeRequest request, Task task) { public static class NodeRequest extends TransportRequest { + // TODO don't wrap the whole top-level request, it contains heavy and irrelevant DiscoveryNode things; see #100878 NodesHotThreadsRequest request; public NodeRequest(StreamInput in) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoAction.java index d1840d6c12364..5cb4a8ef3dbb8 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoAction.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.admin.cluster.node.info; import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.Writeable; public class NodesInfoAction extends ActionType { @@ -16,6 +17,6 @@ public class NodesInfoAction extends ActionType { public static final String NAME = "cluster:monitor/nodes/info"; private NodesInfoAction() { - super(NAME, NodesInfoResponse::new); + super(NAME, Writeable.Reader.localOnly()); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoRequest.java index d7734f7a0eea3..0cf0baa75a8de 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoRequest.java @@ -22,7 +22,7 @@ */ public class NodesInfoRequest extends BaseNodesRequest { - private NodesInfoMetrics nodesInfoMetrics; + private final NodesInfoMetrics nodesInfoMetrics; /** * Create a new NodeInfoRequest from a {@link StreamInput} object. @@ -118,20 +118,6 @@ public void writeTo(StreamOutput out) throws IOException { nodesInfoMetrics.writeTo(out); } - /** - * Helper method for creating NodesInfoRequests with desired metrics - * @param metrics the metrics to include in the request - * @return - */ - public static NodesInfoRequest requestWithMetrics(NodesInfoMetrics.Metric... metrics) { - NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); - nodesInfoRequest.clear(); - for (var metric : metrics) { - nodesInfoRequest.addMetric(metric.metricName()); - } - return nodesInfoRequest; - } - public NodesInfoMetrics getNodesInfoMetrics() { return nodesInfoMetrics; } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoResponse.java index 4fa99db192db5..fbf285d40f698 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoResponse.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.admin.cluster.node.info; import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNodeRole; @@ -34,22 +35,18 @@ public class NodesInfoResponse extends BaseNodesResponse implements ToXContentFragment { - public NodesInfoResponse(StreamInput in) throws IOException { - super(in); - } - public NodesInfoResponse(ClusterName clusterName, List nodes, List failures) { super(clusterName, nodes, failures); } @Override protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(NodeInfo::new); + return TransportAction.localOnly(); } @Override protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + TransportAction.localOnly(); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java index c263bea92ffa3..69ecca8fc4f3a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java @@ -46,11 +46,9 @@ public TransportNodesInfoAction( ) { super( NodesInfoAction.NAME, - threadPool, clusterService, transportService, actionFilters, - NodesInfoRequest::new, NodeInfoRequest::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); @@ -97,7 +95,7 @@ protected NodeInfo nodeOperation(NodeInfoRequest nodeRequest, Task task) { public static class NodeInfoRequest extends TransportRequest { - private NodesInfoMetrics nodesInfoMetrics; + private final NodesInfoMetrics nodesInfoMetrics; public NodeInfoRequest(StreamInput in) throws IOException { super(in); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/reload/NodesReloadSecureSettingsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/reload/NodesReloadSecureSettingsAction.java index ce44ba0621912..3b09694958dcd 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/reload/NodesReloadSecureSettingsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/reload/NodesReloadSecureSettingsAction.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.admin.cluster.node.reload; import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.Writeable; public class NodesReloadSecureSettingsAction extends ActionType { @@ -16,6 +17,6 @@ public class NodesReloadSecureSettingsAction extends ActionType nodes, List failures) { super(clusterName, nodes, failures); } @Override protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(NodeResponse::new); + return TransportAction.localOnly(); } @Override protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + TransportAction.localOnly(); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/reload/TransportNodesReloadSecureSettingsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/reload/TransportNodesReloadSecureSettingsAction.java index dbfffdbe4245f..7fa97f1ee14b7 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/reload/TransportNodesReloadSecureSettingsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/reload/TransportNodesReloadSecureSettingsAction.java @@ -55,11 +55,9 @@ public TransportNodesReloadSecureSettingsAction( ) { super( NodesReloadSecureSettingsAction.NAME, - threadPool, clusterService, transportService, actionFilters, - NodesReloadSecureSettingsRequest::new, NodesReloadSecureSettingsRequest.NodeRequest::new, threadPool.executor(ThreadPool.Names.GENERIC) ); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateShardPathRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateShardPathRequest.java index 18464346ad889..7b636d766dbf2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateShardPathRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateShardPathRequest.java @@ -8,8 +8,8 @@ package org.elasticsearch.action.admin.cluster.node.shutdown; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesRequest; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; @@ -27,15 +27,10 @@ public PrevalidateShardPathRequest(Set shardIds, String... nodeIds) { this.shardIds = Set.copyOf(Objects.requireNonNull(shardIds)); } - public PrevalidateShardPathRequest(StreamInput in) throws IOException { - super(in); - this.shardIds = in.readCollectionAsImmutableSet(ShardId::new); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeCollection(shardIds); + TransportAction.localOnly(); } public Set getShardIds() { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateShardPathResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateShardPathResponse.java index 0267d299fe57a..d036d653962f5 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateShardPathResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateShardPathResponse.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.admin.cluster.node.shutdown; import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.io.stream.StreamInput; @@ -27,17 +28,13 @@ public PrevalidateShardPathResponse( super(clusterName, nodes, failures); } - public PrevalidateShardPathResponse(StreamInput in) throws IOException { - super(in); - } - @Override protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(NodePrevalidateShardPathResponse::new); + return TransportAction.localOnly(); } @Override protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + TransportAction.localOnly(); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportPrevalidateShardPathAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportPrevalidateShardPathAction.java index a3f1f1bf523a7..1ca56183b4fd6 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportPrevalidateShardPathAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportPrevalidateShardPathAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexSettings; @@ -46,7 +47,7 @@ public class TransportPrevalidateShardPathAction extends TransportNodesAction< NodePrevalidateShardPathResponse> { public static final String ACTION_NAME = "internal:admin/indices/prevalidate_shard_path"; - public static final ActionType TYPE = new ActionType<>(ACTION_NAME, PrevalidateShardPathResponse::new); + public static final ActionType TYPE = new ActionType<>(ACTION_NAME, Writeable.Reader.localOnly()); private static final Logger logger = LogManager.getLogger(TransportPrevalidateShardPathAction.class); private final TransportService transportService; @@ -64,11 +65,9 @@ public TransportPrevalidateShardPathAction( ) { super( ACTION_NAME, - threadPool, clusterService, transportService, actionFilters, - PrevalidateShardPathRequest::new, NodePrevalidateShardPathRequest::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsAction.java index 5bc467b37f4b5..c475088fc434e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsAction.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.admin.cluster.node.stats; import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.Writeable; public class NodesStatsAction extends ActionType { @@ -16,6 +17,6 @@ public class NodesStatsAction extends ActionType { public static final String NAME = "cluster:monitor/nodes/stats"; private NodesStatsAction() { - super(NAME, NodesStatsResponse::new); + super(NAME, Writeable.Reader.localOnly()); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsResponse.java index c99629bbdfd62..09bb6909191d1 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsResponse.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.admin.cluster.node.stats; import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesXContentResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.Strings; @@ -24,22 +25,18 @@ public class NodesStatsResponse extends BaseNodesXContentResponse { - public NodesStatsResponse(StreamInput in) throws IOException { - super(in); - } - public NodesStatsResponse(ClusterName clusterName, List nodes, List failures) { super(clusterName, nodes, failures); } @Override protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(NodeStats::new); + return TransportAction.localOnly(); } @Override protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + TransportAction.localOnly(); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 3b9298eac16b0..10b22694b8a74 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -48,11 +48,9 @@ public TransportNodesStatsAction( ) { super( NodesStatsAction.NAME, - threadPool, clusterService, transportService, actionFilters, - NodesStatsRequest::new, NodeStatsRequest::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); @@ -104,6 +102,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest, Task task) public static class NodeStatsRequest extends TransportRequest { + // TODO don't wrap the whole top-level request, it contains heavy and irrelevant DiscoveryNode things; see #100878 NodesStatsRequest request; public NodeStatsRequest(StreamInput in) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodesUsageAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodesUsageAction.java index 384f075904141..1e9232aae28ba 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodesUsageAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodesUsageAction.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.admin.cluster.node.usage; import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.Writeable; public class NodesUsageAction extends ActionType { @@ -16,6 +17,6 @@ public class NodesUsageAction extends ActionType { public static final String NAME = "cluster:monitor/nodes/usage"; protected NodesUsageAction() { - super(NAME, NodesUsageResponse::new); + super(NAME, Writeable.Reader.localOnly()); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodesUsageResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodesUsageResponse.java index 0ce0bcfb884eb..715d8d09d9098 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodesUsageResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/NodesUsageResponse.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.admin.cluster.node.usage; import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.Strings; @@ -26,22 +27,18 @@ */ public class NodesUsageResponse extends BaseNodesResponse implements ToXContentFragment { - public NodesUsageResponse(StreamInput in) throws IOException { - super(in); - } - public NodesUsageResponse(ClusterName clusterName, List nodes, List failures) { super(clusterName, nodes, failures); } @Override protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(NodeUsage::new); + return TransportAction.localOnly(); } @Override protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + TransportAction.localOnly(); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodesUsageAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodesUsageAction.java index 0619871fdb055..71802fea0a47c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodesUsageAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodesUsageAction.java @@ -48,11 +48,9 @@ public TransportNodesUsageAction( ) { super( NodesUsageAction.NAME, - threadPool, clusterService, transportService, actionFilters, - NodesUsageRequest::new, NodeUsageRequest::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); @@ -86,6 +84,7 @@ protected NodeUsage nodeOperation(NodeUsageRequest nodeUsageRequest, Task task) public static class NodeUsageRequest extends TransportRequest { + // TODO don't wrap the whole top-level request, it contains heavy and irrelevant DiscoveryNode things; see #100878 NodesUsageRequest request; public NodeUsageRequest(StreamInput in) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java index ce8b141c92a42..a5c8b13f8dc73 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.action.support.nodes.BaseNodesResponse; @@ -22,6 +23,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.snapshots.Snapshot; @@ -49,7 +51,7 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction< TransportNodesSnapshotsStatus.NodeSnapshotStatus> { public static final String ACTION_NAME = SnapshotsStatusAction.NAME + "[nodes]"; - public static final ActionType TYPE = new ActionType<>(ACTION_NAME, NodesSnapshotStatus::new); + public static final ActionType TYPE = new ActionType<>(ACTION_NAME, Writeable.Reader.localOnly()); private final SnapshotShardsService snapshotShardsService; @@ -63,11 +65,9 @@ public TransportNodesSnapshotsStatus( ) { super( ACTION_NAME, - threadPool, clusterService, transportService, actionFilters, - Request::new, NodeRequest::new, threadPool.executor(ThreadPool.Names.GENERIC) ); @@ -125,12 +125,6 @@ public static class Request extends BaseNodesRequest { private Snapshot[] snapshots; - public Request(StreamInput in) throws IOException { - super(in); - // This operation is never executed remotely - throw new UnsupportedOperationException("shouldn't be here"); - } - public Request(String[] nodesIds) { super(nodesIds); } @@ -142,17 +136,12 @@ public Request snapshots(Snapshot[] snapshots) { @Override public void writeTo(StreamOutput out) throws IOException { - // This operation is never executed remotely - throw new UnsupportedOperationException("shouldn't be here"); + TransportAction.localOnly(); } } public static class NodesSnapshotStatus extends BaseNodesResponse { - public NodesSnapshotStatus(StreamInput in) throws IOException { - super(in); - } - public NodesSnapshotStatus(ClusterName clusterName, List nodes, List failures) { super(clusterName, nodes, failures); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsAction.java index 728289e46b50b..a6bd5bb1f66da 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsAction.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.admin.cluster.stats; import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.Writeable; public class ClusterStatsAction extends ActionType { @@ -16,6 +17,6 @@ public class ClusterStatsAction extends ActionType { public static final String NAME = "cluster:monitor/stats"; private ClusterStatsAction() { - super(NAME, ClusterStatsResponse::new); + super(NAME, Writeable.Reader.localOnly()); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java index 54dc5c7a90eff..36e7b247befac 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java @@ -8,8 +8,8 @@ package org.elasticsearch.action.admin.cluster.stats; -import org.elasticsearch.TransportVersions; import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterSnapshotStats; @@ -33,33 +33,6 @@ public class ClusterStatsResponse extends BaseNodesResponse readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(ClusterStatsNodeResponse::readNodeResponse); + return TransportAction.localOnly(); } @Override protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - // nodeStats and indicesStats are rebuilt from nodes - out.writeCollection(nodes); + TransportAction.localOnly(); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 5e1aab81ae934..d8fff551c0551 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -90,11 +90,9 @@ public TransportClusterStatsAction( ) { super( ClusterStatsAction.NAME, - threadPool, clusterService, transportService, actionFilters, - ClusterStatsRequest::new, ClusterStatsNodeRequest::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); @@ -253,6 +251,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq public static class ClusterStatsNodeRequest extends TransportRequest { + // TODO don't wrap the whole top-level request, it contains heavy and irrelevant DiscoveryNode things; see #100878 ClusterStatsRequest request; public ClusterStatsNodeRequest(StreamInput in) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/FindDanglingIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/FindDanglingIndexAction.java index 5f2e5f091e5fb..107d2d1734183 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/FindDanglingIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/FindDanglingIndexAction.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.admin.indices.dangling.find; import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.Writeable; /** * Represents a request to find a particular dangling index by UUID. @@ -19,6 +20,6 @@ public class FindDanglingIndexAction extends ActionType { private final String indexUUID; - public FindDanglingIndexRequest(StreamInput in) throws IOException { - super(in); - this.indexUUID = in.readString(); - } - public FindDanglingIndexRequest(String indexUUID) { super(Strings.EMPTY_ARRAY); this.indexUUID = indexUUID; @@ -39,7 +34,6 @@ public String toString() { @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(this.indexUUID); + TransportAction.localOnly(); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/FindDanglingIndexResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/FindDanglingIndexResponse.java index c782abed32190..db2e433a4b9d0 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/FindDanglingIndexResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/FindDanglingIndexResponse.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.admin.indices.dangling.find; import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.io.stream.StreamInput; @@ -23,10 +24,6 @@ */ public class FindDanglingIndexResponse extends BaseNodesResponse { - public FindDanglingIndexResponse(StreamInput in) throws IOException { - super(in); - } - public FindDanglingIndexResponse( ClusterName clusterName, List nodes, @@ -37,11 +34,11 @@ public FindDanglingIndexResponse( @Override protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(NodeFindDanglingIndexResponse::new); + return TransportAction.localOnly(); } @Override protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + TransportAction.localOnly(); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/TransportFindDanglingIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/TransportFindDanglingIndexAction.java index bc1ebd2455a73..553e3915b3e3f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/TransportFindDanglingIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/TransportFindDanglingIndexAction.java @@ -47,11 +47,9 @@ public TransportFindDanglingIndexAction( ) { super( FindDanglingIndexAction.NAME, - threadPool, clusterService, transportService, actionFilters, - FindDanglingIndexRequest::new, NodeFindDanglingIndexRequest::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/ListDanglingIndicesAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/ListDanglingIndicesAction.java index 635d8ea772d8f..3db80832f4959 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/ListDanglingIndicesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/ListDanglingIndicesAction.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.admin.indices.dangling.list; import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.Writeable; /** * Represents a request to list all dangling indices known to the cluster. @@ -19,6 +20,6 @@ public class ListDanglingIndicesAction extends ActionType implements ToXContentObject { - public ListDanglingIndicesResponse(StreamInput in) throws IOException { - super(in); - } - public ListDanglingIndicesResponse( ClusterName clusterName, List nodes, @@ -93,12 +90,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(NodeListDanglingIndicesResponse::new); + return TransportAction.localOnly(); } @Override protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + TransportAction.localOnly(); } // visible for testing diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/TransportListDanglingIndicesAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/TransportListDanglingIndicesAction.java index 38a0cb0dd7272..6e0a27f7fe822 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/TransportListDanglingIndicesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/TransportListDanglingIndicesAction.java @@ -48,11 +48,9 @@ public TransportListDanglingIndicesAction( ) { super( ListDanglingIndicesAction.NAME, - threadPool, clusterService, transportService, actionFilters, - ListDanglingIndicesRequest::new, NodeListDanglingIndicesRequest::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesRequest.java b/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesRequest.java index c4518e878df00..5d6eca2ef005e 100644 --- a/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesRequest.java @@ -40,6 +40,8 @@ public abstract class BaseNodesRequest private TimeValue timeout; protected BaseNodesRequest(StreamInput in) throws IOException { + // A bare `BaseNodesRequest` is never sent over the wire, but several implementations send the full top-level request to each node + // (wrapped up in another request). They shouldn't, but until we fix that we must keep this. See #100878. super(in); nodesIds = in.readStringArray(); concreteNodes = in.readOptionalArray(DiscoveryNode::new, DiscoveryNode[]::new); @@ -96,6 +98,8 @@ public ActionRequestValidationException validate() { @Override public void writeTo(StreamOutput out) throws IOException { + // A bare `BaseNodesRequest` is never sent over the wire, but several implementations send the full top-level request to each node + // (wrapped up in another request). They shouldn't, but until we fix that we must keep this. See #100878. super.writeTo(out); out.writeStringArrayNullable(nodesIds); out.writeOptionalArray(concreteNodes); diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index e974e15966ce9..27dfa1ff609c7 100644 --- a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -15,8 +15,9 @@ import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.CancellableFanOut; -import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.ThreadedActionListener; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; @@ -46,7 +47,7 @@ public abstract class TransportNodesAction< NodesRequest extends BaseNodesRequest, NodesResponse extends BaseNodesResponse, NodeRequest extends TransportRequest, - NodeResponse extends BaseNodeResponse> extends HandledTransportAction { + NodeResponse extends BaseNodeResponse> extends TransportAction { private static final Logger logger = LogManager.getLogger(TransportNodesAction.class); @@ -58,26 +59,21 @@ public abstract class TransportNodesAction< /** * @param actionName action name - * @param threadPool thread-pool * @param clusterService cluster service * @param transportService transport service * @param actionFilters action filters - * @param request node request writer * @param nodeRequest node request reader * @param executor executor to execute node action and final collection */ protected TransportNodesAction( String actionName, - ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, - Writeable.Reader request, Writeable.Reader nodeRequest, Executor executor ) { - // coordination can run on SAME because it's only O(#nodes) work - super(actionName, transportService, actionFilters, request, EsExecutors.DIRECT_EXECUTOR_SERVICE); + super(actionName, actionFilters, transportService.getTaskManager()); assert executor.equals(EsExecutors.DIRECT_EXECUTOR_SERVICE) == false : "TransportNodesAction must always fork off the transport thread"; this.clusterService = Objects.requireNonNull(clusterService); @@ -87,8 +83,34 @@ protected TransportNodesAction( transportService.registerRequestHandler(transportNodeAction, finalExecutor, nodeRequest, new NodeTransportHandler()); } + /** + * @deprecated Use the local-only constructor instead. + */ + @Deprecated(forRemoval = true) + protected TransportNodesAction( + String actionName, + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + Writeable.Reader requestReader, + Writeable.Reader nodeRequest, + Executor executor + ) { + this(actionName, clusterService, transportService, actionFilters, nodeRequest, executor); + transportService.registerRequestHandler( + actionName, + executor, + false, + true, + requestReader, + (request, channel, task) -> execute(task, request, new ChannelActionListener<>(channel)) + ); + } + @Override protected void doExecute(Task task, NodesRequest request, ActionListener listener) { + // coordination can run on SAME because it's only O(#nodes) work if (request.concreteNodes() == null) { resolveRequest(request, clusterService.state()); assert request.concreteNodes() != null; diff --git a/server/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java index a0246eb29de82..68a1c54fd9869 100644 --- a/server/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.action.support.nodes.BaseNodesResponse; @@ -26,6 +27,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Nullable; import org.elasticsearch.env.NodeEnvironment; @@ -61,7 +63,7 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction private static final Logger logger = LogManager.getLogger(TransportNodesListGatewayStartedShards.class); public static final String ACTION_NAME = "internal:gateway/local/started_shards"; - public static final ActionType TYPE = new ActionType<>(ACTION_NAME, NodesGatewayStartedShards::new); + public static final ActionType TYPE = new ActionType<>(ACTION_NAME, Writeable.Reader.localOnly()); private final Settings settings; private final NodeEnvironment nodeEnv; @@ -81,11 +83,9 @@ public TransportNodesListGatewayStartedShards( ) { super( ACTION_NAME, - threadPool, clusterService, transportService, actionFilters, - Request::new, NodeRequest::new, threadPool.executor(ThreadPool.Names.FETCH_SHARD_STARTED) ); @@ -187,16 +187,6 @@ public static class Request extends BaseNodesRequest { @Nullable private final String customDataPath; - public Request(StreamInput in) throws IOException { - super(in); - shardId = new ShardId(in); - if (in.getTransportVersion().onOrAfter(TransportVersions.V_7_6_0)) { - customDataPath = in.readString(); - } else { - customDataPath = null; - } - } - public Request(ShardId shardId, String customDataPath, DiscoveryNode[] nodes) { super(nodes); this.shardId = Objects.requireNonNull(shardId); @@ -219,20 +209,12 @@ public String getCustomDataPath() { @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - shardId.writeTo(out); - if (out.getTransportVersion().onOrAfter(TransportVersions.V_7_6_0)) { - out.writeString(customDataPath); - } + TransportAction.localOnly(); } } public static class NodesGatewayStartedShards extends BaseNodesResponse { - public NodesGatewayStartedShards(StreamInput in) throws IOException { - super(in); - } - public NodesGatewayStartedShards( ClusterName clusterName, List nodes, @@ -243,12 +225,12 @@ public NodesGatewayStartedShards( @Override protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(NodeGatewayStartedShards::new); + return TransportAction.localOnly(); } @Override protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + TransportAction.localOnly(); } } diff --git a/server/src/main/java/org/elasticsearch/health/stats/HealthApiStatsAction.java b/server/src/main/java/org/elasticsearch/health/stats/HealthApiStatsAction.java index 61894beade382..7ee616ab2f3b9 100644 --- a/server/src/main/java/org/elasticsearch/health/stats/HealthApiStatsAction.java +++ b/server/src/main/java/org/elasticsearch/health/stats/HealthApiStatsAction.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.action.support.nodes.BaseNodesResponse; @@ -17,6 +18,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.metrics.Counters; import org.elasticsearch.core.Nullable; import org.elasticsearch.transport.TransportRequest; @@ -34,7 +36,7 @@ public class HealthApiStatsAction extends ActionType { @@ -43,13 +45,9 @@ public Request() { super((String[]) null); } - public Request(StreamInput in) throws IOException { - super(in); - } - @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + TransportAction.localOnly(); } @Override @@ -74,27 +72,23 @@ public void writeTo(StreamOutput out) throws IOException { public static class Response extends BaseNodesResponse { - public Response(StreamInput in) throws IOException { - super(in); - } - public Response(ClusterName clusterName, List nodes, List failures) { super(clusterName, nodes, failures); } @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + TransportAction.localOnly(); } @Override protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(Node::new); + return TransportAction.localOnly(); } @Override protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + TransportAction.localOnly(); } public Counters getStats() { diff --git a/server/src/main/java/org/elasticsearch/health/stats/HealthApiStatsTransportAction.java b/server/src/main/java/org/elasticsearch/health/stats/HealthApiStatsTransportAction.java index 55d496e2b8c59..f329cb11430c8 100644 --- a/server/src/main/java/org/elasticsearch/health/stats/HealthApiStatsTransportAction.java +++ b/server/src/main/java/org/elasticsearch/health/stats/HealthApiStatsTransportAction.java @@ -42,11 +42,9 @@ public HealthApiStatsTransportAction( ) { super( HealthApiStatsAction.NAME, - threadPool, clusterService, transportService, actionFilters, - HealthApiStatsAction.Request::new, HealthApiStatsAction.Request.Node::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); diff --git a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetadata.java b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetadata.java index bbadd7038b6e5..ae9f7b730d60b 100644 --- a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetadata.java +++ b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetadata.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.action.support.nodes.BaseNodesResponse; @@ -63,7 +64,7 @@ public class TransportNodesListShardStoreMetadata extends TransportNodesAction< private static final Logger logger = LogManager.getLogger(TransportNodesListShardStoreMetadata.class); public static final String ACTION_NAME = "internal:cluster/nodes/indices/shard/store"; - public static final ActionType TYPE = new ActionType<>(ACTION_NAME, NodesStoreFilesMetadata::new); + public static final ActionType TYPE = new ActionType<>(ACTION_NAME, Writeable.Reader.localOnly()); private final Settings settings; private final IndicesService indicesService; @@ -81,11 +82,9 @@ public TransportNodesListShardStoreMetadata( ) { super( ACTION_NAME, - threadPool, clusterService, transportService, actionFilters, - Request::new, NodeRequest::new, threadPool.executor(ThreadPool.Names.FETCH_SHARD_STORE) ); @@ -283,12 +282,6 @@ public static class Request extends BaseNodesRequest { @Nullable private final String customDataPath; - public Request(StreamInput in) throws IOException { - super(in); - shardId = new ShardId(in); - customDataPath = in.readString(); - } - public Request(ShardId shardId, String customDataPath, DiscoveryNode[] nodes) { super(nodes); this.shardId = Objects.requireNonNull(shardId); @@ -311,30 +304,24 @@ public String getCustomDataPath() { @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - shardId.writeTo(out); - out.writeString(customDataPath); + TransportAction.localOnly(); } } public static class NodesStoreFilesMetadata extends BaseNodesResponse { - public NodesStoreFilesMetadata(StreamInput in) throws IOException { - super(in); - } - public NodesStoreFilesMetadata(ClusterName clusterName, List nodes, List failures) { super(clusterName, nodes, failures); } @Override protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(NodeStoreFilesMetadata::readListShardStoreNodeOperationResponse); + return TransportAction.localOnly(); } @Override protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + TransportAction.localOnly(); } } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java index b012c1dd7de24..02a92bd3d848d 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.xcontent.XContentParserUtils; @@ -35,6 +36,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -326,25 +328,36 @@ public Map getIndices() { } /** - * Returns the list of {@link IndexId} that have their snapshots updated but not removed (because they are still referenced by other - * snapshots) after removing the given snapshot from the repository. + * Returns an iterator over {@link IndexId} that have their snapshots updated but not removed (because they are still referenced by + * other snapshots) after removing the given snapshot from the repository. * * @param snapshotIds SnapshotId to remove - * @return List of indices that are changed but not removed + * @return Iterator over indices that are changed but not removed */ - public List indicesToUpdateAfterRemovingSnapshot(Collection snapshotIds) { - return indexSnapshots.entrySet().stream().filter(entry -> { - final Collection existingIds = entry.getValue(); - if (snapshotIds.containsAll(existingIds)) { - return existingIds.size() > snapshotIds.size(); + public Iterator indicesToUpdateAfterRemovingSnapshot(Collection snapshotIds) { + return Iterators.flatMap(indexSnapshots.entrySet().iterator(), entry -> { + if (isIndexToUpdateAfterRemovingSnapshots(entry.getValue(), snapshotIds)) { + return Iterators.single(entry.getKey()); + } else { + return Collections.emptyIterator(); } - for (SnapshotId snapshotId : snapshotIds) { - if (entry.getValue().contains(snapshotId)) { - return true; - } + }); + } + + private static boolean isIndexToUpdateAfterRemovingSnapshots( + Collection snapshotsContainingIndex, + Collection snapshotsToDelete + ) { + // TODO this method is pretty opaque, let's add some comments + if (snapshotsToDelete.containsAll(snapshotsContainingIndex)) { + return snapshotsContainingIndex.size() > snapshotsToDelete.size(); + } + for (SnapshotId snapshotId : snapshotsToDelete) { + if (snapshotsContainingIndex.contains(snapshotId)) { + return true; } - return false; - }).map(Map.Entry::getKey).toList(); + } + return false; } /** @@ -356,7 +369,7 @@ public List indicesToUpdateAfterRemovingSnapshot(Collection * @return map of index to index metadata blob id to delete */ public Map> indexMetaDataToRemoveAfterRemovingSnapshots(Collection snapshotIds) { - Collection indicesForSnapshot = indicesToUpdateAfterRemovingSnapshot(snapshotIds); + Iterator indicesForSnapshot = indicesToUpdateAfterRemovingSnapshot(snapshotIds); final Set allRemainingIdentifiers = indexMetaDataGenerations.lookup.entrySet() .stream() .filter(e -> snapshotIds.contains(e.getKey()) == false) @@ -364,7 +377,8 @@ public Map> indexMetaDataToRemoveAfterRemovingSnapsh .map(indexMetaDataGenerations::getIndexMetaBlobId) .collect(Collectors.toSet()); final Map> toRemove = new HashMap<>(); - for (IndexId indexId : indicesForSnapshot) { + while (indicesForSnapshot.hasNext()) { + final var indexId = indicesForSnapshot.next(); for (SnapshotId snapshotId : snapshotIds) { final String identifier = indexMetaDataGenerations.indexMetaBlobId(snapshotId, indexId); if (allRemainingIdentifiers.contains(identifier) == false) { diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index ffe63b54854b5..79e2ed3c5c206 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -72,6 +72,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.ListenableFuture; +import org.elasticsearch.common.util.concurrent.ThrottledIterator; import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; @@ -1140,11 +1141,23 @@ void runCleanup(ActionListener listener) { // Updating the shard-level metadata and accumulating results private void writeUpdatedShardMetadataAndComputeDeletes(ActionListener listener) { - try (var listeners = new RefCountingListener(listener)) { - for (IndexId indexId : originalRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotIds)) { - new IndexSnapshotsDeletion(indexId).run(listeners.acquire()); - } - } + // noinspection resource -- closed safely at the end of the iteration + final var listeners = new RefCountingListener(listener); + + // Each per-index process takes some nonzero amount of working memory to hold the relevant snapshot IDs and metadata generations + // etc. which we can keep under tighter limits and release sooner if we limit the number of concurrently processing indices. + // Each one needs at least one snapshot thread at all times, so threadPool.info(SNAPSHOT).getMax() of them at once is enough to + // keep the threadpool fully utilized. + ThrottledIterator.run( + originalRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotIds), + (ref, indexId) -> ActionListener.run( + ActionListener.releaseAfter(listeners.acquire(), ref), + l -> new IndexSnapshotsDeletion(indexId).run(l) + ), + threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), + () -> {}, + listeners::close + ); } private class IndexSnapshotsDeletion { diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoRequestTests.java index 16c0ed251aa3d..1bad105e72180 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoRequestTests.java @@ -8,17 +8,14 @@ package org.elasticsearch.action.admin.cluster.node.info; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.test.ESTestCase; import java.util.HashSet; import java.util.Set; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.in; -import static org.hamcrest.Matchers.not; /** * Granular tests for the {@link NodesInfoRequest} class. Higher-level tests @@ -30,35 +27,27 @@ public class NodesInfoRequestTests extends ESTestCase { * Make sure that we can set, serialize, and deserialize arbitrary sets * of metrics. */ - public void testAddMetricsSet() throws Exception { - NodesInfoRequest request = new NodesInfoRequest(randomAlphaOfLength(8)); - randomSubsetOf(NodesInfoMetrics.Metric.allMetrics()).forEach(request::addMetric); - NodesInfoRequest deserializedRequest = roundTripRequest(request); - assertThat(request.requestedMetrics(), equalTo(deserializedRequest.requestedMetrics())); - } - - /** - * Check that we can add a metric. - */ - public void testAddSingleMetric() throws Exception { - NodesInfoRequest request = new NodesInfoRequest(randomAlphaOfLength(8)); - request.addMetric(randomFrom(NodesInfoMetrics.Metric.allMetrics())); - NodesInfoRequest deserializedRequest = roundTripRequest(request); - assertThat(request.requestedMetrics(), equalTo(deserializedRequest.requestedMetrics())); + public void testAddMetricsSet() { + final NodesInfoRequest request = new NodesInfoRequest(randomAlphaOfLength(8)); + request.clear(); + final var requestedMetrics = randomSubsetOf(NodesInfoMetrics.Metric.allMetrics()); + requestedMetrics.forEach(request::addMetric); + assertThat(request.requestedMetrics(), equalTo(Set.copyOf(requestedMetrics))); } /** * Check that we can remove a metric. */ - public void testRemoveSingleMetric() throws Exception { + public void testRemoveSingleMetric() { NodesInfoRequest request = new NodesInfoRequest(randomAlphaOfLength(8)); request.all(); String metric = randomFrom(NodesInfoMetrics.Metric.allMetrics()); request.removeMetric(metric); - NodesInfoRequest deserializedRequest = roundTripRequest(request); - assertThat(request.requestedMetrics(), equalTo(deserializedRequest.requestedMetrics())); - assertThat(metric, not(in(request.requestedMetrics()))); + assertThat( + request.requestedMetrics(), + equalTo(NodesInfoMetrics.Metric.allMetrics().stream().filter(m -> m.equals(metric) == false).collect(Collectors.toSet())) + ); } /** @@ -118,18 +107,4 @@ public void testUnknownMetricsRejected() { exception = expectThrows(IllegalStateException.class, () -> request.addMetrics(unknownMetrics.toArray(String[]::new))); assertThat(exception.getMessage(), equalTo("Used illegal metrics: [" + unknownMetric1 + ", " + unknownMetric2 + "]")); } - - /** - * Serialize and deserialize a request. - * @param request A request to serialize. - * @return The deserialized, "round-tripped" request. - */ - private static NodesInfoRequest roundTripRequest(NodesInfoRequest request) throws Exception { - try (BytesStreamOutput out = new BytesStreamOutput()) { - request.writeTo(out); - try (StreamInput in = out.bytes().streamInput()) { - return new NodesInfoRequest(in); - } - } - } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodePrevalidateShardPathRequestSerializationTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodePrevalidateShardPathRequestSerializationTests.java index 16a81f212458d..ea150a343cdb1 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodePrevalidateShardPathRequestSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodePrevalidateShardPathRequestSerializationTests.java @@ -11,7 +11,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; -import static org.elasticsearch.action.admin.cluster.node.shutdown.PrevalidateShardPathRequestSerializationTests.createSetMutation; +import static org.elasticsearch.action.admin.cluster.node.shutdown.PrevalidateShardPathRequestSerializationTestUtils.createSetMutation; public class NodePrevalidateShardPathRequestSerializationTests extends AbstractWireSerializingTestCase { @@ -22,13 +22,13 @@ protected Writeable.Reader instanceReader() { @Override protected NodePrevalidateShardPathRequest createTestInstance() { - return new NodePrevalidateShardPathRequest(randomSet(0, 50, PrevalidateShardPathRequestSerializationTests::randomShardId)); + return new NodePrevalidateShardPathRequest(randomSet(0, 50, PrevalidateShardPathRequestSerializationTestUtils::randomShardId)); } @Override protected NodePrevalidateShardPathRequest mutateInstance(NodePrevalidateShardPathRequest request) { return new NodePrevalidateShardPathRequest( - createSetMutation(request.getShardIds(), PrevalidateShardPathRequestSerializationTests::randomShardId) + createSetMutation(request.getShardIds(), PrevalidateShardPathRequestSerializationTestUtils::randomShardId) ); } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodePrevalidateShardPathResponseSerializationTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodePrevalidateShardPathResponseSerializationTests.java index dcbbb0a4c30ee..78428e969984a 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodePrevalidateShardPathResponseSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodePrevalidateShardPathResponseSerializationTests.java @@ -13,7 +13,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; -import static org.elasticsearch.action.admin.cluster.node.shutdown.PrevalidateShardPathRequestSerializationTests.createSetMutation; +import static org.elasticsearch.action.admin.cluster.node.shutdown.PrevalidateShardPathRequestSerializationTestUtils.createSetMutation; public class NodePrevalidateShardPathResponseSerializationTests extends AbstractWireSerializingTestCase { @@ -30,7 +30,7 @@ protected NodePrevalidateShardPathResponse createTestInstance() { public static NodePrevalidateShardPathResponse getRandomResponse() { return new NodePrevalidateShardPathResponse( getRandomNode(), - randomSet(0, 100, PrevalidateShardPathRequestSerializationTests::randomShardId) + randomSet(0, 100, PrevalidateShardPathRequestSerializationTestUtils::randomShardId) ); } @@ -45,7 +45,7 @@ protected NodePrevalidateShardPathResponse mutateInstance(NodePrevalidateShardPa } return new NodePrevalidateShardPathResponse( response.getNode(), - createSetMutation(response.getShardIds(), PrevalidateShardPathRequestSerializationTests::randomShardId) + createSetMutation(response.getShardIds(), PrevalidateShardPathRequestSerializationTestUtils::randomShardId) ); } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateShardPathRequestSerializationTestUtils.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateShardPathRequestSerializationTestUtils.java new file mode 100644 index 0000000000000..54d3104b1c6dc --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateShardPathRequestSerializationTestUtils.java @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.admin.cluster.node.shutdown; + +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.index.shard.ShardId; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Supplier; + +import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength; +import static org.elasticsearch.test.ESTestCase.randomBoolean; +import static org.elasticsearch.test.ESTestCase.randomInt; +import static org.elasticsearch.test.ESTestCase.randomIntBetween; + +class PrevalidateShardPathRequestSerializationTestUtils { + + public static ShardId randomShardId() { + return new ShardId(randomAlphaOfLength(20), UUIDs.randomBase64UUID(), randomIntBetween(0, 25)); + } + + public static void mutateList(List list, Supplier supplier) { + if (list.size() > 0 && randomBoolean()) { + // just remove one + list.remove(randomInt(list.size() - 1)); + } else { + list.add(supplier.get()); + } + } + + public static Set createSetMutation(Set set, Supplier supplier) { + List list = new ArrayList<>(set); + mutateList(list, supplier); + return new HashSet<>(list); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateShardPathRequestSerializationTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateShardPathRequestSerializationTests.java deleted file mode 100644 index 81123a0180c7c..0000000000000 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateShardPathRequestSerializationTests.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.action.admin.cluster.node.shutdown; - -import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.test.AbstractWireSerializingTestCase; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.function.IntFunction; -import java.util.function.Supplier; - -public class PrevalidateShardPathRequestSerializationTests extends AbstractWireSerializingTestCase { - - @Override - protected Writeable.Reader instanceReader() { - return PrevalidateShardPathRequest::new; - } - - @Override - protected PrevalidateShardPathRequest createTestInstance() { - Set shardIds = randomSet(0, 100, PrevalidateShardPathRequestSerializationTests::randomShardId); - String[] nodeIds = randomArray(1, 5, String[]::new, () -> randomAlphaOfLength(20)); - PrevalidateShardPathRequest request = new PrevalidateShardPathRequest(shardIds, nodeIds); - return randomBoolean() ? request : request.timeout(randomTimeValue()); - } - - @Override - protected PrevalidateShardPathRequest mutateInstance(PrevalidateShardPathRequest request) { - int i = randomInt(2); - return switch (i) { - case 0 -> new PrevalidateShardPathRequest( - createSetMutation(request.getShardIds(), PrevalidateShardPathRequestSerializationTests::randomShardId), - request.nodesIds() - ).timeout(request.timeout()); - case 1 -> new PrevalidateShardPathRequest( - request.getShardIds(), - createArrayMutation(request.nodesIds(), () -> randomAlphaOfLength(20), String[]::new) - ).timeout(request.timeout()); - case 2 -> new PrevalidateShardPathRequest(request.getShardIds(), request.nodesIds()).timeout( - randomValueOtherThan(request.timeout(), () -> new TimeValue(randomLongBetween(1000, 10000))) - ); - default -> throw new IllegalStateException("unexpected value: " + i); - }; - } - - public static ShardId randomShardId() { - return new ShardId(randomAlphaOfLength(20), UUIDs.randomBase64UUID(), randomIntBetween(0, 25)); - } - - public static void mutateList(List list, Supplier supplier) { - if (list.size() > 0 && randomBoolean()) { - // just remove one - list.remove(randomInt(list.size() - 1)); - } else { - list.add(supplier.get()); - } - } - - public static Set createSetMutation(Set set, Supplier supplier) { - List list = new ArrayList<>(set); - mutateList(list, supplier); - return new HashSet<>(list); - } - - public static T[] createArrayMutation(T[] array, Supplier supplier, IntFunction arrayConstructor) { - List list = new ArrayList<>(Arrays.asList(array)); - mutateList(list, supplier); - return list.toArray(arrayConstructor.apply(list.size())); - } -} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index 6ca3e6bbe5400..882da84d22fe2 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -148,11 +148,9 @@ abstract class AbstractTestNodesAction()), - request, nodeRequest, threadPool.executor(ThreadPool.Names.GENERIC) ); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java index 439c87bdeace6..1a772f77f2d91 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java @@ -263,11 +263,9 @@ public static class TransportTestTaskAction extends TransportNodesAction()), - NodesRequest::new, NodeRequest::new, threadPool.executor(ThreadPool.Names.GENERIC) ); diff --git a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java index 5454ab3eb6f50..9b2063d742f2e 100644 --- a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java @@ -310,7 +310,7 @@ private static class TestTransportNodesAction extends TransportNodesAction< Writeable.Reader nodeRequest, Executor nodeExecutor ) { - super("indices:admin/test", threadPool, clusterService, transportService, actionFilters, request, nodeRequest, nodeExecutor); + super("indices:admin/test", clusterService, transportService, actionFilters, nodeRequest, nodeExecutor); } @Override diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java index 45e6750ceb0d3..bb4d4dac31a5a 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java @@ -62,10 +62,7 @@ public void testIndicesToUpdateAfterRemovingSnapshot() { final List snapshotIds = repositoryData.getSnapshots(index); return snapshotIds.contains(randomSnapshot) && snapshotIds.size() > 1; }).toArray(IndexId[]::new); - assertThat( - repositoryData.indicesToUpdateAfterRemovingSnapshot(Collections.singleton(randomSnapshot)), - containsInAnyOrder(indicesToUpdate) - ); + assertThat(getIndicesToUpdateAfterRemovingSnapshot(repositoryData, randomSnapshot), containsInAnyOrder(indicesToUpdate)); } public void testXContent() throws IOException { @@ -347,7 +344,7 @@ public void testIndexMetaDataToRemoveAfterRemovingSnapshotNoSharing() { final RepositoryData repositoryData = generateRandomRepoData(); final SnapshotId snapshotId = randomFrom(repositoryData.getSnapshotIds()); final IndexMetaDataGenerations indexMetaDataGenerations = repositoryData.indexMetaDataGenerations(); - final Collection indicesToUpdate = repositoryData.indicesToUpdateAfterRemovingSnapshot(Collections.singleton(snapshotId)); + final Collection indicesToUpdate = getIndicesToUpdateAfterRemovingSnapshot(repositoryData, snapshotId); final Map> identifiersToRemove = indexMetaDataGenerations.lookup.get(snapshotId) .entrySet() .stream() @@ -485,4 +482,10 @@ private static Map> randomIndices(final Map getIndicesToUpdateAfterRemovingSnapshot(RepositoryData repositoryData, SnapshotId snapshotToDelete) { + final var result = new ArrayList(); + repositoryData.indicesToUpdateAfterRemovingSnapshot(List.of(snapshotToDelete)).forEachRemaining(result::add); + return result; + } } diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryDeleteThrottlingTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryDeleteThrottlingTests.java new file mode 100644 index 0000000000000..0d322cf2542a1 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryDeleteThrottlingTests.java @@ -0,0 +1,197 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.repositories.blobstore; + +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.blobstore.OperationPurpose; +import org.elasticsearch.common.blobstore.support.FilterBlobContainer; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.env.Environment; +import org.elasticsearch.indices.recovery.RecoverySettings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.RepositoryPlugin; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xcontent.NamedXContentRegistry; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class BlobStoreRepositoryDeleteThrottlingTests extends ESSingleNodeTestCase { + + // This test ensures that we appropriately throttle the per-index activity when deleting a snapshot by marking an index as "active" when + // its index metadata is read, and then as "inactive" when it updates the shard-level metadata. Without throttling, we would pretty much + // read all the index metadata first, and then update all the shard-level metadata. With too much throttling, we would work one index at + // a time and would not fully utilize all snapshot threads. This test shows that we do neither of these things. + + private static final String TEST_REPO_TYPE = "concurrency-limiting-fs"; + private static final String TEST_REPO_NAME = "test-repo"; + private static final int MAX_SNAPSHOT_THREADS = 3; + + @Override + protected Settings nodeSettings() { + return Settings.builder().put(super.nodeSettings()).put("thread_pool.snapshot.max", MAX_SNAPSHOT_THREADS).build(); + } + + protected Collection> getPlugins() { + return List.of(ConcurrencyLimitingFsRepositoryPlugin.class); + } + + public static class ConcurrencyLimitingFsRepositoryPlugin extends Plugin implements RepositoryPlugin { + @Override + public Map getRepositories( + Environment env, + NamedXContentRegistry namedXContentRegistry, + ClusterService clusterService, + BigArrays bigArrays, + RecoverySettings recoverySettings + ) { + return Collections.singletonMap( + TEST_REPO_TYPE, + (metadata) -> new FsRepository(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings) { + @Override + protected BlobStore createBlobStore() throws Exception { + return new ConcurrencyLimitingBlobStore(super.createBlobStore()); + } + } + ); + } + } + + private static class ConcurrencyLimitingBlobStore implements BlobStore { + private final BlobStore delegate; + private final Set activeIndices = ConcurrentCollections.newConcurrentSet(); + private final CountDownLatch countDownLatch = new CountDownLatch(MAX_SNAPSHOT_THREADS); + + private ConcurrencyLimitingBlobStore(BlobStore delegate) { + this.delegate = delegate; + } + + @Override + public BlobContainer blobContainer(BlobPath path) { + return new ConcurrencyLimitingBlobContainer(delegate.blobContainer(path), activeIndices, countDownLatch); + } + + @Override + public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator blobNames) throws IOException { + delegate.deleteBlobsIgnoringIfNotExists(purpose, blobNames); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + } + + private static class ConcurrencyLimitingBlobContainer extends FilterBlobContainer { + private final Set activeIndices; + private final CountDownLatch countDownLatch; + + ConcurrencyLimitingBlobContainer(BlobContainer delegate, Set activeIndices, CountDownLatch countDownLatch) { + super(delegate); + this.activeIndices = activeIndices; + this.countDownLatch = countDownLatch; + } + + @Override + protected BlobContainer wrapChild(BlobContainer child) { + return new ConcurrencyLimitingBlobContainer(child, activeIndices, countDownLatch); + } + + @Override + public InputStream readBlob(OperationPurpose purpose, String blobName) throws IOException { + final var pathParts = path().parts(); + if (pathParts.size() == 2 && pathParts.get(0).equals("indices") && blobName.startsWith("meta-")) { + // reading index metadata, so mark index as active + assertTrue(activeIndices.add(pathParts.get(1))); + assertThat(activeIndices.size(), lessThanOrEqualTo(MAX_SNAPSHOT_THREADS)); + countDownLatch.countDown(); + safeAwait(countDownLatch); // ensure that we do use all the threads + } + return super.readBlob(purpose, blobName); + } + + @Override + public void writeMetadataBlob( + OperationPurpose purpose, + String blobName, + boolean failIfAlreadyExists, + boolean atomic, + CheckedConsumer writer + ) throws IOException { + final var pathParts = path().parts(); + if (pathParts.size() == 3 + && pathParts.get(0).equals("indices") + && pathParts.get(2).equals("0") + && blobName.startsWith("index-")) { + // writing shard-level BlobStoreIndexShardSnapshots, mark index as inactive again + assertTrue(activeIndices.remove(pathParts.get(1))); + } + super.writeMetadataBlob(purpose, blobName, failIfAlreadyExists, atomic, writer); + } + } + + public void testDeleteThrottling() { + final var repoPath = ESIntegTestCase.randomRepoPath(node().settings()); + + // Create enough indices that we cannot process them all at once + + for (int i = 0; i < 3 * MAX_SNAPSHOT_THREADS; i++) { + createIndex("index-" + i, indexSettings(between(1, 3), 0).build()); + } + + // Set up the repository contents including containing a couple of snapshots, using a regular 'fs' repo + + assertAcked( + client().admin() + .cluster() + .preparePutRepository(TEST_REPO_NAME) + .setType(FsRepository.TYPE) + .setSettings(Settings.builder().put("location", repoPath)) + ); + + client().admin().cluster().prepareCreateSnapshot(TEST_REPO_NAME, "snapshot-1").setWaitForCompletion(true).get(); + client().admin().cluster().prepareCreateSnapshot(TEST_REPO_NAME, "snapshot-2").setWaitForCompletion(true).get(); + + assertAcked(client().admin().cluster().prepareDeleteRepository(TEST_REPO_NAME)); + + // Now delete one of the snapshots using the test repo implementation which verifies the throttling behaviour + + assertAcked( + client().admin() + .cluster() + .preparePutRepository(TEST_REPO_NAME) + .setType(TEST_REPO_TYPE) + .setSettings(Settings.builder().put("location", repoPath)) + ); + + assertAcked(client().admin().cluster().prepareDeleteSnapshot(TEST_REPO_NAME, "snapshot-1").get()); + + assertAcked(client().admin().cluster().prepareDeleteRepository(TEST_REPO_NAME)); + } +} diff --git a/test/external-modules/seek-tracking-directory/src/main/java/org/elasticsearch/test/seektracker/SeekStatsAction.java b/test/external-modules/seek-tracking-directory/src/main/java/org/elasticsearch/test/seektracker/SeekStatsAction.java index 6609382e16131..258cbaec3281c 100644 --- a/test/external-modules/seek-tracking-directory/src/main/java/org/elasticsearch/test/seektracker/SeekStatsAction.java +++ b/test/external-modules/seek-tracking-directory/src/main/java/org/elasticsearch/test/seektracker/SeekStatsAction.java @@ -9,6 +9,7 @@ package org.elasticsearch.test.seektracker; import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.Writeable; public class SeekStatsAction extends ActionType { @@ -16,6 +17,6 @@ public class SeekStatsAction extends ActionType { public static final String NAME = "cluster:monitor/seek_stats"; public SeekStatsAction() { - super(NAME, SeekStatsResponse::new); + super(NAME, Writeable.Reader.localOnly()); } } diff --git a/test/external-modules/seek-tracking-directory/src/main/java/org/elasticsearch/test/seektracker/SeekStatsResponse.java b/test/external-modules/seek-tracking-directory/src/main/java/org/elasticsearch/test/seektracker/SeekStatsResponse.java index c07d4bf18e603..27c28345091e7 100644 --- a/test/external-modules/seek-tracking-directory/src/main/java/org/elasticsearch/test/seektracker/SeekStatsResponse.java +++ b/test/external-modules/seek-tracking-directory/src/main/java/org/elasticsearch/test/seektracker/SeekStatsResponse.java @@ -9,6 +9,7 @@ package org.elasticsearch.test.seektracker; import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.Strings; @@ -30,18 +31,14 @@ public SeekStatsResponse(ClusterName clusterName, List seekStats, super(clusterName, seekStats, failures); } - public SeekStatsResponse(StreamInput in) throws IOException { - super(in); - } - @Override - protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(NodeSeekStats::new); + protected List readNodesFrom(StreamInput in) { + return TransportAction.localOnly(); } @Override - protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + protected void writeNodesTo(StreamOutput out, List nodes) { + TransportAction.localOnly(); } @Override diff --git a/test/external-modules/seek-tracking-directory/src/main/java/org/elasticsearch/test/seektracker/TransportSeekStatsAction.java b/test/external-modules/seek-tracking-directory/src/main/java/org/elasticsearch/test/seektracker/TransportSeekStatsAction.java index bfc528f04b215..f77a31389bd1f 100644 --- a/test/external-modules/seek-tracking-directory/src/main/java/org/elasticsearch/test/seektracker/TransportSeekStatsAction.java +++ b/test/external-modules/seek-tracking-directory/src/main/java/org/elasticsearch/test/seektracker/TransportSeekStatsAction.java @@ -38,12 +38,10 @@ public TransportSeekStatsAction( ) { super( SeekStatsAction.NAME, - threadPool, clusterService, transportService, actionFilters, SeekStatsRequest::new, - SeekStatsRequest::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); this.seekStatsService = seekStatsService; @@ -56,6 +54,7 @@ protected SeekStatsResponse newResponse(SeekStatsRequest request, List nodeResponses = Arrays.stream(nodeUsages) - .map(usage -> action(usage).nodeOperation(new AnalyticsStatsAction.NodeRequest(request), null)) + .map(usage -> action(usage).nodeOperation(new AnalyticsStatsAction.NodeRequest(), null)) .collect(toList()); AnalyticsStatsAction.Response response = new AnalyticsStatsAction.Response( new ClusterName("cluster_name"), diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/analytics/action/AnalyticsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/analytics/action/AnalyticsStatsAction.java index ad4c54b0d0465..48a248d075ca5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/analytics/action/AnalyticsStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/analytics/action/AnalyticsStatsAction.java @@ -33,7 +33,7 @@ public class AnalyticsStatsAction extends ActionType implements Writeable, ToXContentObject { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/TrainedModelCacheInfoAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/TrainedModelCacheInfoAction.java index e3d6f578d2d1f..e58742799d4e6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/TrainedModelCacheInfoAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/TrainedModelCacheInfoAction.java @@ -30,7 +30,7 @@ public class TrainedModelCacheInfoAction extends ActionType { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/ClearSecurityCacheAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/ClearSecurityCacheAction.java index 9801212620f89..5c7bdb39dc49e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/ClearSecurityCacheAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/ClearSecurityCacheAction.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.security.action; import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.Writeable; public class ClearSecurityCacheAction extends ActionType { @@ -15,6 +16,6 @@ public class ClearSecurityCacheAction extends ActionType implements ToXContentFragment { - public ClearSecurityCacheResponse(StreamInput in) throws IOException { - super(in); - } - public ClearSecurityCacheResponse(ClusterName clusterName, List nodes, List failures) { super(clusterName, nodes, failures); } @Override protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(Node::new); + return TransportAction.localOnly(); } @Override protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + TransportAction.localOnly(); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/privilege/ClearPrivilegesCacheAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/privilege/ClearPrivilegesCacheAction.java index 4bd58f903fbb9..39f62d0a1241d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/privilege/ClearPrivilegesCacheAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/privilege/ClearPrivilegesCacheAction.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.security.action.privilege; import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.Writeable; public class ClearPrivilegesCacheAction extends ActionType { @@ -15,6 +16,6 @@ public class ClearPrivilegesCacheAction extends ActionType implements ToXContentFragment { - public ClearPrivilegesCacheResponse(StreamInput in) throws IOException { - super(in); - } - public ClearPrivilegesCacheResponse(ClusterName clusterName, List nodes, List failures) { super(clusterName, nodes, failures); } @Override protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(Node::new); + return TransportAction.localOnly(); } @Override protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + TransportAction.localOnly(); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/realm/ClearRealmCacheAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/realm/ClearRealmCacheAction.java index 99b977c963491..443babe3fdee7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/realm/ClearRealmCacheAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/realm/ClearRealmCacheAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.security.action.realm; import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.Writeable; public class ClearRealmCacheAction extends ActionType { @@ -14,6 +15,6 @@ public class ClearRealmCacheAction extends ActionType { public static final String NAME = "cluster:admin/xpack/security/realm/cache/clear"; protected ClearRealmCacheAction() { - super(NAME, ClearRealmCacheResponse::new); + super(NAME, Writeable.Reader.localOnly()); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/realm/ClearRealmCacheRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/realm/ClearRealmCacheRequest.java index 1eb214af7c604..ceee6cea8481a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/realm/ClearRealmCacheRequest.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/realm/ClearRealmCacheRequest.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.core.security.action.realm; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -22,12 +23,6 @@ public ClearRealmCacheRequest() { super((String[]) null); } - public ClearRealmCacheRequest(StreamInput in) throws IOException { - super(in); - realms = in.readStringArray(); - usernames = in.readStringArray(); - } - /** * @return {@code true} if this request targets realms, {@code false} otherwise. */ @@ -80,9 +75,7 @@ public ClearRealmCacheRequest usernames(String... usernames) { @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeStringArrayNullable(realms); - out.writeStringArrayNullable(usernames); + TransportAction.localOnly(); } public static class Node extends TransportRequest { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/realm/ClearRealmCacheResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/realm/ClearRealmCacheResponse.java index 51c1746792067..8c2b086ef3a72 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/realm/ClearRealmCacheResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/realm/ClearRealmCacheResponse.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.security.action.realm; import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.cluster.ClusterName; @@ -22,22 +23,18 @@ public class ClearRealmCacheResponse extends BaseNodesResponse implements ToXContentFragment { - public ClearRealmCacheResponse(StreamInput in) throws IOException { - super(in); - } - public ClearRealmCacheResponse(ClusterName clusterName, List nodes, List failures) { super(clusterName, nodes, failures); } @Override protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(Node::new); + return TransportAction.localOnly(); } @Override protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + TransportAction.localOnly(); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/role/ClearRolesCacheAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/role/ClearRolesCacheAction.java index d91239393c1cd..cef2665df87be 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/role/ClearRolesCacheAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/role/ClearRolesCacheAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.security.action.role; import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.Writeable; /** * The action for clearing the cache used by native roles that are stored in an index. @@ -17,6 +18,6 @@ public class ClearRolesCacheAction extends ActionType { public static final String NAME = "cluster:admin/xpack/security/roles/cache/clear"; protected ClearRolesCacheAction() { - super(NAME, ClearRolesCacheResponse::new); + super(NAME, Writeable.Reader.localOnly()); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/role/ClearRolesCacheRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/role/ClearRolesCacheRequest.java index 9d24bee7b0c00..0d06382a891da 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/role/ClearRolesCacheRequest.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/role/ClearRolesCacheRequest.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.core.security.action.role; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -24,11 +25,6 @@ public ClearRolesCacheRequest() { super((String[]) null); } - public ClearRolesCacheRequest(StreamInput in) throws IOException { - super(in); - names = in.readOptionalStringArray(); - } - /** * Sets the roles for which caches will be evicted. When not set all the roles will be evicted from the cache. * @@ -48,8 +44,7 @@ public String[] names() { @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeOptionalStringArray(names); + TransportAction.localOnly(); } public static class Node extends TransportRequest { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/role/ClearRolesCacheResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/role/ClearRolesCacheResponse.java index 6b09ebffa1f21..554d7204985db 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/role/ClearRolesCacheResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/role/ClearRolesCacheResponse.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.security.action.role; import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.cluster.ClusterName; @@ -25,22 +26,18 @@ */ public class ClearRolesCacheResponse extends BaseNodesResponse implements ToXContentFragment { - public ClearRolesCacheResponse(StreamInput in) throws IOException { - super(in); - } - public ClearRolesCacheResponse(ClusterName clusterName, List nodes, List failures) { super(clusterName, nodes, failures); } @Override protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(Node::new); + return TransportAction.localOnly(); } @Override protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + TransportAction.localOnly(); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/service/GetServiceAccountCredentialsNodesRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/service/GetServiceAccountCredentialsNodesRequest.java index 89ff31e3fde81..a2ebb338c15f0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/service/GetServiceAccountCredentialsNodesRequest.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/service/GetServiceAccountCredentialsNodesRequest.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.security.action.service; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -29,17 +30,9 @@ public GetServiceAccountCredentialsNodesRequest(String namespace, String service this.serviceName = serviceName; } - public GetServiceAccountCredentialsNodesRequest(StreamInput in) throws IOException { - super(in); - this.namespace = in.readString(); - this.serviceName = in.readString(); - } - @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(namespace); - out.writeString(serviceName); + TransportAction.localOnly(); } public static class Node extends TransportRequest { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/service/GetServiceAccountNodesCredentialsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/service/GetServiceAccountNodesCredentialsAction.java index ae13d8c04038c..20c672716431d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/service/GetServiceAccountNodesCredentialsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/service/GetServiceAccountNodesCredentialsAction.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.security.action.service; import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.Writeable; public class GetServiceAccountNodesCredentialsAction extends ActionType { @@ -15,6 +16,6 @@ public class GetServiceAccountNodesCredentialsAction extends ActionType public static final String NAME = "cluster:monitor/xpack/spatial/stats"; private SpatialStatsAction() { - super(NAME, Response::new); + super(NAME, Writeable.Reader.localOnly()); } /** @@ -51,10 +52,6 @@ public Request() { super((String[]) null); } - public Request(StreamInput in) throws IOException { - super(in); - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -78,6 +75,11 @@ public boolean equals(Object obj) { } return true; } + + @Override + public void writeTo(StreamOutput out) throws IOException { + TransportAction.localOnly(); + } } public static class NodeRequest extends TransportRequest { @@ -85,9 +87,7 @@ public NodeRequest(StreamInput in) throws IOException { super(in); } - public NodeRequest(Request request) { - - } + public NodeRequest() {} } public static class Response extends BaseNodesResponse implements Writeable, ToXContentObject { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/stats/WatcherStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/stats/WatcherStatsAction.java index 8aedb26052d71..5e102ad446087 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/stats/WatcherStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/stats/WatcherStatsAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.watcher.transport.actions.stats; import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.Writeable; /** * This ActionType gets the stats for the watcher plugin @@ -17,6 +18,6 @@ public class WatcherStatsAction extends ActionType { public static final String NAME = "cluster:monitor/xpack/watcher/stats/dist"; private WatcherStatsAction() { - super(NAME, WatcherStatsResponse::new); + super(NAME, Writeable.Reader.localOnly()); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/stats/WatcherStatsRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/stats/WatcherStatsRequest.java index 10b658456aab9..ac55db16802d2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/stats/WatcherStatsRequest.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/stats/WatcherStatsRequest.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.core.watcher.transport.actions.stats; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -26,13 +27,6 @@ public WatcherStatsRequest() { super((String[]) null); } - public WatcherStatsRequest(StreamInput in) throws IOException { - super(in); - includeCurrentWatches = in.readBoolean(); - includeQueuedWatches = in.readBoolean(); - includeStats = in.readBoolean(); - } - public boolean includeCurrentWatches() { return includeCurrentWatches; } @@ -59,10 +53,7 @@ public void includeStats(boolean includeStats) { @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(includeCurrentWatches); - out.writeBoolean(includeQueuedWatches); - out.writeBoolean(includeStats); + TransportAction.localOnly(); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/stats/WatcherStatsResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/stats/WatcherStatsResponse.java index 1ac98cd42d2e3..223c720f05232 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/stats/WatcherStatsResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/stats/WatcherStatsResponse.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.watcher.transport.actions.stats; import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.cluster.ClusterName; @@ -31,11 +32,6 @@ public class WatcherStatsResponse extends BaseNodesResponse readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(Node::new); + return TransportAction.localOnly(); } @Override protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + TransportAction.localOnly(); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/ClearSecurityCacheRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/ClearSecurityCacheRequestTests.java deleted file mode 100644 index b14179f38d858..0000000000000 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/ClearSecurityCacheRequestTests.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.core.security.action; - -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.test.ESTestCase; - -import java.io.IOException; - -public class ClearSecurityCacheRequestTests extends ESTestCase { - - public void testSerialisation() throws IOException { - final String cacheName = randomAlphaOfLengthBetween(4, 8); - final String[] keys = randomArray(0, 8, String[]::new, () -> randomAlphaOfLength(12)); - final ClearSecurityCacheRequest request = new ClearSecurityCacheRequest(); - request.cacheName(cacheName).keys(keys); - - try (BytesStreamOutput out = new BytesStreamOutput()) { - request.writeTo(out); - try (StreamInput in = out.bytes().streamInput()) { - final ClearSecurityCacheRequest serialized = new ClearSecurityCacheRequest(in); - assertEquals(request.cacheName(), serialized.cacheName()); - assertArrayEquals(request.keys(), serialized.keys()); - } - } - } -} diff --git a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/NodesDeprecationCheckAction.java b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/NodesDeprecationCheckAction.java index 9ee62fbca057c..596fb69d06f69 100644 --- a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/NodesDeprecationCheckAction.java +++ b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/NodesDeprecationCheckAction.java @@ -14,6 +14,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.xpack.core.deprecation.DeprecationIssue; @@ -30,11 +31,12 @@ public class NodesDeprecationCheckAction extends ActionType implements ToXContentObject { @@ -41,10 +42,6 @@ public Request() { super((String[]) null); } - public Request(StreamInput in) throws IOException { - super(in); - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -71,26 +68,22 @@ public boolean equals(Object obj) { } public static class Response extends BaseNodesResponse implements Writeable, ToXContentObject { - public Response(StreamInput in) throws IOException { - super(in); - } - public Response(ClusterName clusterName, List nodes, List failures) { super(clusterName, nodes, failures); } @Override - protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(NodeResponse::new); + protected List readNodesFrom(StreamInput in) { + return TransportAction.localOnly(); } @Override - protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + protected void writeNodesTo(StreamOutput out, List nodes) { + TransportAction.localOnly(); } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + public XContentBuilder toXContent(XContentBuilder builder, Params params) { return builder; } @@ -113,7 +106,7 @@ public NodeRequest(StreamInput in) throws IOException { super(in); } - public NodeRequest(Request request) {} + public NodeRequest() {} } public static class NodeResponse extends BaseNodeResponse { diff --git a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/logging/TransportDeprecationCacheResetAction.java b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/logging/TransportDeprecationCacheResetAction.java index a73371a74b2f4..252fc04a1aac5 100644 --- a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/logging/TransportDeprecationCacheResetAction.java +++ b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/logging/TransportDeprecationCacheResetAction.java @@ -44,11 +44,9 @@ public TransportDeprecationCacheResetAction( ) { super( DeprecationCacheResetAction.NAME, - threadPool, clusterService, transportService, actionFilters, - DeprecationCacheResetAction.Request::new, DeprecationCacheResetAction.NodeRequest::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); @@ -66,7 +64,7 @@ protected DeprecationCacheResetAction.Response newResponse( @Override protected DeprecationCacheResetAction.NodeRequest newNodeRequest(DeprecationCacheResetAction.Request request) { - return new DeprecationCacheResetAction.NodeRequest(request); + return new DeprecationCacheResetAction.NodeRequest(); } @Override diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorStatsAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorStatsAction.java index f50995f9d5b4a..a5f329c47d4d7 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorStatsAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorStatsAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequest; @@ -42,18 +43,18 @@ public class EnrichCoordinatorStatsAction extends ActionType { - public Request() { super(new String[0]); } - Request(StreamInput in) throws IOException { - super(in); + @Override + public void writeTo(StreamOutput out) { + org.elasticsearch.action.support.TransportAction.localOnly(); } } @@ -69,22 +70,18 @@ public static class NodeRequest extends TransportRequest { public static class Response extends BaseNodesResponse { - Response(StreamInput in) throws IOException { - super(in); - } - Response(ClusterName clusterName, List nodes, List failures) { super(clusterName, nodes, failures); } @Override protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(NodeResponse::new); + return org.elasticsearch.action.support.TransportAction.localOnly(); } @Override protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + org.elasticsearch.action.support.TransportAction.localOnly(); } } @@ -139,16 +136,7 @@ public TransportAction( EnrichCache enrichCache, EnrichCoordinatorProxyAction.Coordinator coordinator ) { - super( - NAME, - threadPool, - clusterService, - transportService, - actionFilters, - Request::new, - NodeRequest::new, - threadPool.executor(ThreadPool.Names.GENERIC) - ); + super(NAME, clusterService, transportService, actionFilters, NodeRequest::new, threadPool.executor(ThreadPool.Names.GENERIC)); this.enrichCache = enrichCache; this.coordinator = coordinator; } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlStatsAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlStatsAction.java index d10931ca19cd3..4c68c7ef155d1 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlStatsAction.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlStatsAction.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.eql.plugin; import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.Writeable; public class EqlStatsAction extends ActionType { @@ -15,6 +16,6 @@ public class EqlStatsAction extends ActionType { public static final String NAME = "cluster:monitor/xpack/eql/stats/dist"; private EqlStatsAction() { - super(NAME, EqlStatsResponse::new); + super(NAME, Writeable.Reader.localOnly()); } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlStatsRequest.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlStatsRequest.java index bf9179ed97b36..53e9e1d1a0137 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlStatsRequest.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlStatsRequest.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.eql.plugin; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -25,11 +26,6 @@ public EqlStatsRequest() { super((String[]) null); } - public EqlStatsRequest(StreamInput in) throws IOException { - super(in); - includeStats = in.readBoolean(); - } - public boolean includeStats() { return includeStats; } @@ -40,8 +36,7 @@ public void includeStats(boolean includeStats) { @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(includeStats); + TransportAction.localOnly(); } @Override diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlStatsResponse.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlStatsResponse.java index b454bef8ab49f..a860f9ae31d00 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlStatsResponse.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlStatsResponse.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.eql.plugin; import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.cluster.ClusterName; @@ -23,22 +24,18 @@ public class EqlStatsResponse extends BaseNodesResponse implements ToXContentObject { - public EqlStatsResponse(StreamInput in) throws IOException { - super(in); - } - public EqlStatsResponse(ClusterName clusterName, List nodes, List failures) { super(clusterName, nodes, failures); } @Override - protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(NodeStatsResponse::readNodeResponse); + protected List readNodesFrom(StreamInput in) { + return TransportAction.localOnly(); } @Override - protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + protected void writeNodesTo(StreamOutput out, List nodes) { + TransportAction.localOnly(); } @Override diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlStatsAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlStatsAction.java index 90316d8b8db44..18030c3d6207a 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlStatsAction.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlStatsAction.java @@ -43,11 +43,9 @@ public TransportEqlStatsAction( ) { super( EqlStatsAction.NAME, - threadPool, clusterService, transportService, actionFilters, - EqlStatsRequest::new, EqlStatsRequest.NodeStatsRequest::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlStatsAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlStatsAction.java index b1e3ac1b0717c..0c23f5f05af6f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlStatsAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlStatsAction.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.esql.plugin; import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.Writeable; public class EsqlStatsAction extends ActionType { @@ -15,6 +16,6 @@ public class EsqlStatsAction extends ActionType { public static final String NAME = "cluster:monitor/xpack/esql/stats/dist"; private EsqlStatsAction() { - super(NAME, EsqlStatsResponse::new); + super(NAME, Writeable.Reader.localOnly()); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlStatsRequest.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlStatsRequest.java index 47d0c6baa12b3..2a0a148459250 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlStatsRequest.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlStatsRequest.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.plugin; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -25,11 +26,6 @@ public EsqlStatsRequest() { super((String[]) null); } - public EsqlStatsRequest(StreamInput in) throws IOException { - super(in); - includeStats = in.readBoolean(); - } - public boolean includeStats() { return includeStats; } @@ -40,8 +36,7 @@ public void includeStats(boolean includeStats) { @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(includeStats); + TransportAction.localOnly(); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlStatsResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlStatsResponse.java index ec75cbfd01da1..09178d26daea4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlStatsResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlStatsResponse.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.esql.plugin; import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.cluster.ClusterName; @@ -23,22 +24,18 @@ public class EsqlStatsResponse extends BaseNodesResponse implements ToXContentObject { - public EsqlStatsResponse(StreamInput in) throws IOException { - super(in); - } - public EsqlStatsResponse(ClusterName clusterName, List nodes, List failures) { super(clusterName, nodes, failures); } @Override - protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(NodeStatsResponse::readNodeResponse); + protected List readNodesFrom(StreamInput in) { + return TransportAction.localOnly(); } @Override - protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + protected void writeNodesTo(StreamOutput out, List nodes) { + TransportAction.localOnly(); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlStatsAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlStatsAction.java index cd708f3ae18ff..21e5b18bab7cb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlStatsAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlStatsAction.java @@ -46,11 +46,9 @@ public TransportEsqlStatsAction( ) { super( EsqlStatsAction.NAME, - threadPool, clusterService, transportService, actionFilters, - EsqlStatsRequest::new, EsqlStatsRequest.NodeStatsRequest::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elser/ElserMlNodeService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elser/ElserMlNodeService.java index 48b6952bcc8af..53d363aace121 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elser/ElserMlNodeService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elser/ElserMlNodeService.java @@ -35,7 +35,7 @@ public class ElserMlNodeService implements InferenceService { - public static final String NAME = "elser_mlnode"; + public static final String NAME = "elser"; static final String ELSER_V1_MODEL = ".elser_model_1"; // Default non platform specific v2 model diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/elser/ElserMlNodeServiceTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/elser/ElserMlNodeServiceTests.java index 6348e1d7d4f98..56a592a490712 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/elser/ElserMlNodeServiceTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/elser/ElserMlNodeServiceTests.java @@ -124,7 +124,7 @@ public void testParseConfigStrictWithUnknownSettings() { ); assertThat( e.getMessage(), - containsString("Model configuration contains settings [{foo=bar}] unknown to the [elser_mlnode] service") + containsString("Model configuration contains settings [{foo=bar}] unknown to the [elser] service") ); } else { var parsed = service.parsePersistedConfig("foo", TaskType.SPARSE_EMBEDDING, settings, Collections.emptyMap()); @@ -155,7 +155,7 @@ public void testParseConfigStrictWithUnknownSettings() { ); assertThat( e.getMessage(), - containsString("Model configuration contains settings [{foo=bar}] unknown to the [elser_mlnode] service") + containsString("Model configuration contains settings [{foo=bar}] unknown to the [elser] service") ); } else { var parsed = service.parsePersistedConfig("foo", TaskType.SPARSE_EMBEDDING, settings, Collections.emptyMap()); @@ -187,7 +187,7 @@ public void testParseConfigStrictWithUnknownSettings() { ); assertThat( e.getMessage(), - containsString("Model configuration contains settings [{foo=bar}] unknown to the [elser_mlnode] service") + containsString("Model configuration contains settings [{foo=bar}] unknown to the [elser] service") ); } else { var parsed = service.parsePersistedConfig("foo", TaskType.SPARSE_EMBEDDING, settings, Collections.emptyMap()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportTrainedModelCacheInfoAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportTrainedModelCacheInfoAction.java index 0af653e1495a3..89eb1dc45c547 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportTrainedModelCacheInfoAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportTrainedModelCacheInfoAction.java @@ -47,11 +47,9 @@ public TransportTrainedModelCacheInfoAction( ) { super( TrainedModelCacheInfoAction.NAME, - threadPool, clusterService, transportService, actionFilters, - TrainedModelCacheInfoAction.Request::new, NodeModelCacheInfoRequest::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/ClearRepositoriesMeteringArchiveAction.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/ClearRepositoriesMeteringArchiveAction.java index e8d443af227c7..30b5b50be89c6 100644 --- a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/ClearRepositoriesMeteringArchiveAction.java +++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/ClearRepositoriesMeteringArchiveAction.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.repositories.metering.action; import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.Writeable; public final class ClearRepositoriesMeteringArchiveAction extends ActionType { public static final ClearRepositoriesMeteringArchiveAction INSTANCE = new ClearRepositoriesMeteringArchiveAction(); @@ -15,6 +16,6 @@ public final class ClearRepositoriesMeteringArchiveAction extends ActionType { private final long maxVersionToClear; - public ClearRepositoriesMeteringArchiveRequest(StreamInput in) throws IOException { - super(in); - this.maxVersionToClear = in.readLong(); - } - public ClearRepositoriesMeteringArchiveRequest(long maxVersionToClear, String... nodesIds) { super(nodesIds); this.maxVersionToClear = maxVersionToClear; } @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeLong(maxVersionToClear); + public void writeTo(StreamOutput out) { + TransportAction.localOnly(); } public long getMaxVersionToClear() { diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringAction.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringAction.java index eb713de51c2b2..47f34d31b29dd 100644 --- a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringAction.java +++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringAction.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.repositories.metering.action; import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.Writeable; public final class RepositoriesMeteringAction extends ActionType { public static final RepositoriesMeteringAction INSTANCE = new RepositoriesMeteringAction(); @@ -15,6 +16,6 @@ public final class RepositoriesMeteringAction extends ActionType { - public RepositoriesMeteringRequest(StreamInput in) throws IOException { - super(in); - } - public RepositoriesMeteringRequest(String... nodesIds) { super(nodesIds); } + + @Override + public void writeTo(StreamOutput out) { + TransportAction.localOnly(); + } } diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringResponse.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringResponse.java index 811e23c0b44bb..8e9538623209f 100644 --- a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringResponse.java +++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringResponse.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.repositories.metering.action; import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.io.stream.StreamInput; @@ -20,10 +21,6 @@ public final class RepositoriesMeteringResponse extends BaseNodesResponse implements ToXContentFragment { - public RepositoriesMeteringResponse(StreamInput in) throws IOException { - super(in); - } - public RepositoriesMeteringResponse( ClusterName clusterName, List nodes, @@ -33,13 +30,13 @@ public RepositoriesMeteringResponse( } @Override - protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(RepositoriesNodeMeteringResponse::new); + protected List readNodesFrom(StreamInput in) { + return TransportAction.localOnly(); } @Override - protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + protected void writeNodesTo(StreamOutput out, List nodes) { + TransportAction.localOnly(); } @Override diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportClearRepositoriesStatsArchiveAction.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportClearRepositoriesStatsArchiveAction.java index 6872210ca303f..d68903ae857ee 100644 --- a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportClearRepositoriesStatsArchiveAction.java +++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportClearRepositoriesStatsArchiveAction.java @@ -43,11 +43,9 @@ public TransportClearRepositoriesStatsArchiveAction( ) { super( ClearRepositoriesMeteringArchiveAction.NAME, - threadPool, clusterService, transportService, actionFilters, - ClearRepositoriesMeteringArchiveRequest::new, ClearRepositoriesStatsArchiveNodeRequest::new, threadPool.executor(ThreadPool.Names.GENERIC) ); diff --git a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportRepositoriesStatsAction.java b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportRepositoriesStatsAction.java index 1574cce107416..87a990be5f8af 100644 --- a/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportRepositoriesStatsAction.java +++ b/x-pack/plugin/repositories-metering-api/src/main/java/org/elasticsearch/xpack/repositories/metering/action/TransportRepositoriesStatsAction.java @@ -41,11 +41,9 @@ public TransportRepositoriesStatsAction( ) { super( RepositoriesMeteringAction.NAME, - threadPool, clusterService, transportService, actionFilters, - RepositoriesMeteringRequest::new, RepositoriesNodeStatsRequest::new, threadPool.executor(ThreadPool.Names.GENERIC) ); diff --git a/x-pack/plugin/repositories-metering-api/src/test/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringResponseTests.java b/x-pack/plugin/repositories-metering-api/src/test/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringResponseTests.java deleted file mode 100644 index 03c2c3c17fe8d..0000000000000 --- a/x-pack/plugin/repositories-metering-api/src/test/java/org/elasticsearch/xpack/repositories/metering/action/RepositoriesMeteringResponseTests.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.repositories.metering.action; - -import org.elasticsearch.TransportVersion; -import org.elasticsearch.action.FailedNodeException; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodeUtils; -import org.elasticsearch.repositories.RepositoryInfo; -import org.elasticsearch.repositories.RepositoryStats; -import org.elasticsearch.repositories.RepositoryStatsSnapshot; -import org.elasticsearch.test.ESTestCase; - -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; -import java.util.Map; - -import static org.hamcrest.Matchers.equalTo; - -public class RepositoriesMeteringResponseTests extends ESTestCase { - public void testSerializationRoundtrip() throws Exception { - final RepositoriesMeteringResponse repositoriesMeteringResponse = createResponse(); - final RepositoriesMeteringResponse deserializedResponse = copyWriteable( - repositoriesMeteringResponse, - writableRegistry(), - RepositoriesMeteringResponse::new, - TransportVersion.current() - ); - assertResponsesAreEqual(repositoriesMeteringResponse, deserializedResponse); - } - - private void assertResponsesAreEqual(RepositoriesMeteringResponse response, RepositoriesMeteringResponse otherResponse) { - List nodeResponses = response.getNodes(); - List otherNodeResponses = otherResponse.getNodes(); - assertThat(nodeResponses.size(), equalTo(otherNodeResponses.size())); - for (int i = 0; i < nodeResponses.size(); i++) { - RepositoriesNodeMeteringResponse nodeResponse = nodeResponses.get(i); - RepositoriesNodeMeteringResponse otherNodeResponse = otherNodeResponses.get(i); - assertThat(nodeResponse.repositoryStatsSnapshots, equalTo(otherNodeResponse.repositoryStatsSnapshots)); - } - - List failures = response.failures(); - List otherFailures = otherResponse.failures(); - assertThat(failures.size(), equalTo(otherFailures.size())); - for (int i = 0; i < failures.size(); i++) { - FailedNodeException failure = failures.get(i); - FailedNodeException otherFailure = otherFailures.get(i); - assertThat(failure.nodeId(), equalTo(otherFailure.nodeId())); - assertThat(failure.getMessage(), equalTo(otherFailure.getMessage())); - } - } - - private RepositoriesMeteringResponse createResponse() { - ClusterName clusterName = new ClusterName("test"); - int nodes = randomIntBetween(1, 10); - List nodeResponses = new ArrayList<>(nodes); - for (int nodeId = 0; nodeId < nodes; nodeId++) { - DiscoveryNode node = DiscoveryNodeUtils.create("nodeId" + nodeId); - int numberOfRepos = randomInt(10); - List nodeRepoStats = new ArrayList<>(numberOfRepos); - - for (int clusterVersion = 0; clusterVersion < numberOfRepos; clusterVersion++) { - String repoId = randomAlphaOfLength(10); - String repoName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - String repoType = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - Map repoLocation = Map.of("bucket", randomAlphaOfLength(10).toLowerCase(Locale.ROOT)); - long startedAt = System.currentTimeMillis() - 1; - Long stoppedAt = randomBoolean() ? System.currentTimeMillis() : null; - RepositoryInfo repositoryInfo = new RepositoryInfo(repoId, repoName, repoType, repoLocation, startedAt, stoppedAt); - boolean archived = randomBoolean(); - RepositoryStatsSnapshot statsSnapshot = new RepositoryStatsSnapshot( - repositoryInfo, - new RepositoryStats(Map.of("GET", randomLongBetween(0, 2000))), - archived ? clusterVersion : RepositoryStatsSnapshot.UNKNOWN_CLUSTER_VERSION, - archived - ); - nodeRepoStats.add(statsSnapshot); - } - - nodeResponses.add(new RepositoriesNodeMeteringResponse(node, nodeRepoStats)); - } - - int numberOfFailures = randomInt(20); - List failures = new ArrayList<>(numberOfFailures); - for (int i = nodes; i < numberOfFailures + nodes; i++) { - FailedNodeException failedNodeException = new FailedNodeException( - "nodeId" + i, - "error", - randomBoolean() ? new RuntimeException("boom") : null - ); - failures.add(failedNodeException); - } - - return new RepositoriesMeteringResponse(clusterName, nodeResponses, failures); - } -} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/cache/TransportSearchableSnapshotCacheStoresAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/cache/TransportSearchableSnapshotCacheStoresAction.java index ce49969d3fb6c..ed92a43ad0d08 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/cache/TransportSearchableSnapshotCacheStoresAction.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/cache/TransportSearchableSnapshotCacheStoresAction.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.action.support.nodes.BaseNodesResponse; @@ -19,6 +20,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.tasks.Task; @@ -42,7 +44,7 @@ public class TransportSearchableSnapshotCacheStoresAction extends TransportNodes public static final String ACTION_NAME = "internal:admin/xpack/searchable_snapshots/cache/store"; - public static final ActionType TYPE = new ActionType<>(ACTION_NAME, NodesCacheFilesMetadata::new); + public static final ActionType TYPE = new ActionType<>(ACTION_NAME, Writeable.Reader.localOnly()); private final CacheService cacheService; @@ -56,11 +58,9 @@ public TransportSearchableSnapshotCacheStoresAction( ) { super( ACTION_NAME, - threadPool, clusterService, transportService, actionFilters, - Request::new, NodeRequest::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); @@ -106,17 +106,9 @@ public Request(SnapshotId snapshotId, ShardId shardId, DiscoveryNode[] nodes) { this.shardId = shardId; } - public Request(StreamInput in) throws IOException { - super(in); - snapshotId = new SnapshotId(in); - shardId = new ShardId(in); - } - @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - snapshotId.writeTo(out); - shardId.writeTo(out); + TransportAction.localOnly(); } } @@ -170,23 +162,18 @@ public void writeTo(StreamOutput out) throws IOException { } public static class NodesCacheFilesMetadata extends BaseNodesResponse { - - public NodesCacheFilesMetadata(StreamInput in) throws IOException { - super(in); - } - public NodesCacheFilesMetadata(ClusterName clusterName, List nodes, List failures) { super(clusterName, nodes, failures); } @Override - protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(NodeCacheFilesMetadata::new); + protected List readNodesFrom(StreamInput in) { + return TransportAction.localOnly(); } @Override - protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + protected void writeNodesTo(StreamOutput out, List nodes) { + TransportAction.localOnly(); } } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/cache/TransportSearchableSnapshotsNodeCachesStatsAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/cache/TransportSearchableSnapshotsNodeCachesStatsAction.java index 71aa7709685b1..c192d5bff8eb9 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/cache/TransportSearchableSnapshotsNodeCachesStatsAction.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/cache/TransportSearchableSnapshotsNodeCachesStatsAction.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.action.support.nodes.BaseNodesResponse; @@ -21,6 +22,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.tasks.Task; @@ -50,7 +52,7 @@ public class TransportSearchableSnapshotsNodeCachesStatsAction extends Transport public static final String ACTION_NAME = "cluster:admin/xpack/searchable_snapshots/cache/stats"; - public static final ActionType TYPE = new ActionType<>(ACTION_NAME, NodesCachesStatsResponse::new); + public static final ActionType TYPE = new ActionType<>(ACTION_NAME, Writeable.Reader.localOnly()); private final Supplier> frozenCacheService; private final XPackLicenseState licenseState; @@ -66,11 +68,9 @@ public TransportSearchableSnapshotsNodeCachesStatsAction( ) { super( ACTION_NAME, - threadPool, clusterService, transportService, actionFilters, - NodesRequest::new, NodeRequest::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); @@ -155,13 +155,9 @@ public NodesRequest(String[] nodes) { super(nodes); } - public NodesRequest(StreamInput in) throws IOException { - super(in); - } - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + public void writeTo(StreamOutput out) { + TransportAction.localOnly(); } } @@ -279,22 +275,18 @@ public long getEvictions() { public static class NodesCachesStatsResponse extends BaseNodesResponse implements ToXContentObject { - public NodesCachesStatsResponse(StreamInput in) throws IOException { - super(in); - } - public NodesCachesStatsResponse(ClusterName clusterName, List nodes, List failures) { super(clusterName, nodes, failures); } @Override - protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(NodeCachesStatsResponse::new); + protected List readNodesFrom(StreamInput in) { + return TransportAction.localOnly(); } @Override - protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + protected void writeNodesTo(StreamOutput out, List nodes) { + TransportAction.localOnly(); } @Override diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/TransportClearSecurityCacheAction.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/TransportClearSecurityCacheAction.java index 56c8ac6a80868..f8df2461ec8e5 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/TransportClearSecurityCacheAction.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/TransportClearSecurityCacheAction.java @@ -47,11 +47,9 @@ public TransportClearSecurityCacheAction( ) { super( ClearSecurityCacheAction.NAME, - threadPool, clusterService, transportService, actionFilters, - ClearSecurityCacheRequest::new, ClearSecurityCacheRequest.Node::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/privilege/TransportClearPrivilegesCacheAction.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/privilege/TransportClearPrivilegesCacheAction.java index bfaac8c84ec34..282733c64d37b 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/privilege/TransportClearPrivilegesCacheAction.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/privilege/TransportClearPrivilegesCacheAction.java @@ -46,11 +46,9 @@ public TransportClearPrivilegesCacheAction( ) { super( ClearPrivilegesCacheAction.NAME, - threadPool, clusterService, transportService, actionFilters, - ClearPrivilegesCacheRequest::new, ClearPrivilegesCacheRequest.Node::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/realm/TransportClearRealmCacheAction.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/realm/TransportClearRealmCacheAction.java index 3daf9c4053bfa..b17cf624bf83d 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/realm/TransportClearRealmCacheAction.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/realm/TransportClearRealmCacheAction.java @@ -48,11 +48,9 @@ public TransportClearRealmCacheAction( ) { super( ClearRealmCacheAction.NAME, - threadPool, clusterService, transportService, actionFilters, - ClearRealmCacheRequest::new, ClearRealmCacheRequest.Node::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/role/TransportClearRolesCacheAction.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/role/TransportClearRolesCacheAction.java index 7ebec117ff428..a879f35e096da 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/role/TransportClearRolesCacheAction.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/role/TransportClearRolesCacheAction.java @@ -42,11 +42,9 @@ public TransportClearRolesCacheAction( ) { super( ClearRolesCacheAction.NAME, - threadPool, clusterService, transportService, actionFilters, - ClearRolesCacheRequest::new, ClearRolesCacheRequest.Node::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/service/TransportGetServiceAccountNodesCredentialsAction.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/service/TransportGetServiceAccountNodesCredentialsAction.java index 8c338250f16a0..e9e90ec855d4d 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/service/TransportGetServiceAccountNodesCredentialsAction.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/service/TransportGetServiceAccountNodesCredentialsAction.java @@ -49,11 +49,9 @@ public TransportGetServiceAccountNodesCredentialsAction( ) { super( GetServiceAccountNodesCredentialsAction.NAME, - threadPool, clusterService, transportService, actionFilters, - GetServiceAccountCredentialsNodesRequest::new, GetServiceAccountCredentialsNodesRequest.Node::new, threadPool.executor(ThreadPool.Names.GENERIC) ); diff --git a/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/action/SpatialStatsTransportAction.java b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/action/SpatialStatsTransportAction.java index c94fe94a7246d..708594a9a287f 100644 --- a/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/action/SpatialStatsTransportAction.java +++ b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/action/SpatialStatsTransportAction.java @@ -39,11 +39,9 @@ public SpatialStatsTransportAction( ) { super( SpatialStatsAction.NAME, - threadPool, clusterService, transportService, actionFilters, - SpatialStatsAction.Request::new, SpatialStatsAction.NodeRequest::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); @@ -61,7 +59,7 @@ protected SpatialStatsAction.Response newResponse( @Override protected SpatialStatsAction.NodeRequest newNodeRequest(SpatialStatsAction.Request request) { - return new SpatialStatsAction.NodeRequest(request); + return new SpatialStatsAction.NodeRequest(); } @Override diff --git a/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/action/SpatialStatsTransportActionTests.java b/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/action/SpatialStatsTransportActionTests.java index a7e0ba6eb0d9b..d410f923aa5e1 100644 --- a/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/action/SpatialStatsTransportActionTests.java +++ b/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/action/SpatialStatsTransportActionTests.java @@ -96,7 +96,7 @@ private SpatialStatsTransportAction toAction(SpatialUsage nodeUsage) { private ObjectPath buildSpatialStatsResponse(SpatialUsage... nodeUsages) throws IOException { SpatialStatsAction.Request request = new SpatialStatsAction.Request(); List nodeResponses = Arrays.stream(nodeUsages) - .map(usage -> toAction(usage).nodeOperation(new SpatialStatsAction.NodeRequest(request), null)) + .map(usage -> toAction(usage).nodeOperation(new SpatialStatsAction.NodeRequest(), null)) .collect(Collectors.toList()); SpatialStatsAction.Response response = new SpatialStatsAction.Response(new ClusterName("cluster_name"), nodeResponses, emptyList()); try (XContentBuilder builder = jsonBuilder()) { diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlStatsAction.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlStatsAction.java index 9ce71a38ef055..1a7afdce307ed 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlStatsAction.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlStatsAction.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.sql.plugin; import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.Writeable; public class SqlStatsAction extends ActionType { @@ -15,6 +16,6 @@ public class SqlStatsAction extends ActionType { public static final String NAME = "cluster:monitor/xpack/sql/stats/dist"; private SqlStatsAction() { - super(NAME, SqlStatsResponse::new); + super(NAME, Writeable.Reader.localOnly()); } } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlStatsRequest.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlStatsRequest.java index 079b037ae1246..af3a82905f8ee 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlStatsRequest.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlStatsRequest.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.sql.plugin; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -25,11 +26,6 @@ public SqlStatsRequest() { super((String[]) null); } - public SqlStatsRequest(StreamInput in) throws IOException { - super(in); - includeStats = in.readBoolean(); - } - public boolean includeStats() { return includeStats; } @@ -40,8 +36,7 @@ public void includeStats(boolean includeStats) { @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(includeStats); + TransportAction.localOnly(); } @Override diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlStatsResponse.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlStatsResponse.java index 335b17555242c..860eb8d7de801 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlStatsResponse.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlStatsResponse.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.sql.plugin; import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.cluster.ClusterName; @@ -23,22 +24,18 @@ public class SqlStatsResponse extends BaseNodesResponse implements ToXContentObject { - public SqlStatsResponse(StreamInput in) throws IOException { - super(in); - } - public SqlStatsResponse(ClusterName clusterName, List nodes, List failures) { super(clusterName, nodes, failures); } @Override protected List readNodesFrom(StreamInput in) throws IOException { - return in.readCollectionAsList(NodeStatsResponse::readNodeResponse); + return TransportAction.localOnly(); } @Override protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeCollection(nodes); + TransportAction.localOnly(); } @Override diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlStatsAction.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlStatsAction.java index c1cc01b824ec6..4004724019af8 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlStatsAction.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlStatsAction.java @@ -43,11 +43,9 @@ public TransportSqlStatsAction( ) { super( SqlStatsAction.NAME, - threadPool, clusterService, transportService, actionFilters, - SqlStatsRequest::new, SqlStatsRequest.NodeStatsRequest::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/inference/inference_crud.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/inference/inference_crud.yml index af67a099085fd..f33201ff53ae6 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/inference/inference_crud.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/inference/inference_crud.yml @@ -17,7 +17,7 @@ model_id: elser_model body: > { - "service": "elser_mlnode", + "service": "elser", "service_settings": { "num_allocations": 1, "num_threads": 1 diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportWatcherStatsAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportWatcherStatsAction.java index 2f2354162b8d4..fc181b38f5944 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportWatcherStatsAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportWatcherStatsAction.java @@ -54,11 +54,9 @@ public TransportWatcherStatsAction( ) { super( WatcherStatsAction.NAME, - threadPool, clusterService, transportService, actionFilters, - WatcherStatsRequest::new, WatcherStatsRequest.Node::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) );