Skip to content

Commit

Permalink
Ensure remote cluster is connected before fetching _field_caps (#24845
Browse files Browse the repository at this point in the history
)

If a cluster disconnects and comes back up we should ensure that
we connected to the cluster before we fire the requests.

Closes #24763
  • Loading branch information
s1monw authored May 24, 2017
1 parent 85a1b2b commit ac6a6d6
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,38 +118,45 @@ public void onFailure(Exception e) {
for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
String clusterAlias = remoteIndices.getKey();
OriginalIndices originalIndices = remoteIndices.getValue();
Transport.Connection connection = remoteClusterService.getConnection(remoteIndices.getKey());
FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
remoteRequest.setMergeResults(false); // we need to merge on this node
remoteRequest.indicesOptions(originalIndices.indicesOptions());
remoteRequest.indices(originalIndices.indices());
remoteRequest.fields(request.fields());
transportService.sendRequest(connection, FieldCapabilitiesAction.NAME, remoteRequest, TransportRequestOptions.EMPTY,
new TransportResponseHandler<FieldCapabilitiesResponse>() {
@Override
public FieldCapabilitiesResponse newInstance() {
return new FieldCapabilitiesResponse();
}

@Override
public void handleResponse(FieldCapabilitiesResponse response) {
for (FieldCapabilitiesIndexResponse res : response.getIndexResponses()) {
indexResponses.add(new FieldCapabilitiesIndexResponse(RemoteClusterAware.buildRemoteIndexName(clusterAlias,
res.getIndexName()), res.get()));
}
onResponse.run();
}

@Override
public void handleException(TransportException exp) {
onResponse.run();
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
// if we are connected this is basically a no-op, if we are not we try to connect in parallel in a non-blocking fashion
remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(v -> {
Transport.Connection connection = remoteClusterService.getConnection(clusterAlias);
FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
remoteRequest.setMergeResults(false); // we need to merge on this node
remoteRequest.indicesOptions(originalIndices.indicesOptions());
remoteRequest.indices(originalIndices.indices());
remoteRequest.fields(request.fields());
transportService.sendRequest(connection, FieldCapabilitiesAction.NAME, remoteRequest, TransportRequestOptions.EMPTY,
new TransportResponseHandler<FieldCapabilitiesResponse>() {

@Override
public FieldCapabilitiesResponse newInstance() {
return new FieldCapabilitiesResponse();
}

@Override
public void handleResponse(FieldCapabilitiesResponse response) {
try {
for (FieldCapabilitiesIndexResponse res : response.getIndexResponses()) {
indexResponses.add(new FieldCapabilitiesIndexResponse(RemoteClusterAware.
buildRemoteIndexName(clusterAlias, res.getIndexName()), res.get()));
}
} finally {
onResponse.run();
}
}

@Override
public void handleException(TransportException exp) {
onResponse.run();
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}, e -> onResponse.run()));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,24 @@ public void fetchSearchShards(ClusterSearchShardsRequest searchRequest,
// we can't proceed with a search on a cluster level.
// in the future we might want to just skip the remote nodes in such a case but that can already be implemented on the caller
// end since they provide the listener.
connectHandler.connect(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, listener), listener::onFailure));
ensureConnected(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, listener), listener::onFailure));
} else {
fetchShardsInternal(searchRequest, listener);
}
}

/**
* Ensures that this cluster is connected. If the cluster is connected this operation
* will invoke the listener immediately.
*/
public void ensureConnected(ActionListener<Void> voidActionListener) {
if (connectedNodes.isEmpty()) {
connectHandler.connect(voidActionListener);
} else {
voidActionListener.onResponse(null);
}
}

private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest,
final ActionListener<ClusterSearchShardsResponse> listener) {
final DiscoveryNode node = nodeSupplier.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
Expand All @@ -46,6 +47,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -265,6 +267,18 @@ public Transport.Connection getConnection(DiscoveryNode node, String cluster) {
return connection.getConnection(node);
}

/**
* Ensures that the given cluster alias is connected. If the cluster is connected this operation
* will invoke the listener immediately.
*/
public void ensureConnected(String clusterAlias, ActionListener<Void> listener) {
RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterAlias);
if (remoteClusterConnection == null) {
throw new IllegalArgumentException("no such remote cluster: " + clusterAlias);
}
remoteClusterConnection.ensureConnected(listener);
}

public Transport.Connection getConnection(String cluster) {
RemoteClusterConnection connection = remoteClusters.get(cluster);
if (connection == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
Expand Down Expand Up @@ -730,4 +731,58 @@ public void onFailure(Exception e) {
}
return statsRef.get();
}

public void testEnsureConnected() throws IOException, InterruptedException {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(discoverableTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());

try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
assertFalse(service.nodeConnected(seedNode));
assertFalse(service.nodeConnected(discoverableNode));
assertTrue(connection.assertNoRunningConnections());
CountDownLatch latch = new CountDownLatch(1);
connection.ensureConnected(new LatchedActionListener<>(new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
}

@Override
public void onFailure(Exception e) {
throw new AssertionError(e);
}
}, latch));
latch.await();
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
assertTrue(connection.assertNoRunningConnections());

// exec again we are already connected
connection.ensureConnected(new LatchedActionListener<>(new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
}

@Override
public void onFailure(Exception e) {
throw new AssertionError(e);
}
}, latch));
latch.await();
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
assertTrue(connection.assertNoRunningConnections());
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,44 @@ public void testIncrementallyAddClusters() throws IOException {
}
}

public void testEnsureConnected() throws IOException {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(otherSeedTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());

try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool,
null)) {
transportService.start();
transportService.acceptIncomingRequests();
Settings.Builder builder = Settings.builder();
builder.putArray("search.remote.cluster_1.seeds", seedNode.getAddress().toString());
builder.putArray("search.remote.cluster_2.seeds", otherSeedNode.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
assertFalse(service.isCrossClusterSearchEnabled());
service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().address()));
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
service.updateRemoteCluster("cluster_2", Collections.singletonList(otherSeedNode.getAddress().address()));
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
service.updateRemoteCluster("cluster_2", Collections.emptyList());
assertFalse(service.isRemoteClusterRegistered("cluster_2"));
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class,
() -> service.updateRemoteCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Collections.emptyList()));
assertEquals("remote clusters must not have the empty string as its key", iae.getMessage());
}
}
}
}

public void testRemoteNodeAttribute() throws IOException, InterruptedException {
final Settings settings =
Settings.builder().put("search.remote.node.attr", "gateway").build();
Expand Down

0 comments on commit ac6a6d6

Please sign in to comment.