Skip to content

Commit

Permalink
Skip BaseNodesRequest header without instantiation (#109682)
Browse files Browse the repository at this point in the history
As described in #100878 we sometimes send an unnecessary wrapped-up
`BaseNodesRequest` to individual nodes for legacy reasons, and today we
do that by calling its constructor and then discarding the resulting
object. This commit introduces utilities for skipping and synthesizing
the unnecessary data without involving `BaseNodesRequest` itself.
  • Loading branch information
DaveCTurner authored Jun 17, 2024
1 parent c9d66b6 commit 0a7c99b
Show file tree
Hide file tree
Showing 21 changed files with 121 additions and 230 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@

package org.elasticsearch.action.admin.cluster.node.hotthreads;

import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV9;
import org.elasticsearch.monitor.jvm.HotThreads;

import java.io.IOException;
Expand All @@ -22,12 +21,6 @@ public class NodesHotThreadsRequest extends BaseNodesRequest<NodesHotThreadsRequ

final HotThreads.RequestOptions requestOptions;

@UpdateForV9 // will be unused in v9
public NodesHotThreadsRequest(StreamInput in) throws IOException {
super(in);
requestOptions = HotThreads.RequestOptions.readFrom(in);
}

/**
* Get hot threads from nodes based on the nodes ids specified. If none are passed, hot
* threads for all nodes is used.
Expand Down Expand Up @@ -69,10 +62,8 @@ public int snapshots() {
return requestOptions.snapshots();
}

@UpdateForV9 // can become localOnly() in v9
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
requestOptions.writeTo(out);
TransportAction.localOnly();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
Expand Down Expand Up @@ -116,21 +115,15 @@ public static class NodeRequest extends TransportRequest {

NodeRequest(StreamInput in) throws IOException {
super(in);
if (in.getTransportVersion().onOrAfter(TransportVersions.MORE_LIGHTER_NODES_REQUESTS)) {
requestOptions = HotThreads.RequestOptions.readFrom(in);
} else {
requestOptions = new NodesHotThreadsRequest(in).requestOptions;
}
skipLegacyNodesRequestHeader(TransportVersions.MORE_LIGHTER_NODES_REQUESTS, in);
requestOptions = HotThreads.RequestOptions.readFrom(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.MORE_LIGHTER_NODES_REQUESTS)) {
requestOptions.writeTo(out);
} else {
new NodesHotThreadsRequest(Strings.EMPTY_ARRAY, requestOptions).writeTo(out);
}
sendLegacyNodesRequestHeader(TransportVersions.MORE_LIGHTER_NODES_REQUESTS, out);
requestOptions.writeTo(out);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@

package org.elasticsearch.action.admin.cluster.node.info;

import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.UpdateForV9;

import java.io.IOException;
import java.util.Set;
Expand All @@ -25,18 +24,6 @@ public final class NodesInfoRequest extends BaseNodesRequest<NodesInfoRequest> {

private final NodesInfoMetrics nodesInfoMetrics;

/**
* Create a new NodeInfoRequest from a {@link StreamInput} object.
*
* @param in A stream input object.
* @throws IOException if the stream cannot be deserialized.
*/
@UpdateForV9 // this constructor is unused in v9
public NodesInfoRequest(StreamInput in) throws IOException {
super(in);
nodesInfoMetrics = new NodesInfoMetrics(in);
}

/**
* Get information from nodes based on the nodes ids specified. If none are passed, information
* for all nodes will be returned.
Expand Down Expand Up @@ -113,11 +100,9 @@ public NodesInfoRequest removeMetric(String metric) {
return this;
}

@UpdateForV9 // this method can just call localOnly() in v9
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
nodesInfoMetrics.writeTo(out);
TransportAction.localOnly();
}

public NodesInfoMetrics getNodesInfoMetrics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,8 @@ public static class NodeInfoRequest extends TransportRequest {

public NodeInfoRequest(StreamInput in) throws IOException {
super(in);
if (in.getTransportVersion().onOrAfter(V_8_11_X)) {
this.nodesInfoMetrics = new NodesInfoMetrics(in);
} else {
this.nodesInfoMetrics = new NodesInfoRequest(in).getNodesInfoMetrics();
}
skipLegacyNodesRequestHeader(V_8_11_X, in);
this.nodesInfoMetrics = new NodesInfoMetrics(in);
}

public NodeInfoRequest(NodesInfoRequest request) {
Expand All @@ -115,11 +112,8 @@ public NodeInfoRequest(NodesInfoRequest request) {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getTransportVersion().onOrAfter(V_8_11_X)) {
this.nodesInfoMetrics.writeTo(out);
} else {
new NodesInfoRequest().clear().addMetrics(nodesInfoMetrics.requestedMetrics()).writeTo(out);
}
sendLegacyNodesRequestHeader(V_8_11_X, out);
nodesInfoMetrics.writeTo(out);
}

public Set<String> requestedMetrics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
package org.elasticsearch.action.admin.cluster.node.stats;

import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.UpdateForV9;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
Expand All @@ -37,12 +36,6 @@ public NodesStatsRequest() {
nodesStatsRequestParameters = new NodesStatsRequestParameters();
}

@UpdateForV9 // this constructor is unused in v9
public NodesStatsRequest(StreamInput in) throws IOException {
super(in);
nodesStatsRequestParameters = new NodesStatsRequestParameters(in);
}

/**
* Get stats from nodes based on the nodes ids specified. If none are passed, stats
* for all nodes will be returned.
Expand Down Expand Up @@ -179,11 +172,9 @@ public void setIncludeShardsStats(boolean includeShardsStats) {
nodesStatsRequestParameters.setIncludeShardsStats(includeShardsStats);
}

@UpdateForV9 // this method can just call localOnly() in v9
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
nodesStatsRequestParameters.writeTo(out);
TransportAction.localOnly();
}

public NodesStatsRequestParameters getNodesStatsRequestParameters() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,11 @@ public static class NodeStatsRequest extends TransportRequest {

public NodeStatsRequest(StreamInput in) throws IOException {
super(in);
if (in.getTransportVersion().before(TransportVersions.V_8_13_0)) {
this.nodesStatsRequestParameters = new NodesStatsRequest(in).getNodesStatsRequestParameters();
} else {
this.nodesStatsRequestParameters = new NodesStatsRequestParameters(in);
if (in.getTransportVersion().before(TransportVersions.DROP_UNUSED_NODES_IDS)) {
in.readStringArray(); // formerly nodeIds, now unused
}
skipLegacyNodesRequestHeader(TransportVersions.V_8_13_0, in);
this.nodesStatsRequestParameters = new NodesStatsRequestParameters(in);
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)
&& in.getTransportVersion().before(TransportVersions.DROP_UNUSED_NODES_IDS)) {
in.readStringArray(); // formerly nodeIds, now unused
}
}

Expand All @@ -192,13 +190,11 @@ public String getDescription() {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getTransportVersion().before(TransportVersions.V_8_13_0)) {
new NodesStatsRequest(nodesStatsRequestParameters, Strings.EMPTY_ARRAY).writeTo(out);
} else {
nodesStatsRequestParameters.writeTo(out);
if (out.getTransportVersion().before(TransportVersions.DROP_UNUSED_NODES_IDS)) {
out.writeStringArray(Strings.EMPTY_ARRAY); // formerly nodeIds, now unused
}
sendLegacyNodesRequestHeader(TransportVersions.V_8_13_0, out);
nodesStatsRequestParameters.writeTo(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)
&& out.getTransportVersion().before(TransportVersions.DROP_UNUSED_NODES_IDS)) {
out.writeStringArray(Strings.EMPTY_ARRAY); // formerly nodeIds, now unused
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@

package org.elasticsearch.action.admin.cluster.node.usage;

import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.UpdateForV9;

import java.io.IOException;

Expand All @@ -20,13 +19,6 @@ public class NodesUsageRequest extends BaseNodesRequest<NodesUsageRequest> {
private boolean restActions;
private boolean aggregations;

@UpdateForV9 // will be unused in v9
public NodesUsageRequest(StreamInput in) throws IOException {
super(in);
this.restActions = in.readBoolean();
this.aggregations = in.readBoolean();
}

/**
* Get usage from nodes based on the nodes ids specified. If none are
* passed, usage for all nodes will be returned.
Expand Down Expand Up @@ -82,11 +74,8 @@ public NodesUsageRequest aggregations(boolean aggregations) {
return this;
}

@UpdateForV9 // can become localOnly() in v9
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(restActions);
out.writeBoolean(aggregations);
TransportAction.localOnly();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,9 @@ public static class NodeUsageRequest extends TransportRequest {

public NodeUsageRequest(StreamInput in) throws IOException {
super(in);
if (in.getTransportVersion().onOrAfter(TransportVersions.MORE_LIGHTER_NODES_REQUESTS)) {
restActions = in.readBoolean();
aggregations = in.readBoolean();
} else {
final var request = new NodesUsageRequest(in);
restActions = request.restActions();
aggregations = request.aggregations();
}
skipLegacyNodesRequestHeader(TransportVersions.MORE_LIGHTER_NODES_REQUESTS, in);
restActions = in.readBoolean();
aggregations = in.readBoolean();
}

NodeUsageRequest(NodesUsageRequest request) {
Expand All @@ -109,12 +104,9 @@ public NodeUsageRequest(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.MORE_LIGHTER_NODES_REQUESTS)) {
out.writeBoolean(restActions);
out.writeBoolean(aggregations);
} else {
new NodesUsageRequest().restActions(restActions).aggregations(aggregations).writeTo(out);
}
sendLegacyNodesRequestHeader(TransportVersions.MORE_LIGHTER_NODES_REQUESTS, out);
out.writeBoolean(restActions);
out.writeBoolean(aggregations);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@

package org.elasticsearch.action.admin.cluster.stats;

import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.UpdateForV9;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
Expand All @@ -23,12 +22,6 @@
* A request to get cluster level stats.
*/
public class ClusterStatsRequest extends BaseNodesRequest<ClusterStatsRequest> {

@UpdateForV9 // this constructor is unused in v9
public ClusterStatsRequest(StreamInput in) throws IOException {
super(in);
}

/**
* Get stats from nodes based on the nodes ids specified. If none are passed, stats
* based on all nodes will be returned.
Expand All @@ -42,10 +35,9 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId,
return new CancellableTask(id, type, action, "", parentTaskId, headers);
}

@UpdateForV9 // this method can just call localOnly() in v9
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
TransportAction.localOnly();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,7 @@ public static class ClusterStatsNodeRequest extends TransportRequest {

public ClusterStatsNodeRequest(StreamInput in) throws IOException {
super(in);
if (in.getTransportVersion().before(TransportVersions.DROP_UNUSED_NODES_REQUESTS)) {
new ClusterStatsRequest(in);
}
skipLegacyNodesRequestHeader(TransportVersions.DROP_UNUSED_NODES_REQUESTS, in);
}

@Override
Expand All @@ -273,9 +271,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId,
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getTransportVersion().before(TransportVersions.DROP_UNUSED_NODES_REQUESTS)) {
new ClusterStatsRequest().writeTo(out);
}
sendLegacyNodesRequestHeader(TransportVersions.DROP_UNUSED_NODES_REQUESTS, out);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;

Expand All @@ -39,23 +38,6 @@ public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>

private TimeValue timeout;

/**
* @deprecated {@link BaseNodesRequest} derivatives are quite heavyweight and should never need sending over the wire. Do not include
* the full top-level request directly in the node-level requests. Instead, copy the needed fields over to a dedicated node-level
* request.
*
* @see <a href="https://github.com/elastic/elasticsearch/issues/100878">#100878</a>
*/
@Deprecated(forRemoval = true)
protected BaseNodesRequest(StreamInput in) throws IOException {
// A bare `BaseNodesRequest` is never sent over the wire, but several implementations send the full top-level request to each node
// (wrapped up in another request). They shouldn't, but until we fix that we must keep this. See #100878.
super(in);
nodesIds = in.readStringArray();
concreteNodes = in.readOptionalArray(DiscoveryNode::new, DiscoveryNode[]::new);
timeout = in.readOptionalTimeValue();
}

protected BaseNodesRequest(String... nodesIds) {
this.nodesIds = nodesIds;
}
Expand Down
Loading

0 comments on commit 0a7c99b

Please sign in to comment.