Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use remote client in TransportFieldCapsAction #30838

Merged
merged 2 commits into from
May 24, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -33,10 +34,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
Expand All @@ -49,7 +46,6 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
private final ClusterService clusterService;
private final TransportFieldCapabilitiesIndexAction shardAction;
private final RemoteClusterService remoteClusterService;
private final TransportService transportService;

@Inject
public TransportFieldCapabilitiesAction(Settings settings, TransportService transportService,
Expand All @@ -62,7 +58,6 @@ public TransportFieldCapabilitiesAction(Settings settings, TransportService tran
actionFilters, indexNameExpressionResolver, FieldCapabilitiesRequest::new);
this.clusterService = clusterService;
this.remoteClusterService = transportService.getRemoteClusterService();
this.transportService = transportService;
this.shardAction = shardAction;
}

Expand Down Expand Up @@ -118,47 +113,20 @@ public void onFailure(Exception e) {
for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
String clusterAlias = remoteIndices.getKey();
OriginalIndices originalIndices = remoteIndices.getValue();
// if we are connected this is basically a no-op, if we are not we try to connect in parallel in a non-blocking fashion
remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(v -> {
Transport.Connection connection = remoteClusterService.getConnection(clusterAlias);
FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
remoteRequest.setMergeResults(false); // we need to merge on this node
remoteRequest.indicesOptions(originalIndices.indicesOptions());
remoteRequest.indices(originalIndices.indices());
remoteRequest.fields(request.fields());
transportService.sendRequest(connection, FieldCapabilitiesAction.NAME, remoteRequest, TransportRequestOptions.EMPTY,
new TransportResponseHandler<FieldCapabilitiesResponse>() {

@Override
public FieldCapabilitiesResponse newInstance() {
return new FieldCapabilitiesResponse();
}

@Override
public void handleResponse(FieldCapabilitiesResponse response) {
try {
for (FieldCapabilitiesIndexResponse res : response.getIndexResponses()) {
indexResponses.add(new FieldCapabilitiesIndexResponse(RemoteClusterAware.
buildRemoteIndexName(clusterAlias, res.getIndexName()), res.get()));
}
} finally {
onResponse.run();
}
}

@Override
public void handleException(TransportException exp) {
onResponse.run();
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}, e -> onResponse.run()));
Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
remoteRequest.setMergeResults(false); // we need to merge on this node
remoteRequest.indicesOptions(originalIndices.indicesOptions());
remoteRequest.indices(originalIndices.indices());
remoteRequest.fields(request.fields());
remoteClusterClient.fieldCaps(remoteRequest, ActionListener.wrap(response -> {
for (FieldCapabilitiesIndexResponse res : response.getIndexResponses()) {
indexResponses.add(new FieldCapabilitiesIndexResponse(RemoteClusterAware.
buildRemoteIndexName(clusterAlias, res.getIndexName()), res.get()));
}
onResponse.run();
}, failure -> onResponse.run()));
}

}
}

Expand Down