Skip to content

Commit

Permalink
Cleanup Redundant Futures in Recovery Code (#48805)
Browse files Browse the repository at this point in the history
Follow up to #48110 cleaning up the redundant future
uses that were left over from that change.
  • Loading branch information
original-brownbear authored Nov 2, 2019
1 parent 0298b6e commit 6742d9c
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 81 deletions.
71 changes: 30 additions & 41 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -1787,34 +1788,38 @@ public ShardPath shardPath() {
return path;
}

public boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer,
List<IndexShard> localShards) throws IOException {
public void recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer, List<IndexShard> localShards,
ActionListener<Boolean> listener) throws IOException {
assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard";
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS : "invalid recovery type: " +
recoveryState.getRecoverySource();
final List<LocalShardSnapshot> snapshots = new ArrayList<>();
final ActionListener<Boolean> recoveryListener = ActionListener.runBefore(listener, () -> IOUtils.close(snapshots));
boolean success = false;
try {
for (IndexShard shard : localShards) {
snapshots.add(new LocalShardSnapshot(shard));
}

// we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromLocalShards(mappingUpdateConsumer, this, snapshots);
storeRecovery.recoverFromLocalShards(mappingUpdateConsumer, this, snapshots, recoveryListener);
success = true;
} finally {
IOUtils.close(snapshots);
if (success == false) {
IOUtils.close(snapshots);
}
}
}

public boolean recoverFromStore() {
public void recoverFromStore(ActionListener<Boolean> listener) {
// we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
assert shardRouting.initializing() : "can only start recovery on initializing shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromStore(this);
storeRecovery.recoverFromStore(this, listener);
}

public void restoreFromRepository(Repository repository, ActionListener<Boolean> listener) {
Expand Down Expand Up @@ -2484,17 +2489,7 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService
switch (recoveryState.getRecoverySource().getType()) {
case EMPTY_STORE:
case EXISTING_STORE:
markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
try {
if (recoverFromStore()) {
recoveryListener.onRecoveryDone(recoveryState);
}
} catch (Exception e) {
recoveryListener.onRecoveryFailure(recoveryState,
new RecoveryFailedException(recoveryState, null, e), true);
}
});
executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
break;
case PEER:
try {
Expand All @@ -2507,17 +2502,9 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService
}
break;
case SNAPSHOT:
markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread
SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState.getRecoverySource();
threadPool.generic().execute(
ActionRunnable.<Boolean>wrap(ActionListener.wrap(r -> {
if (r) {
recoveryListener.onRecoveryDone(recoveryState);
}
},
e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)),
restoreListener -> restoreFromRepository(
repositoriesService.repository(recoverySource.snapshot().getRepository()), restoreListener)));
final String repo = ((SnapshotRecoverySource) recoveryState.getRecoverySource()).snapshot().getRepository();
executeRecovery("from snapshot",
recoveryState, recoveryListener, l -> restoreFromRepository(repositoriesService.repository(repo), l));
break;
case LOCAL_SHARDS:
final IndexMetaData indexMetaData = indexSettings().getIndexMetaData();
Expand All @@ -2542,18 +2529,9 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService

if (numShards == startedShards.size()) {
assert requiredShards.isEmpty() == false;
markAsRecovering("from local shards", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
try {
if (recoverFromLocalShards(mappingUpdateConsumer, startedShards.stream()
.filter((s) -> requiredShards.contains(s.shardId())).collect(Collectors.toList()))) {
recoveryListener.onRecoveryDone(recoveryState);
}
} catch (Exception e) {
recoveryListener.onRecoveryFailure(recoveryState,
new RecoveryFailedException(recoveryState, null, e), true);
}
});
executeRecovery("from local shards", recoveryState, recoveryListener,
l -> recoverFromLocalShards(mappingUpdateConsumer,
startedShards.stream().filter((s) -> requiredShards.contains(s.shardId())).collect(Collectors.toList()), l));
} else {
final RuntimeException e;
if (numShards == -1) {
Expand All @@ -2571,6 +2549,17 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService
}
}

