From c2bdb4e46df33badb2bcee7ca6735f1c5241c8f4 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 5 Dec 2018 16:07:27 +0100 Subject: [PATCH] [CCR] Change AutofollowCoordinator to use wait_for_metadata_version Changed AutofollowCoordinator makes use of the wait_for_metadata_version feature in cluster state API and removed hard coded poll interval. Originates from #35895 Relates to #33007 --- .../java/org/elasticsearch/xpack/ccr/Ccr.java | 2 +- .../xpack/ccr/CcrLicenseChecker.java | 11 +- .../ccr/action/AutoFollowCoordinator.java | 40 ++-- .../TransportPutAutoFollowPatternAction.java | 5 +- .../action/AutoFollowCoordinatorTests.java | 191 ++++++++++++++---- 5 files changed, 188 insertions(+), 61 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 6e09d29ba4f29..b990de071de39 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -155,7 +155,7 @@ public Collection createComponents( return Arrays.asList( ccrLicenseChecker, - new AutoFollowCoordinator(client, threadPool, clusterService, ccrLicenseChecker) + new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker) ); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index 77ac94da4aa53..07d136c95c60d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -121,8 +121,9 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( client.getRemoteClusterClient(clusterAlias), request, onFailure, - leaderClusterState -> { - IndexMetaData leaderIndexMetaData = leaderClusterState.getMetaData().index(leaderIndex); + remoteClusterStateRsp -> { + ClusterState remoteClusterState = remoteClusterStateRsp.getState(); + IndexMetaData leaderIndexMetaData = remoteClusterState.getMetaData().index(leaderIndex); if (leaderIndexMetaData == null) { onFailure.accept(new IndexNotFoundException(leaderIndex)); return; @@ -159,7 +160,7 @@ public void checkRemoteClusterLicenseAndFetchClusterState( final String clusterAlias, final ClusterStateRequest request, final Consumer onFailure, - final Consumer leaderClusterStateConsumer) { + final Consumer leaderClusterStateConsumer) { try { Client remoteClient = systemClient(client.getRemoteClusterClient(clusterAlias)); checkRemoteClusterLicenseAndFetchClusterState( @@ -199,7 +200,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState( final Client remoteClient, final ClusterStateRequest request, final Consumer onFailure, - final Consumer leaderClusterStateConsumer, + final Consumer leaderClusterStateConsumer, final Function nonCompliantLicense, final Function unknownLicense) { // we have to check the license on the remote cluster @@ -211,7 +212,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState( public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) { if (licenseCheck.isSuccess()) { final ActionListener clusterStateListener = - ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure); + ActionListener.wrap(leaderClusterStateConsumer::accept, onFailure); // following an index in remote cluster, so use remote client to fetch leader index metadata remoteClient.admin().cluster().state(request, clusterStateListener); } else { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 707cd1abe5e8d..081c9df962364 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -12,6 +12,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -23,13 +24,11 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.CopyOnWriteHashMap; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.license.LicenseUtils; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; @@ -62,7 +61,6 @@ public class AutoFollowCoordinator implements ClusterStateListener { private static final int MAX_AUTO_FOLLOW_ERRORS = 256; private final Client client; - private final ThreadPool threadPool; private final ClusterService clusterService; private final CcrLicenseChecker ccrLicenseChecker; @@ -76,11 +74,9 @@ public class AutoFollowCoordinator implements ClusterStateListener { public AutoFollowCoordinator( Client client, - ThreadPool threadPool, ClusterService clusterService, CcrLicenseChecker ccrLicenseChecker) { this.client = client; - this.threadPool = threadPool; this.clusterService = clusterService; this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker"); clusterService.addListener(this); @@ -146,22 +142,24 @@ void updateAutoFollowers(ClusterState followerClusterState) { Map newAutoFollowers = new HashMap<>(newRemoteClusters.size()); for (String remoteCluster : newRemoteClusters) { - AutoFollower autoFollower = new AutoFollower(remoteCluster, threadPool, this::updateStats, clusterService::state) { + AutoFollower autoFollower = new AutoFollower(remoteCluster, this::updateStats, clusterService::state) { @Override - void getLeaderClusterState(final String remoteCluster, - final BiConsumer handler) { + void getRemoteClusterState(final String remoteCluster, + final long metadataVersion, + final BiConsumer handler) { final ClusterStateRequest request = new ClusterStateRequest(); request.clear(); request.metaData(true); request.routingTable(true); + request.waitForMetaDataVersion(metadataVersion); // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState( client, remoteCluster, request, e -> handler.accept(null, e), - leaderClusterState -> handler.accept(leaderClusterState, null)); + remoteClusterStateRsp -> handler.accept(remoteClusterStateRsp, null)); } @Override @@ -235,19 +233,17 @@ public void clusterChanged(ClusterChangedEvent event) { abstract static class AutoFollower { private final String remoteCluster; - private final ThreadPool threadPool; private final Consumer> statsUpdater; private final Supplier followerClusterStateSupplier; + private volatile long metadataVersion = 0; private volatile CountDown autoFollowPatternsCountDown; private volatile AtomicArray autoFollowResults; AutoFollower(final String remoteCluster, - final ThreadPool threadPool, final Consumer> statsUpdater, final Supplier followerClusterStateSupplier) { this.remoteCluster = remoteCluster; - this.threadPool = threadPool; this.statsUpdater = statsUpdater; this.followerClusterStateSupplier = followerClusterStateSupplier; } @@ -272,10 +268,15 @@ void autoFollowIndices() { this.autoFollowPatternsCountDown = new CountDown(patterns.size()); this.autoFollowResults = new AtomicArray<>(patterns.size()); - getLeaderClusterState(remoteCluster, (leaderClusterState, e) -> { - if (leaderClusterState != null) { + getRemoteClusterState(remoteCluster, metadataVersion + 1, (leaderClusterStateRsp, e) -> { + if (leaderClusterStateRsp != null) { assert e == null; + if (leaderClusterStateRsp.isWaitForTimedOut()) { + autoFollowIndices(); + return; + } + ClusterState leaderClusterState = leaderClusterStateRsp.getState(); int i = 0; for (String autoFollowPatternName : patterns) { final int slot = i; @@ -392,8 +393,7 @@ private void finalise(int slot, AutoFollowResult result) { autoFollowResults.set(slot, result); if (autoFollowPatternsCountDown.countDown()) { statsUpdater.accept(autoFollowResults.asList()); - // TODO: Remove scheduling here with using cluster state API's waitForMetadataVersion: - threadPool.schedule(TimeValue.timeValueMillis(2500), ThreadPool.Names.GENERIC, this::autoFollowIndices); + autoFollowIndices(); } } @@ -461,13 +461,15 @@ static Function recordLeaderIndexAsFollowFunction(St } /** - * Fetch the cluster state from the leader with the specified cluster alias + * Fetch a remote cluster state from with the specified cluster alias * @param remoteCluster the name of the leader cluster + * @param metadataVersion the last seen metadata version * @param handler the callback to invoke */ - abstract void getLeaderClusterState( + abstract void getRemoteClusterState( String remoteCluster, - BiConsumer handler + long metadataVersion, + BiConsumer handler ); abstract void createAndFollow( diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java index a8194fc1f0fef..8c722942d19b0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -7,6 +7,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; @@ -80,7 +81,7 @@ protected void masterOperation(PutAutoFollowPatternAction.Request request, .filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - Consumer consumer = remoteClusterState -> { + Consumer consumer = remoteClusterState -> { String[] indices = request.getLeaderIndexPatterns().toArray(new String[0]); ccrLicenseChecker.hasPrivilegesToFollowIndices(remoteClient, indices, e -> { if (e == null) { @@ -94,7 +95,7 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) { @Override public ClusterState execute(ClusterState currentState) throws Exception { - return innerPut(request, filteredHeaders, currentState, remoteClusterState); + return innerPut(request, filteredHeaders, currentState, remoteClusterState.getState()); } }); } else { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 7b2965246760a..640b317ec450f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ccr.action; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -22,7 +23,6 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; @@ -38,6 +38,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -45,13 +46,12 @@ import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -59,7 +59,6 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testAutoFollower() { Client client = mock(Client.class); - ThreadPool threadPool = mockThreadPool(); when(client.getRemoteClusterClient(anyString())).thenReturn(client); ClusterState leaderState = createRemoteClusterState("logs-20190101"); @@ -89,12 +88,13 @@ public void testAutoFollower() { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), nullValue()); }; - AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(currentState)) { + AutoFollower autoFollower = new AutoFollower("remote", handler, followerClusterStateSupplier(currentState)) { @Override - void getLeaderClusterState(String remoteCluster, - BiConsumer handler) { + void getRemoteClusterState(String remoteCluster, + long metadataVersion, + BiConsumer handler) { assertThat(remoteCluster, equalTo("remote")); - handler.accept(leaderState, null); + handler.accept(new ClusterStateResponse(new ClusterName("name"), leaderState, 1L, false), null); } @Override @@ -125,7 +125,6 @@ void updateAutoFollowMetadata(Function updateFunctio public void testAutoFollowerClusterStateApiFailure() { Client client = mock(Client.class); - ThreadPool threadPool = mockThreadPool(); when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), @@ -149,10 +148,11 @@ public void testAutoFollowerClusterStateApiFailure() { assertThat(results.get(0).clusterStateFetchException, sameInstance(failure)); assertThat(results.get(0).autoFollowExecutionResults.entrySet().size(), equalTo(0)); }; - AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(followerState)) { + AutoFollower autoFollower = new AutoFollower("remote", handler, followerClusterStateSupplier(followerState)) { @Override - void getLeaderClusterState(String remoteCluster, - BiConsumer handler) { + void getRemoteClusterState(String remoteCluster, + long metadataVersion, + BiConsumer handler) { handler.accept(null, failure); } @@ -176,7 +176,6 @@ void updateAutoFollowMetadata(Function updateFunctio public void testAutoFollowerUpdateClusterStateFailure() { Client client = mock(Client.class); - ThreadPool threadPool = mockThreadPool(); when(client.getRemoteClusterClient(anyString())).thenReturn(client); ClusterState leaderState = createRemoteClusterState("logs-20190101"); @@ -204,11 +203,12 @@ public void testAutoFollowerUpdateClusterStateFailure() { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), sameInstance(failure)); }; - AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(followerState)) { + AutoFollower autoFollower = new AutoFollower("remote", handler, followerClusterStateSupplier(followerState)) { @Override - void getLeaderClusterState(String remoteCluster, - BiConsumer handler) { - handler.accept(leaderState, null); + void getRemoteClusterState(String remoteCluster, + long metadataVersion, + BiConsumer handler) { + handler.accept(new ClusterStateResponse(new ClusterName("name"), leaderState, 1L, false), null); } @Override @@ -233,7 +233,6 @@ void updateAutoFollowMetadata(Function updateFunctio public void testAutoFollowerCreateAndFollowApiCallFailure() { Client client = mock(Client.class); - ThreadPool threadPool = mockThreadPool(); when(client.getRemoteClusterClient(anyString())).thenReturn(client); ClusterState leaderState = createRemoteClusterState("logs-20190101"); @@ -261,11 +260,12 @@ public void testAutoFollowerCreateAndFollowApiCallFailure() { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), sameInstance(failure)); }; - AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(followerState)) { + AutoFollower autoFollower = new AutoFollower("remote", handler, followerClusterStateSupplier(followerState)) { @Override - void getLeaderClusterState(String remoteCluster, - BiConsumer handler) { - handler.accept(leaderState, null); + void getRemoteClusterState(String remoteCluster, + long metadataVersion, + BiConsumer handler) { + handler.accept(new ClusterStateResponse(new ClusterName("name"), leaderState, 1L, false), null); } @Override @@ -438,7 +438,6 @@ public void testGetFollowerIndexName() { public void testStats() { AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( - null, null, mock(ClusterService.class), new CcrLicenseChecker(() -> true, () -> false) @@ -494,6 +493,122 @@ public void testStats() { assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias2:index2").getCause().getMessage(), equalTo("error")); } + public void testWaitForMetadataVersion() { + Client client = mock(Client.class); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + + AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), + null, null, null, null, null, null, null, null, null, null, null); + Map patterns = new HashMap<>(); + patterns.put("remote", autoFollowPattern); + Map> followedLeaderIndexUUIDS = new HashMap<>(); + followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); + Map> autoFollowHeaders = new HashMap<>(); + autoFollowHeaders.put("remote", Collections.singletonMap("key", "val")); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, autoFollowHeaders); + + final LinkedList leaderStates = new LinkedList<>(); + ClusterState[] states = new ClusterState[16]; + for (int i = 0; i < states.length; i++) { + states[i] = ClusterState.builder(new ClusterName("name")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) + .build(); + String indexName = "logs-" + i; + leaderStates.add(i == 0 ? createRemoteClusterState(indexName) : createRemoteClusterState(leaderStates.get(i - 1), indexName)); + } + + List allResults = new ArrayList<>(); + Consumer> handler = allResults::addAll; + AutoFollower autoFollower = new AutoFollower("remote", handler, followerClusterStateSupplier(states)) { + + long previousRequestedMetadataVersion = 0; + + @Override + void getRemoteClusterState(String remoteCluster, + long metadataVersion, + BiConsumer handler) { + assertThat(remoteCluster, equalTo("remote")); + assertThat(metadataVersion, greaterThan(previousRequestedMetadataVersion)); + handler.accept(new ClusterStateResponse(new ClusterName("name"), leaderStates.poll(), 1L, false), null); + } + + @Override + void createAndFollow(Map headers, + PutFollowAction.Request followRequest, + Runnable successHandler, + Consumer failureHandler) { + successHandler.run(); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, + Consumer handler) { + handler.accept(null); + } + }; + autoFollower.autoFollowIndices(); + assertThat(allResults.size(), equalTo(states.length)); + for (int i = 0; i < states.length; i++) { + assertThat(allResults.get(i).autoFollowExecutionResults.containsKey(new Index("logs-" + i, "_na_")), is(true)); + } + } + + public void testWaitForTimeOut() { + Client client = mock(Client.class); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + + AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), + null, null, null, null, null, null, null, null, null, null, null); + Map patterns = new HashMap<>(); + patterns.put("remote", autoFollowPattern); + Map> followedLeaderIndexUUIDS = new HashMap<>(); + followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); + Map> autoFollowHeaders = new HashMap<>(); + autoFollowHeaders.put("remote", Collections.singletonMap("key", "val")); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, autoFollowHeaders); + + ClusterState[] states = new ClusterState[16]; + for (int i = 0; i < states.length; i++) { + states[i] = ClusterState.builder(new ClusterName("name")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) + .build(); + } + Consumer> handler = results -> { + fail("should not be invoked"); + }; + AtomicInteger counter = new AtomicInteger(); + AutoFollower autoFollower = new AutoFollower("remote", handler, followerClusterStateSupplier(states)) { + + long previousRequestedMetadataVersion = 0; + + @Override + void getRemoteClusterState(String remoteCluster, + long metadataVersion, + BiConsumer handler) { + counter.incrementAndGet(); + assertThat(remoteCluster, equalTo("remote")); + assertThat(metadataVersion, greaterThan(previousRequestedMetadataVersion)); + handler.accept(new ClusterStateResponse(new ClusterName("name"), null, 1L, true), null); + } + + @Override + void createAndFollow(Map headers, + PutFollowAction.Request followRequest, + Runnable successHandler, + Consumer failureHandler) { + fail("should not be invoked"); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, + Consumer handler) { + fail("should not be invoked"); + } + }; + autoFollower.autoFollowIndices(); + assertThat(counter.get(), equalTo(states.length)); + } + private static ClusterState createRemoteClusterState(String indexName) { IndexMetaData indexMetaData = IndexMetaData.builder(indexName) .settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)) @@ -511,6 +626,25 @@ private static ClusterState createRemoteClusterState(String indexName) { return csBuilder.build(); } + private static ClusterState createRemoteClusterState(ClusterState previous, String indexName) { + IndexMetaData indexMetaData = IndexMetaData.builder(indexName) + .settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder(previous.metaData()) + .version(previous.metaData().version() + 1) + .put(indexMetaData, true)); + + ShardRouting shardRouting = + TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted(); + IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex()).addShard(shardRouting).build(); + csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build(); + + return csBuilder.build(); + } + private static Supplier followerClusterStateSupplier(ClusterState... states) { final AutoFollowMetadata emptyAutoFollowMetadata = new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); @@ -528,15 +662,4 @@ private static Supplier followerClusterStateSupplier(ClusterState. }; } - private static ThreadPool mockThreadPool() { - ThreadPool threadPool = mock(ThreadPool.class); - doAnswer(invocation -> { - Object[] args = invocation.getArguments(); - Runnable task = (Runnable) args[2]; - task.run(); - return null; - }).when(threadPool).schedule(any(), anyString(), any()); - return threadPool; - } - }