Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Responses can use Writeable.Reader interface #34655

Merged
merged 11 commits into from
Oct 26, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -102,7 +103,7 @@ public void testScheduledPing() throws Exception {
TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(randomBoolean()).build(),
new TransportResponseHandler<TransportResponse.Empty>() {
@Override
public TransportResponse.Empty newInstance() {
public TransportResponse.Empty read(StreamInput in) {
return TransportResponse.Empty.INSTANCE;
}

Expand Down
15 changes: 15 additions & 0 deletions server/src/main/java/org/elasticsearch/action/Action.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportRequestOptions;

Expand All @@ -45,9 +46,23 @@ public String name() {

/**
* Creates a new response instance.
* @deprecated Implement {@link #getResponseReader()} instead and make this method throw an
* {@link UnsupportedOperationException}
*/
@Deprecated
public abstract Response newResponse();

/**
* Get a reader that can create a new instance of the class from a {@link org.elasticsearch.common.io.stream.StreamInput}
*/
public Writeable.Reader<Response> getResponseReader() {
return in -> {
Response response = newResponse();
response.readFrom(in);
return response;
};
}

/**
* Optional request options for the action.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@

package org.elasticsearch.action;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;

import java.io.IOException;
import java.util.Objects;
import java.util.function.Supplier;

/**
* A simple base class for action response listeners, defaulting to using the SAME executor (as its
Expand All @@ -34,11 +36,11 @@
public class ActionListenerResponseHandler<Response extends TransportResponse> implements TransportResponseHandler<Response> {

private final ActionListener<? super Response> listener;
private final Supplier<Response> responseSupplier;
private final Writeable.Reader<Response> reader;

public ActionListenerResponseHandler(ActionListener<? super Response> listener, Supplier<Response> responseSupplier) {
public ActionListenerResponseHandler(ActionListener<? super Response> listener, Writeable.Reader<Response> reader) {
this.listener = Objects.requireNonNull(listener);
this.responseSupplier = Objects.requireNonNull(responseSupplier);
this.reader = Objects.requireNonNull(reader);
}

@Override
Expand All @@ -52,12 +54,12 @@ public void handleException(TransportException e) {
}

@Override
public Response newInstance() {
return responseSupplier.get();
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
public Response read(StreamInput in) throws IOException {
return reader.read(in);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@
*/
public abstract class ActionResponse extends TransportResponse {

public ActionResponse() {
}

public ActionResponse(StreamInput in) throws IOException {
super(in);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,6 @@ public void execute(final DiscoveryNode node, final Request request, final Actio
return;
}
transportService.sendRequest(node, action.name(), request, transportOptions,
new ActionListenerResponseHandler<>(listener, action::newResponse));
new ActionListenerResponseHandler<>(listener, action.getResponseReader()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
Expand Down Expand Up @@ -119,8 +120,10 @@ private void runOnNodeWithTaskIfPossible(Task thisTask, GetTaskRequest request,
transportService.sendRequest(node, GetTaskAction.NAME, nodeRequest, builder.build(),
new TransportResponseHandler<GetTaskResponse>() {
@Override
public GetTaskResponse newInstance() {
return new GetTaskResponse();
public GetTaskResponse read(StreamInput in) throws IOException {
GetTaskResponse response = new GetTaskResponse();
response.readFrom(in);
return response;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.cluster.shards;

import org.elasticsearch.action.Action;
import org.elasticsearch.common.io.stream.Writeable;

public class ClusterSearchShardsAction extends Action<ClusterSearchShardsResponse> {

Expand All @@ -32,6 +33,11 @@ private ClusterSearchShardsAction() {

@Override
public ClusterSearchShardsResponse newResponse() {
return new ClusterSearchShardsResponse();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
public Writeable.Reader<ClusterSearchShardsResponse> getResponseReader() {
return ClusterSearchShardsResponse::new;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,27 @@ public class ClusterSearchShardsResponse extends ActionResponse implements ToXCo
public static final ClusterSearchShardsResponse EMPTY = new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
new DiscoveryNode[0], Collections.emptyMap());

private ClusterSearchShardsGroup[] groups;
private DiscoveryNode[] nodes;
private Map<String, AliasFilter> indicesAndFilters;

public ClusterSearchShardsResponse() {
private final ClusterSearchShardsGroup[] groups;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

private final DiscoveryNode[] nodes;
private final Map<String, AliasFilter> indicesAndFilters;

public ClusterSearchShardsResponse(StreamInput in) throws IOException {
super(in);
groups = new ClusterSearchShardsGroup[in.readVInt()];
for (int i = 0; i < groups.length; i++) {
groups[i] = ClusterSearchShardsGroup.readSearchShardsGroupResponse(in);
}
nodes = new DiscoveryNode[in.readVInt()];
for (int i = 0; i < nodes.length; i++) {
nodes[i] = new DiscoveryNode(in);
}
int size = in.readVInt();
indicesAndFilters = new HashMap<>();
for (int i = 0; i < size; i++) {
String index = in.readString();
AliasFilter aliasFilter = new AliasFilter(in);
indicesAndFilters.put(index, aliasFilter);
}
}

public ClusterSearchShardsResponse(ClusterSearchShardsGroup[] groups, DiscoveryNode[] nodes,
Expand All @@ -67,22 +82,7 @@ public Map<String, AliasFilter> getIndicesAndFilters() {

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
groups = new ClusterSearchShardsGroup[in.readVInt()];
for (int i = 0; i < groups.length; i++) {
groups[i] = ClusterSearchShardsGroup.readSearchShardsGroupResponse(in);
}
nodes = new DiscoveryNode[in.readVInt()];
for (int i = 0; i < nodes.length; i++) {
nodes[i] = new DiscoveryNode(in);
}
int size = in.readVInt();
indicesAndFilters = new HashMap<>();
for (int i = 0; i < size; i++) {
String index = in.readString();
AliasFilter aliasFilter = new AliasFilter(in);
indicesAndFilters.put(index, aliasFilter);
}
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
jaymode marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -72,7 +74,12 @@ protected ClusterBlockException checkBlock(ClusterSearchShardsRequest request, C

@Override
protected ClusterSearchShardsResponse newResponse() {
return new ClusterSearchShardsResponse();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
protected ClusterSearchShardsResponse read(StreamInput in) throws IOException {
return new ClusterSearchShardsResponse(in);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public IngestActionForwarder(TransportService transportService) {

public void forwardIngestRequest(Action<?> action, ActionRequest request, ActionListener<?> listener) {
transportService.sendRequest(randomIngestNode(), action.name(), request,
new ActionListenerResponseHandler(listener, action::newResponse));
new ActionListenerResponseHandler(listener, action.getResponseReader()));
}

private DiscoveryNode randomIngestNode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbers;
Expand All @@ -45,6 +46,7 @@
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Supplier;

Expand Down Expand Up @@ -151,8 +153,10 @@ public void sync(ResyncReplicationRequest request, Task parentTask, String prima
transportOptions,
new TransportResponseHandler<ResyncReplicationResponse>() {
@Override
public ResyncReplicationResponse newInstance() {
return newResponseInstance();
public ResyncReplicationResponse read(StreamInput in) throws IOException {
ResyncReplicationResponse response = newResponseInstance();
response.readFrom(in);
return response;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ public Exception getFailure() {
MultiSearchResponse() {
}

MultiSearchResponse(StreamInput in) throws IOException {
readFrom(in);
}

public MultiSearchResponse(Item[] items, long tookInMillis) {
this.items = items;
this.tookInMillis = tookInMillis;
Expand Down
Loading