private void executeRecovery(String reason, RecoveryState recoveryState, PeerRecoveryTargetService.RecoveryListener recoveryListener,
CheckedConsumer<ActionListener<Boolean>, Exception> action) {
markAsRecovering(reason, recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(ActionRunnable.wrap(ActionListener.wrap(r -> {
if (r) {
recoveryListener.onRecoveryDone(recoveryState);
}
},
e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action));
}

/**
* Returns whether the shard is a relocated primary, i.e. not in charge anymore of replicating changes (see {@link ReplicationTracker}).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RecoverySource;
Expand Down Expand Up @@ -81,31 +80,27 @@ final class StoreRecovery {
* exist on disk ie. has been previously allocated or if the shard is a brand new allocation without pre-existing index
* files / transaction logs. This
* @param indexShard the index shard instance to recovery the shard into
* @return <code>true</code> if the shard has been recovered successfully, <code>false</code> if the recovery
* has been ignored due to a concurrent modification of if the clusters state has changed due to async updates.
* @param listener resolves to <code>true</code> if the shard has been recovered successfully, <code>false</code> if the recovery
* has been ignored due to a concurrent modification of if the clusters state has changed due to async updates.
* @see Store
*/
boolean recoverFromStore(final IndexShard indexShard) {
void recoverFromStore(final IndexShard indexShard, ActionListener<Boolean> listener) {
if (canRecover(indexShard)) {
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
assert recoveryType == RecoverySource.Type.EMPTY_STORE || recoveryType == RecoverySource.Type.EXISTING_STORE :
"expected store recovery type but was: " + recoveryType;
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
final ActionListener<Boolean> recoveryListener = recoveryListener(indexShard, future);
try {
ActionListener.completeWith(recoveryListener(indexShard, listener), () -> {
logger.debug("starting recovery from store ...");
internalRecoverFromStore(indexShard);
recoveryListener.onResponse(true);
} catch (Exception e) {
recoveryListener.onFailure(e);
}
return future.actionGet();
return true;
});
} else {
listener.onResponse(false);
}
return false;
}

boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer,
final IndexShard indexShard, final List<LocalShardSnapshot> shards) {
void recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer, IndexShard indexShard,
List<LocalShardSnapshot> shards, ActionListener<Boolean> listener) {
if (canRecover(indexShard)) {
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
assert recoveryType == RecoverySource.Type.LOCAL_SHARDS: "expected local shards recovery type: " + recoveryType;
Expand All @@ -125,8 +120,7 @@ boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdate
Sort indexSort = indexShard.getIndexSort();
final boolean hasNested = indexShard.mapperService().hasNested();
final boolean isSplit = sourceMetaData.getNumberOfShards() < indexShard.indexSettings().getNumberOfShards();
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
ActionListener.completeWith(recoveryListener(indexShard, future), () -> {
ActionListener.completeWith(recoveryListener(indexShard, listener), () -> {
logger.debug("starting recovery from local shards {}", shards);
try {
final Directory directory = indexShard.store().directory(); // don't close this directory!!
Expand All @@ -146,10 +140,9 @@ boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdate
throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex);
}
});
assert future.isDone();
return future.actionGet();
} else {
listener.onResponse(false);
}
return false;
}

void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory target, final Sort indexSort, final Directory[] sources,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.index.shard.IndexShardTestCase.getTranslog;
import static org.elasticsearch.index.shard.IndexShardTestCase.recoverFromStore;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
Expand Down Expand Up @@ -641,7 +642,7 @@ public void testCircuitBreakerIncrementedByIndexShard() throws Exception {
public static final IndexShard recoverShard(IndexShard newShard) throws IOException {
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue(newShard.recoverFromStore());
recoverFromStore(newShard);
IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());
return newShard;
}
Expand Down
Loading

0 comments on commit 6742d9c

Please sign in to comment.