Skip to content

Commit

Permalink
Remove nodeId from BaseNodeRequest (#43658)
Browse files Browse the repository at this point in the history
TransportNodesAction provides a mechanism to easily broadcast a request
to many nodes, and collect the respones into a high level response. Each
node has its own request type, with a base class of BaseNodeRequest.
This base request requires passing the nodeId to which the request will
be sent. However, that nodeId is not used anywhere. It is private to the
base class, yet serialized to each node, where the node could just as
easily find the nodeId of the node it is on locally.

This commit removes passing the nodeId through to the node request
creation, and guards its serialization so that we can remove the base
request class altogether in the future.
  • Loading branch information
rjernst authored Jun 28, 2019
1 parent 34a86cc commit 5b4089e
Show file tree
Hide file tree
Showing 27 changed files with 74 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ protected NodesHotThreadsResponse newResponse(NodesHotThreadsRequest request,
}

@Override
protected NodeRequest newNodeRequest(String nodeId, NodesHotThreadsRequest request) {
return new NodeRequest(nodeId, request);
protected NodeRequest newNodeRequest(NodesHotThreadsRequest request) {
return new NodeRequest(request);
}

@Override
Expand Down Expand Up @@ -85,8 +85,7 @@ public static class NodeRequest extends BaseNodeRequest {
public NodeRequest() {
}

NodeRequest(String nodeId, NodesHotThreadsRequest request) {
super(nodeId);
NodeRequest(NodesHotThreadsRequest request) {
this.request = request;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ protected NodesInfoResponse newResponse(NodesInfoRequest nodesInfoRequest,
}

@Override
protected NodeInfoRequest newNodeRequest(String nodeId, NodesInfoRequest request) {
return new NodeInfoRequest(nodeId, request);
protected NodeInfoRequest newNodeRequest(NodesInfoRequest request) {
return new NodeInfoRequest(request);
}

@Override
Expand All @@ -79,8 +79,7 @@ public static class NodeInfoRequest extends BaseNodeRequest {
public NodeInfoRequest() {
}

public NodeInfoRequest(String nodeId, NodesInfoRequest request) {
super(nodeId);
public NodeInfoRequest(NodesInfoRequest request) {
this.request = request;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ protected NodesReloadSecureSettingsResponse newResponse(NodesReloadSecureSetting
}

@Override
protected NodeRequest newNodeRequest(String nodeId, NodesReloadSecureSettingsRequest request) {
return new NodeRequest(nodeId, request);
protected NodeRequest newNodeRequest(NodesReloadSecureSettingsRequest request) {
return new NodeRequest(request);
}

@Override
Expand Down Expand Up @@ -116,8 +116,7 @@ public static class NodeRequest extends BaseNodeRequest {
public NodeRequest() {
}

NodeRequest(String nodeId, NodesReloadSecureSettingsRequest request) {
super(nodeId);
NodeRequest(NodesReloadSecureSettingsRequest request) {
this.request = request;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ protected NodesStatsResponse newResponse(NodesStatsRequest request, List<NodeSta
}

@Override
protected NodeStatsRequest newNodeRequest(String nodeId, NodesStatsRequest request) {
return new NodeStatsRequest(nodeId, request);
protected NodeStatsRequest newNodeRequest(NodesStatsRequest request) {
return new NodeStatsRequest(request);
}

@Override
Expand All @@ -79,8 +79,7 @@ public static class NodeStatsRequest extends BaseNodeRequest {
public NodeStatsRequest() {
}

NodeStatsRequest(String nodeId, NodesStatsRequest request) {
super(nodeId);
NodeStatsRequest(NodesStatsRequest request) {
this.request = request;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ protected NodesUsageResponse newResponse(NodesUsageRequest request, List<NodeUsa
}

@Override
protected NodeUsageRequest newNodeRequest(String nodeId, NodesUsageRequest request) {
return new NodeUsageRequest(nodeId, request);
protected NodeUsageRequest newNodeRequest(NodesUsageRequest request) {
return new NodeUsageRequest(request);
}

@Override
Expand All @@ -75,8 +75,7 @@ public static class NodeUsageRequest extends BaseNodeRequest {
public NodeUsageRequest() {
}

NodeUsageRequest(String nodeId, NodesUsageRequest request) {
super(nodeId);
NodeUsageRequest(NodesUsageRequest request) {
this.request = request;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public TransportNodesSnapshotsStatus(ThreadPool threadPool, ClusterService clust
}

@Override
protected NodeRequest newNodeRequest(String nodeId, Request request) {
return new NodeRequest(nodeId, request);
protected NodeRequest newNodeRequest(Request request) {
return new NodeRequest(request);
}

@Override
Expand Down Expand Up @@ -168,8 +168,7 @@ public static class NodeRequest extends BaseNodeRequest {
public NodeRequest() {
}

NodeRequest(String nodeId, TransportNodesSnapshotsStatus.Request request) {
super(nodeId);
NodeRequest(TransportNodesSnapshotsStatus.Request request) {
snapshots = Arrays.asList(request.snapshots);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ protected ClusterStatsResponse newResponse(ClusterStatsRequest request,
}

@Override
protected ClusterStatsNodeRequest newNodeRequest(String nodeId, ClusterStatsRequest request) {
return new ClusterStatsNodeRequest(nodeId, request);
protected ClusterStatsNodeRequest newNodeRequest(ClusterStatsRequest request) {
return new ClusterStatsNodeRequest(request);
}

@Override
Expand Down Expand Up @@ -142,8 +142,7 @@ public static class ClusterStatsNodeRequest extends BaseNodeRequest {
public ClusterStatsNodeRequest() {
}

ClusterStatsNodeRequest(String nodeId, ClusterStatsRequest request) {
super(nodeId);
ClusterStatsNodeRequest(ClusterStatsRequest request) {
this.request = request;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,31 @@

package org.elasticsearch.action.support.nodes;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;

// TODO: this class can be removed in master once 7.x is bumped to 7.4.0
public abstract class BaseNodeRequest extends TransportRequest {

private String nodeId;

public BaseNodeRequest() {

}

protected BaseNodeRequest(String nodeId) {
this.nodeId = nodeId;
}
public BaseNodeRequest() {}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodeId = in.readString();
if (in.getVersion().before(Version.V_7_3_0)) {
in.readString(); // previously nodeId
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(nodeId);
if (out.getVersion().before(Version.V_7_3_0)) {
out.writeString(""); // previously nodeId
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ protected NodesResponse newResponse(NodesRequest request, AtomicReferenceArray n
*/
protected abstract NodesResponse newResponse(NodesRequest request, List<NodeResponse> responses, List<FailedNodeException> failures);

protected abstract NodeRequest newNodeRequest(String nodeId, NodesRequest request);
protected abstract NodeRequest newNodeRequest(NodesRequest request);

protected abstract NodeResponse newNodeResponse();

Expand Down Expand Up @@ -174,7 +174,7 @@ void start() {
final DiscoveryNode node = nodes[i];
final String nodeId = node.getId();
try {
TransportRequest nodeRequest = newNodeRequest(nodeId, request);
TransportRequest nodeRequest = newNodeRequest(request);
if (task != null) {
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public ActionFuture<NodesGatewayMetaState> list(String[] nodesIds, @Nullable Tim
}

@Override
protected NodeRequest newNodeRequest(String nodeId, Request request) {
return new NodeRequest(nodeId);
protected NodeRequest newNodeRequest(Request request) {
return new NodeRequest();
}

@Override
Expand Down Expand Up @@ -114,14 +114,6 @@ protected void writeNodesTo(StreamOutput out, List<NodeGatewayMetaState> nodes)
}

public static class NodeRequest extends BaseNodeRequest {

public NodeRequest() {
}

NodeRequest(String nodeId) {
super(nodeId);
}

}

public static class NodeGatewayMetaState extends BaseNodeResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ public void list(ShardId shardId, DiscoveryNode[] nodes,
}

@Override
protected NodeRequest newNodeRequest(String nodeId, Request request) {
return new NodeRequest(nodeId, request);
protected NodeRequest newNodeRequest(Request request) {
return new NodeRequest(request);
}

@Override
Expand Down Expand Up @@ -223,8 +223,7 @@ public static class NodeRequest extends BaseNodeRequest {
public NodeRequest() {
}

public NodeRequest(String nodeId, Request request) {
super(nodeId);
public NodeRequest(Request request) {
this.shardId = request.shardId();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ public void list(ShardId shardId, DiscoveryNode[] nodes, ActionListener<NodesSto
}

@Override
protected NodeRequest newNodeRequest(String nodeId, Request request) {
return new NodeRequest(nodeId, request);
protected NodeRequest newNodeRequest(Request request) {
return new NodeRequest(request);
}

@Override
Expand Down Expand Up @@ -290,8 +290,7 @@ public static class NodeRequest extends BaseNodeRequest {
public NodeRequest() {
}

NodeRequest(String nodeId, TransportNodesListShardStoreMetaData.Request request) {
super(nodeId);
NodeRequest(TransportNodesListShardStoreMetaData.Request request) {
this.shardId = request.shardId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,35 +60,30 @@ public class CancellableTasksTests extends TaskManagerTestCase {

public static class CancellableNodeRequest extends BaseNodeRequest {
protected String requestName;
protected String nodeId;

public CancellableNodeRequest() {
super();
}

public CancellableNodeRequest(CancellableNodesRequest request, String nodeId) {
super(nodeId);
public CancellableNodeRequest(CancellableNodesRequest request) {
requestName = request.requestName;
this.nodeId = nodeId;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestName = in.readString();
nodeId = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(requestName);
out.writeString(nodeId);
}

@Override
public String getDescription() {
return "CancellableNodeRequest[" + requestName + ", " + nodeId + "]";
return "CancellableNodeRequest[" + requestName + "]";
}

@Override
Expand Down Expand Up @@ -161,19 +156,19 @@ class CancellableTestNodesAction extends AbstractTestNodesAction<CancellableNode
}

@Override
protected CancellableNodeRequest newNodeRequest(String nodeId, CancellableNodesRequest request) {
return new CancellableNodeRequest(request, nodeId);
protected CancellableNodeRequest newNodeRequest(CancellableNodesRequest request) {
return new CancellableNodeRequest(request);
}

@Override
protected NodeResponse nodeOperation(CancellableNodeRequest request, Task task) {
assert task instanceof CancellableTask;
debugDelay(request.nodeId, "op1");
debugDelay("op1");
if (actionStartedLatch != null) {
actionStartedLatch.countDown();
}

debugDelay(request.nodeId, "op2");
debugDelay("op2");
if (shouldBlock) {
// Simulate a job that takes forever to finish
// Using periodic checks method to identify that the task was cancelled
Expand All @@ -189,7 +184,7 @@ protected NodeResponse nodeOperation(CancellableNodeRequest request, Task task)
Thread.currentThread().interrupt();
}
}
debugDelay(request.nodeId, "op4");
debugDelay("op4");

return new NodeResponse(clusterService.localNode());
}
Expand Down Expand Up @@ -426,9 +421,9 @@ public void onFailure(Exception e) {

}

private static void debugDelay(String nodeId, String name) {
private static void debugDelay(String name) {
// Introduce an additional pseudo random repeatable race conditions
String delayName = RandomizedContext.current().getRunnerSeedAsString() + ":" + nodeId + ":" + name;
String delayName = RandomizedContext.current().getRunnerSeedAsString() + ":" + name;
Random random = new Random(delayName.hashCode());
if (RandomNumbers.randomIntBetween(random, 0, 10) < 1) {
try {
Expand Down
Loading

0 comments on commit 5b4089e

Please sign in to comment.