From 6742d9c9d9049eacd9158815cab57b82046e8743 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sat, 2 Nov 2019 15:41:04 +0100 Subject: [PATCH] Cleanup Redundant Futures in Recovery Code (#48805) Follow up to #48110 cleaning up the redundant future uses that were left over from that change. --- .../elasticsearch/index/shard/IndexShard.java | 71 ++++++++----------- .../index/shard/StoreRecovery.java | 33 ++++----- .../index/shard/IndexShardIT.java | 3 +- .../index/shard/IndexShardTests.java | 32 +++++---- .../IndexingMemoryControllerTests.java | 2 +- ...dicesLifecycleListenerSingleNodeTests.java | 2 +- .../ESIndexLevelReplicationTestCase.java | 2 +- .../index/shard/IndexShardTestCase.java | 8 ++- .../action/bulk/BulkShardOperationsTests.java | 2 +- 9 files changed, 74 insertions(+), 81 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 99083a586d18f..33829e88b4d15 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -54,6 +54,7 @@ import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.Nullable; @@ -1787,34 +1788,38 @@ public ShardPath shardPath() { return path; } - public boolean recoverFromLocalShards(BiConsumer mappingUpdateConsumer, - List localShards) throws IOException { + public void recoverFromLocalShards(BiConsumer mappingUpdateConsumer, List localShards, + ActionListener listener) throws IOException { assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard"; assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS : "invalid recovery type: " + recoveryState.getRecoverySource(); final List snapshots = new ArrayList<>(); + final ActionListener recoveryListener = ActionListener.runBefore(listener, () -> IOUtils.close(snapshots)); + boolean success = false; try { for (IndexShard shard : localShards) { snapshots.add(new LocalShardSnapshot(shard)); } - // we are the first primary, recover from the gateway // if its post api allocation, the index should exists assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard"; StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - return storeRecovery.recoverFromLocalShards(mappingUpdateConsumer, this, snapshots); + storeRecovery.recoverFromLocalShards(mappingUpdateConsumer, this, snapshots, recoveryListener); + success = true; } finally { - IOUtils.close(snapshots); + if (success == false) { + IOUtils.close(snapshots); + } } } - public boolean recoverFromStore() { + public void recoverFromStore(ActionListener listener) { // we are the first primary, recover from the gateway // if its post api allocation, the index should exists assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; assert shardRouting.initializing() : "can only start recovery on initializing shard"; StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - return storeRecovery.recoverFromStore(this); + storeRecovery.recoverFromStore(this, listener); } public void restoreFromRepository(Repository repository, ActionListener listener) { @@ -2484,17 +2489,7 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService switch (recoveryState.getRecoverySource().getType()) { case EMPTY_STORE: case EXISTING_STORE: - markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread - threadPool.generic().execute(() -> { - try { - if (recoverFromStore()) { - recoveryListener.onRecoveryDone(recoveryState); - } - } catch (Exception e) { - recoveryListener.onRecoveryFailure(recoveryState, - new RecoveryFailedException(recoveryState, null, e), true); - } - }); + executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore); break; case PEER: try { @@ -2507,17 +2502,9 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService } break; case SNAPSHOT: - markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread - SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState.getRecoverySource(); - threadPool.generic().execute( - ActionRunnable.wrap(ActionListener.wrap(r -> { - if (r) { - recoveryListener.onRecoveryDone(recoveryState); - } - }, - e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), - restoreListener -> restoreFromRepository( - repositoriesService.repository(recoverySource.snapshot().getRepository()), restoreListener))); + final String repo = ((SnapshotRecoverySource) recoveryState.getRecoverySource()).snapshot().getRepository(); + executeRecovery("from snapshot", + recoveryState, recoveryListener, l -> restoreFromRepository(repositoriesService.repository(repo), l)); break; case LOCAL_SHARDS: final IndexMetaData indexMetaData = indexSettings().getIndexMetaData(); @@ -2542,18 +2529,9 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService if (numShards == startedShards.size()) { assert requiredShards.isEmpty() == false; - markAsRecovering("from local shards", recoveryState); // mark the shard as recovering on the cluster state thread - threadPool.generic().execute(() -> { - try { - if (recoverFromLocalShards(mappingUpdateConsumer, startedShards.stream() - .filter((s) -> requiredShards.contains(s.shardId())).collect(Collectors.toList()))) { - recoveryListener.onRecoveryDone(recoveryState); - } - } catch (Exception e) { - recoveryListener.onRecoveryFailure(recoveryState, - new RecoveryFailedException(recoveryState, null, e), true); - } - }); + executeRecovery("from local shards", recoveryState, recoveryListener, + l -> recoverFromLocalShards(mappingUpdateConsumer, + startedShards.stream().filter((s) -> requiredShards.contains(s.shardId())).collect(Collectors.toList()), l)); } else { final RuntimeException e; if (numShards == -1) { @@ -2571,6 +2549,17 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService } } + private void executeRecovery(String reason, RecoveryState recoveryState, PeerRecoveryTargetService.RecoveryListener recoveryListener, + CheckedConsumer, Exception> action) { + markAsRecovering(reason, recoveryState); // mark the shard as recovering on the cluster state thread + threadPool.generic().execute(ActionRunnable.wrap(ActionListener.wrap(r -> { + if (r) { + recoveryListener.onRecoveryDone(recoveryState); + } + }, + e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action)); + } + /** * Returns whether the shard is a relocated primary, i.e. not in charge anymore of replicating changes (see {@link ReplicationTracker}). */ diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 4d4d3260046ae..e80128af1dc9a 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -31,7 +31,6 @@ import org.apache.lucene.store.IndexInput; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.RecoverySource; @@ -81,31 +80,27 @@ final class StoreRecovery { * exist on disk ie. has been previously allocated or if the shard is a brand new allocation without pre-existing index * files / transaction logs. This * @param indexShard the index shard instance to recovery the shard into - * @return true if the shard has been recovered successfully, false if the recovery - * has been ignored due to a concurrent modification of if the clusters state has changed due to async updates. + * @param listener resolves to true if the shard has been recovered successfully, false if the recovery + * has been ignored due to a concurrent modification of if the clusters state has changed due to async updates. * @see Store */ - boolean recoverFromStore(final IndexShard indexShard) { + void recoverFromStore(final IndexShard indexShard, ActionListener listener) { if (canRecover(indexShard)) { RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); assert recoveryType == RecoverySource.Type.EMPTY_STORE || recoveryType == RecoverySource.Type.EXISTING_STORE : "expected store recovery type but was: " + recoveryType; - final PlainActionFuture future = PlainActionFuture.newFuture(); - final ActionListener recoveryListener = recoveryListener(indexShard, future); - try { + ActionListener.completeWith(recoveryListener(indexShard, listener), () -> { logger.debug("starting recovery from store ..."); internalRecoverFromStore(indexShard); - recoveryListener.onResponse(true); - } catch (Exception e) { - recoveryListener.onFailure(e); - } - return future.actionGet(); + return true; + }); + } else { + listener.onResponse(false); } - return false; } - boolean recoverFromLocalShards(BiConsumer mappingUpdateConsumer, - final IndexShard indexShard, final List shards) { + void recoverFromLocalShards(BiConsumer mappingUpdateConsumer, IndexShard indexShard, + List shards, ActionListener listener) { if (canRecover(indexShard)) { RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); assert recoveryType == RecoverySource.Type.LOCAL_SHARDS: "expected local shards recovery type: " + recoveryType; @@ -125,8 +120,7 @@ boolean recoverFromLocalShards(BiConsumer mappingUpdate Sort indexSort = indexShard.getIndexSort(); final boolean hasNested = indexShard.mapperService().hasNested(); final boolean isSplit = sourceMetaData.getNumberOfShards() < indexShard.indexSettings().getNumberOfShards(); - final PlainActionFuture future = PlainActionFuture.newFuture(); - ActionListener.completeWith(recoveryListener(indexShard, future), () -> { + ActionListener.completeWith(recoveryListener(indexShard, listener), () -> { logger.debug("starting recovery from local shards {}", shards); try { final Directory directory = indexShard.store().directory(); // don't close this directory!! @@ -146,10 +140,9 @@ boolean recoverFromLocalShards(BiConsumer mappingUpdate throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex); } }); - assert future.isDone(); - return future.actionGet(); + } else { + listener.onResponse(false); } - return false; } void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory target, final Sort indexSort, final Directory[] sources, diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 91ef318c54b70..e524e7bad5d49 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -118,6 +118,7 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.elasticsearch.index.shard.IndexShardTestCase.getTranslog; +import static org.elasticsearch.index.shard.IndexShardTestCase.recoverFromStore; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; @@ -641,7 +642,7 @@ public void testCircuitBreakerIncrementedByIndexShard() throws Exception { public static final IndexShard recoverShard(IndexShard newShard) throws IOException { DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); - assertTrue(newShard.recoverFromStore()); + recoverFromStore(newShard); IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); return newShard; } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index d818bd49aa083..ff13e24da3840 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1369,7 +1369,7 @@ public void testSnapshotStore() throws IOException { snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); - assertTrue(newShard.recoverFromStore()); + assertTrue(recoverFromStore(newShard)); snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); @@ -2041,7 +2041,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { RecoverySource.ExistingStoreRecoverySource.INSTANCE)); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); - assertTrue(newShard.recoverFromStore()); + assertTrue(recoverFromStore(newShard)); assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart()); @@ -2067,7 +2067,7 @@ public void testRecoverFromStore() throws IOException { IndexShard newShard = reinitShard(shard); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); - assertTrue(newShard.recoverFromStore()); + assertTrue(recoverFromStore(newShard)); assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart()); @@ -2098,7 +2098,7 @@ public void testRecoverFromStalePrimaryForceNewHistoryUUID() throws IOException ShardRoutingState.INITIALIZING, RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE)); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); - assertTrue(newShard.recoverFromStore()); + assertTrue(recoverFromStore(newShard)); IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); assertDocCount(newShard, totalOps); assertThat(newShard.getHistoryUUID(), not(equalTo(historyUUID))); @@ -2146,7 +2146,7 @@ public void testRecoverFromStoreWithNoOps() throws IOException { RecoverySource.ExistingStoreRecoverySource.INSTANCE)); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); - assertTrue(newShard.recoverFromStore()); + assertTrue(recoverFromStore(newShard)); assertEquals(1, newShard.recoveryState().getTranslog().recoveredOperations()); assertEquals(1, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(1, newShard.recoveryState().getTranslog().totalOperationsOnStart()); @@ -2171,7 +2171,7 @@ public void testRecoverFromStoreWithNoOps() throws IOException { newShard = reinitShard(newShard, ShardRoutingHelper.initWithSameId(primaryShardRouting, RecoverySource.ExistingStoreRecoverySource.INSTANCE)); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); - assertTrue(newShard.recoverFromStore()); + assertTrue(recoverFromStore(newShard)); try (Translog.Snapshot snapshot = getTranslog(newShard).newSnapshot()) { assertThat(snapshot.totalOperations(), equalTo(newShard.indexSettings.isSoftDeleteEnabled() ? 0 : 2)); } @@ -2192,7 +2192,7 @@ public void testRecoverFromCleanStore() throws IOException { DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); - assertTrue(newShard.recoverFromStore()); + assertTrue(recoverFromStore(newShard)); assertEquals(0, newShard.recoveryState().getTranslog().recoveredOperations()); assertEquals(0, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(0, newShard.recoveryState().getTranslog().totalOperationsOnStart()); @@ -2219,7 +2219,7 @@ public void testFailIfIndexNotPresentInRecoverFromStore() throws Exception { ShardRouting routing = newShard.routingEntry(); newShard.markAsRecovering("store", new RecoveryState(routing, localNode, null)); try { - newShard.recoverFromStore(); + recoverFromStore(newShard); fail("index not there!"); } catch (IndexShardRecoveryException ex) { assertTrue(ex.getMessage().contains("failed to fetch index version after copying it over")); @@ -2239,7 +2239,7 @@ public void testFailIfIndexNotPresentInRecoverFromStore() throws Exception { newShard = reinitShard(newShard, ShardRoutingHelper.initWithSameId(routing, RecoverySource.EmptyStoreRecoverySource.INSTANCE)); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); - assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore()); + assertTrue("recover even if there is nothing to recover", recoverFromStore(newShard)); IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); assertDocCount(newShard, 0); @@ -2290,7 +2290,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception { EMPTY_EVENT_LISTENER); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); - assertTrue(newShard.recoverFromStore()); + assertTrue(recoverFromStore(newShard)); assertThat(getShardDocUIDs(newShard), containsInAnyOrder("doc-0", "doc-2")); closeShards(newShard); } @@ -2622,7 +2622,7 @@ public void testRecoverFromTranslog() throws IOException { primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), getFakeDiscoNode(primary.routingEntry().currentNodeId()), null)); - primary.recoverFromStore(); + recoverFromStore(primary); primary.recoveryState().getTranslog().totalOperations(snapshot.totalOperations()); primary.recoveryState().getTranslog().totalOperationsOnStart(snapshot.totalOperations()); @@ -2810,11 +2810,15 @@ public void testRecoverFromLocalShard() throws IOException { final IndexShard differentIndex = newShard(new ShardId("index_2", "index_2", 0), true); recoverShardFromStore(differentIndex); expectThrows(IllegalArgumentException.class, () -> { - targetShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(sourceShard, differentIndex)); + final PlainActionFuture future = PlainActionFuture.newFuture(); + targetShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(sourceShard, differentIndex), future); + future.actionGet(); }); closeShards(differentIndex); - assertTrue(targetShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(sourceShard))); + final PlainActionFuture future = PlainActionFuture.newFuture(); + targetShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(sourceShard), future); + assertTrue(future.actionGet()); RecoveryState recoveryState = targetShard.recoveryState(); assertEquals(RecoveryState.Stage.DONE, recoveryState.getStage()); assertTrue(recoveryState.getIndex().fileDetails().size() > 0); @@ -4094,7 +4098,7 @@ protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) { ); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); readonlyShard.markAsRecovering("store", new RecoveryState(readonlyShard.routingEntry(), localNode, null)); - assertTrue(readonlyShard.recoverFromStore()); + recoverFromStore(readonlyShard); assertThat(readonlyShard.docStats().getCount(), equalTo(numDocs)); closeShards(readonlyShard); } diff --git a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index eecf298bf3842..3609ef0aad0f7 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -451,7 +451,7 @@ protected void writeIndexingBufferAsync(IndexShard shard) { newShard.markAsRecovering("store", new RecoveryState(routing, localNode, null)); assertEquals(1, imc.availableShards().size()); - assertTrue(newShard.recoverFromStore()); + assertTrue(IndexShardTestCase.recoverFromStore(newShard)); assertTrue("we should have flushed in IMC at least once but did: " + flushes.get(), flushes.get() >= 1); IndexShardTestCase.updateRoutingEntry(newShard, routing.moveToStarted()); } finally { diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 122d74121a718..436126930e3d1 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -137,7 +137,7 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); shard.markAsRecovering("store", new RecoveryState(newRouting, localNode, null)); - shard.recoverFromStore(); + IndexShardTestCase.recoverFromStore(shard); newRouting = ShardRoutingHelper.moveToStarted(newRouting); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(6, counter.get()); diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 7a757f5d2ab66..208fba5c83bf2 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -323,7 +323,7 @@ assert shardRoutings().stream().anyMatch(shardRouting -> shardRouting.isSameAllo protected synchronized void recoverPrimary(IndexShard primary) { final DiscoveryNode pNode = getDiscoveryNode(primary.routingEntry().currentNodeId()); primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), pNode, null)); - primary.recoverFromStore(); + recoverFromStore(primary); } public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardPath, final String nodeId) throws IOException { diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 8d5cfc7471b59..e2cebf5360e64 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -546,7 +546,7 @@ protected void recoverShardFromStore(IndexShard primary) throws IOException { primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), getFakeDiscoNode(primary.routingEntry().currentNodeId()), null)); - primary.recoverFromStore(); + recoverFromStore(primary); updateRoutingEntry(primary, ShardRoutingHelper.moveToStarted(primary.routingEntry())); } @@ -792,6 +792,12 @@ protected void flushShard(IndexShard shard, boolean force) { shard.flush(new FlushRequest(shard.shardId().getIndexName()).force(force)); } + public static boolean recoverFromStore(IndexShard newShard) { + final PlainActionFuture future = PlainActionFuture.newFuture(); + newShard.recoverFromStore(future); + return future.actionGet(); + } + /** Recover a shard from a snapshot using a given repository **/ protected void recoverShardFromSnapshot(final IndexShard shard, final Snapshot snapshot, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java index 18a2800d05b21..850efa9a74b68 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java @@ -132,7 +132,7 @@ public void testPrimaryResultIncludeOnlyAppliedOperations() throws Exception { final IndexShard newPrimary = reinitShard(oldPrimary); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newPrimary.markAsRecovering("store", new RecoveryState(newPrimary.routingEntry(), localNode, null)); - assertTrue(newPrimary.recoverFromStore()); + assertTrue(recoverFromStore(newPrimary)); IndexShardTestCase.updateRoutingEntry(newPrimary, newPrimary.routingEntry().moveToStarted()); newPrimary.advanceMaxSeqNoOfUpdatesOrDeletes(seqno); // The second bulk includes some operations from the first bulk which were processed already;