Skip to content

Commit

Permalink
Fix LocalIndexFollowingIT#testRemoveRemoteConnection() test (#38709)
Browse files Browse the repository at this point in the history
* During fetching remote mapping if remote client is missing then
`NoSuchRemoteClusterException` was not handled.
* When adding remote connection, check that it is really connected
before continue-ing to run the tests.

Relates to #38695
  • Loading branch information
martijnvg committed Feb 18, 2019
1 parent 03b2ec6 commit ed08bc3
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public final class RemoteInfoResponse extends ActionResponse implements ToXConte
this.infos = Collections.unmodifiableList(new ArrayList<>(infos));
}

public List<RemoteConnectionInfo> getInfos() {
return infos;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,30 @@ public RemoteConnectionInfo(StreamInput input) throws IOException {
skipUnavailable = input.readBoolean();
}

public List<String> getSeedNodes() {
return seedNodes;
}

public int getConnectionsPerCluster() {
return connectionsPerCluster;
}

public TimeValue getInitialConnectionTimeout() {
return initialConnectionTimeout;
}

public int getNumNodesConnected() {
return numNodesConnected;
}

public String getClusterAlias() {
return clusterAlias;
}

public boolean isSkipUnavailable() {
return skipUnavailable;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.NoSuchRemoteClusterException;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
Expand Down Expand Up @@ -114,7 +115,16 @@ protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer h
final Index followerIndex = params.getFollowShardId().getIndex();
final Index leaderIndex = params.getLeaderShardId().getIndex();
final Supplier<TimeValue> timeout = () -> isStopped() ? TimeValue.MINUS_ONE : waitForMetadataTimeOut;
CcrRequests.getIndexMetadata(remoteClient(params), leaderIndex, minRequiredMappingVersion, 0L, timeout, ActionListener.wrap(

final Client remoteClient;
try {
remoteClient = remoteClient(params);
} catch (NoSuchRemoteClusterException e) {
errorHandler.accept(e);
return;
}

CcrRequests.getIndexMetadata(remoteClient, leaderIndex, minRequiredMappingVersion, 0L, timeout, ActionListener.wrap(
indexMetaData -> {
if (indexMetaData.getMappings().isEmpty()) {
assert indexMetaData.getMappingVersion() == 1;
Expand Down Expand Up @@ -173,7 +183,7 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum
};
try {
remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler));
} catch (Exception e) {
} catch (NoSuchRemoteClusterException e) {
errorHandler.accept(e);
}
}
Expand Down Expand Up @@ -231,7 +241,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
request.setPollTimeout(params.getReadPollTimeout());
try {
remoteClient(params).execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler));
} catch (Exception e) {
} catch (NoSuchRemoteClusterException e) {
errorHandler.accept(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

package org.elasticsearch.xpack;

import org.elasticsearch.action.admin.cluster.remote.RemoteInfoAction;
import org.elasticsearch.action.admin.cluster.remote.RemoteInfoRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -15,6 +17,7 @@
import org.elasticsearch.license.LicensesMetaData;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.transport.RemoteConnectionInfo;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.LocalStateCcr;
Expand All @@ -30,6 +33,7 @@

import java.util.Collection;
import java.util.Collections;
import java.util.List;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.CcrIntegTestCase.removeCCRRelatedMetadataFromClusterState;
Expand Down Expand Up @@ -57,11 +61,17 @@ protected Collection<Class<? extends Plugin>> getPlugins() {
}

@Before
public void setupLocalRemote() {
public void setupLocalRemote() throws Exception {
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
String address = getInstanceFromNode(TransportService.class).boundAddress().publishAddress().toString();
updateSettingsRequest.transientSettings(Settings.builder().put("cluster.remote.local.seeds", address));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

assertBusy(() -> {
List<RemoteConnectionInfo> infos = client().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos();
assertThat(infos.size(), equalTo(1));
assertThat(infos.get(0).getNumNodesConnected(), equalTo(1));
});
}

@Before
Expand All @@ -76,10 +86,15 @@ public void purgeCCRMetadata() throws Exception {
}

@After
public void removeLocalRemote() {
public void removeLocalRemote() throws Exception {
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.transientSettings(Settings.builder().put("cluster.remote.local.seeds", (String) null));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

assertBusy(() -> {
List<RemoteConnectionInfo> infos = client().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos();
assertThat(infos.size(), equalTo(0));
});
}

protected AutoFollowStats getAutoFollowStats() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ public void testDoNotCreateFollowerIfLeaderDoesNotHaveSoftDeletes() throws Excep
assertThat(client().admin().indices().prepareExists("follower-index").get().isExists(), equalTo(false));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38695")
public void testRemoveRemoteConnection() throws Exception {
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
request.setName("my_pattern");
Expand Down

0 comments on commit ed08bc3

Please sign in to comment.