diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoResponse.java index 8e9360bdb1238..b8c2c29a0c8f9 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoResponse.java @@ -43,6 +43,10 @@ public final class RemoteInfoResponse extends ActionResponse implements ToXConte this.infos = Collections.unmodifiableList(new ArrayList<>(infos)); } + public List getInfos() { + return infos; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java index 7c51ca7b9c892..b79b79236d958 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java @@ -92,6 +92,30 @@ public RemoteConnectionInfo(StreamInput input) throws IOException { skipUnavailable = input.readBoolean(); } + public List getSeedNodes() { + return seedNodes; + } + + public int getConnectionsPerCluster() { + return connectionsPerCluster; + } + + public TimeValue getInitialConnectionTimeout() { + return initialConnectionTimeout; + } + + public int getNumNodesConnected() { + return numNodesConnected; + } + + public String getClusterAlias() { + return clusterAlias; + } + + public boolean isSkipUnavailable() { + return skipUnavailable; + } + @Override public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_0_0)) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index d9af0d6e71c04..1a1a60678de42 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -46,6 +46,7 @@ import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; @@ -114,7 +115,16 @@ protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer h final Index followerIndex = params.getFollowShardId().getIndex(); final Index leaderIndex = params.getLeaderShardId().getIndex(); final Supplier timeout = () -> isStopped() ? TimeValue.MINUS_ONE : waitForMetadataTimeOut; - CcrRequests.getIndexMetadata(remoteClient(params), leaderIndex, minRequiredMappingVersion, 0L, timeout, ActionListener.wrap( + + final Client remoteClient; + try { + remoteClient = remoteClient(params); + } catch (NoSuchRemoteClusterException e) { + errorHandler.accept(e); + return; + } + + CcrRequests.getIndexMetadata(remoteClient, leaderIndex, minRequiredMappingVersion, 0L, timeout, ActionListener.wrap( indexMetaData -> { if (indexMetaData.getMappings().isEmpty()) { assert indexMetaData.getMappingVersion() == 1; @@ -173,7 +183,7 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum }; try { remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler)); - } catch (Exception e) { + } catch (NoSuchRemoteClusterException e) { errorHandler.accept(e); } } @@ -231,7 +241,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co request.setPollTimeout(params.getReadPollTimeout()); try { remoteClient(params).execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler)); - } catch (Exception e) { + } catch (NoSuchRemoteClusterException e) { errorHandler.accept(e); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java index c2760aa5efd6b..0316482571eb2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack; +import org.elasticsearch.action.admin.cluster.remote.RemoteInfoAction; +import org.elasticsearch.action.admin.cluster.remote.RemoteInfoRequest; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.service.ClusterService; @@ -15,6 +17,7 @@ import org.elasticsearch.license.LicensesMetaData; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.transport.RemoteConnectionInfo; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.LocalStateCcr; @@ -30,6 +33,7 @@ import java.util.Collection; import java.util.Collections; +import java.util.List; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.CcrIntegTestCase.removeCCRRelatedMetadataFromClusterState; @@ -57,11 +61,17 @@ protected Collection> getPlugins() { } @Before - public void setupLocalRemote() { + public void setupLocalRemote() throws Exception { ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); String address = getInstanceFromNode(TransportService.class).boundAddress().publishAddress().toString(); updateSettingsRequest.transientSettings(Settings.builder().put("cluster.remote.local.seeds", address)); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + assertBusy(() -> { + List infos = client().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos(); + assertThat(infos.size(), equalTo(1)); + assertThat(infos.get(0).getNumNodesConnected(), equalTo(1)); + }); } @Before @@ -76,10 +86,15 @@ public void purgeCCRMetadata() throws Exception { } @After - public void removeLocalRemote() { + public void removeLocalRemote() throws Exception { ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); updateSettingsRequest.transientSettings(Settings.builder().put("cluster.remote.local.seeds", (String) null)); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + assertBusy(() -> { + List infos = client().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos(); + assertThat(infos.size(), equalTo(0)); + }); } protected AutoFollowStats getAutoFollowStats() { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java index 2fc4c45e54e27..0df3f4ea47f43 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -92,7 +92,6 @@ public void testDoNotCreateFollowerIfLeaderDoesNotHaveSoftDeletes() throws Excep assertThat(client().admin().indices().prepareExists("follower-index").get().isExists(), equalTo(false)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38695") public void testRemoveRemoteConnection() throws Exception { PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); request.setName("my_pattern");