Skip to content

Commit

Permalink
Restore from Individual Shard Snapshot Files in Parallel (#48110)
Browse files Browse the repository at this point in the history
Make restoring shard snapshots run in parallel on the `SNAPSHOT` thread-pool.
  • Loading branch information
original-brownbear authored Oct 30, 2019
1 parent 678492d commit e58fc03
Show file tree
Hide file tree
Showing 15 changed files with 383 additions and 285 deletions.
37 changes: 20 additions & 17 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
Expand Down Expand Up @@ -1816,12 +1817,16 @@ public boolean recoverFromStore() {
return storeRecovery.recoverFromStore(this);
}

public boolean restoreFromRepository(Repository repository) {
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: " +
recoveryState.getRecoverySource();
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromRepository(this, repository);
public void restoreFromRepository(Repository repository, ActionListener<Boolean> listener) {
try {
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: " +
recoveryState.getRecoverySource();
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
storeRecovery.recoverFromRepository(this, repository, listener);
} catch (Exception e) {
listener.onFailure(e);
}
}

/**
Expand Down Expand Up @@ -2504,17 +2509,15 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService
case SNAPSHOT:
markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread
SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState.getRecoverySource();
threadPool.generic().execute(() -> {
try {
final Repository repository = repositoriesService.repository(recoverySource.snapshot().getRepository());
if (restoreFromRepository(repository)) {
recoveryListener.onRecoveryDone(recoveryState);
}
} catch (Exception e) {
recoveryListener.onRecoveryFailure(recoveryState,
new RecoveryFailedException(recoveryState, null, e), true);
}
});
threadPool.generic().execute(
ActionRunnable.<Boolean>wrap(ActionListener.wrap(r -> {
if (r) {
recoveryListener.onRecoveryDone(recoveryState);
}
},
e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)),
restoreListener -> restoreFromRepository(
repositoriesService.repository(recoverySource.snapshot().getRepository()), restoreListener)));
break;
case LOCAL_SHARDS:
final IndexMetaData indexMetaData = indexSettings().getIndexMetaData();
Expand Down
175 changes: 99 additions & 76 deletions server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, writeShardGens, listener);
}
@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
in.restoreShard(store, snapshotId, indexId, snapshotShardId, recoveryState);
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState,
ActionListener<Void> listener) {
in.restoreShard(store, snapshotId, indexId, snapshotShardId, recoveryState, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,10 @@ void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshot
* @param indexId id of the index in the repository from which the restore is occurring
* @param snapshotShardId shard id (in the snapshot)
* @param recoveryState recovery state
* @param listener listener to invoke once done
*/
void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState);

void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState,
ActionListener<Void> listener);
/**
* Retrieve shard snapshot status for the stored snapshot
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1195,11 +1195,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
// Start as many workers as fit into the snapshot pool at once at the most
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), indexIncrementalFileCount);
final ActionListener<Void> filesListener = ActionListener.delegateResponse(
new GroupedActionListener<>(allFilesUploadedListener, workers), (l, e) -> {
filesToSnapshot.clear(); // Stop uploading the remaining files if we run into any exception
l.onFailure(e);
});
final ActionListener<Void> filesListener = fileQueueListener(filesToSnapshot, workers, allFilesUploadedListener);
for (int i = 0; i < workers; ++i) {
executor.execute(ActionRunnable.run(filesListener, () -> {
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS);
Expand All @@ -1223,19 +1219,42 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s

@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState) {
ShardId shardId = store.shardId();
try {
final BlobContainer container = shardContainer(indexId, snapshotShardId);
BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId);
SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
RecoveryState recoveryState, ActionListener<Void> listener) {
final ShardId shardId = store.shardId();
final ActionListener<Void> restoreListener = ActionListener.delegateResponse(listener,
(l, e) -> l.onFailure(new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e)));
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
final BlobContainer container = shardContainer(indexId, snapshotShardId);
executor.execute(ActionRunnable.wrap(restoreListener, l -> {
final BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId);
final SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) {
@Override
protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store) throws IOException {
// restore the files from the snapshot to the Lucene store
for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) {
logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
restoreFile(fileToRecover, store);
protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store,
ActionListener<Void> listener) {
if (filesToRecover.isEmpty()) {
listener.onResponse(null);
} else {
// Start as many workers as fit into the snapshot pool at once at the most
final int workers =
Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), snapshotFiles.indexFiles().size());
final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> files = new LinkedBlockingQueue<>(filesToRecover);
final ActionListener<Void> allFilesListener =
fileQueueListener(files, workers, ActionListener.map(listener, v -> null));
// restore the files from the snapshot to the Lucene store
for (int i = 0; i < workers; ++i) {
executor.execute(ActionRunnable.run(allFilesListener, () -> {
store.incRef();
try {
BlobStoreIndexShardSnapshot.FileInfo fileToRecover;
while ((fileToRecover = files.poll(0L, TimeUnit.MILLISECONDS)) != null) {
restoreFile(fileToRecover, store);
}
} finally {
store.decRef();
}
}));
}
}
}

Expand Down Expand Up @@ -1275,10 +1294,16 @@ protected InputStream openSlice(long slice) throws IOException {
}
}
}
}.restore(snapshotFiles, store);
} catch (Exception e) {
throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e);
}
}.restore(snapshotFiles, store, l);
}));
}

private static ActionListener<Void> fileQueueListener(BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> files, int workers,
ActionListener<Collection<Void>> listener) {
return ActionListener.delegateResponse(new GroupedActionListener<>(listener, workers), (l, e) -> {
files.clear(); // Stop uploading the remaining files if we run into any exception
l.onFailure(e);
});
}

private static InputStream maybeRateLimit(InputStream stream, @Nullable RateLimiter rateLimiter, CounterMetric metric) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -74,7 +75,7 @@ protected FileRestoreContext(String repositoryName, ShardId shardId, SnapshotId
/**
* Performs restore operation
*/
public void restore(SnapshotFiles snapshotFiles, Store store) {
public void restore(SnapshotFiles snapshotFiles, Store store, ActionListener<Void> listener) {
store.incRef();
try {
logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
Expand Down Expand Up @@ -150,36 +151,49 @@ public void restore(SnapshotFiles snapshotFiles, Store store) {
}
}

restoreFiles(filesToRecover, store);
restoreFiles(filesToRecover, store, ActionListener.wrap(
v -> {
store.incRef();
try {
afterRestore(snapshotFiles, store, restoredSegmentsFile);
listener.onResponse(null);
} finally {
store.decRef();
}
}, listener::onFailure));
} catch (IOException ex) {
throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex);
}
} catch (Exception e) {
listener.onFailure(e);
} finally {
store.decRef();
}
}

// read the snapshot data persisted
try {
Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory());
} catch (IOException e) {
throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e);
}
private void afterRestore(SnapshotFiles snapshotFiles, Store store, StoreFileMetaData restoredSegmentsFile) {
// read the snapshot data persisted
try {
Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory());
} catch (IOException e) {
throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e);
}

/// now, go over and clean files that are in the store, but were not in the snapshot
try {
for (String storeFile : store.directory().listAll()) {
if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) {
continue; //skip write.lock, checksum files and files that exist in the snapshot
}
try {
store.deleteQuiet("restore", storeFile);
store.directory().deleteFile(storeFile);
} catch (IOException e) {
logger.warn("[{}] [{}] failed to delete file [{}] during snapshot cleanup", shardId, snapshotId, storeFile);
}
/// now, go over and clean files that are in the store, but were not in the snapshot
try {
for (String storeFile : store.directory().listAll()) {
if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) {
continue; //skip write.lock, checksum files and files that exist in the snapshot
}
try {
store.deleteQuiet("restore", storeFile);
store.directory().deleteFile(storeFile);
} catch (IOException e) {
logger.warn("[{}] [{}] failed to delete file [{}] during snapshot cleanup", shardId, snapshotId, storeFile);
}
} catch (IOException e) {
logger.warn("[{}] [{}] failed to list directory - some of files might not be deleted", shardId, snapshotId);
}
} finally {
store.decRef();
} catch (IOException e) {
logger.warn("[{}] [{}] failed to list directory - some of files might not be deleted", shardId, snapshotId);
}
}

Expand All @@ -189,7 +203,8 @@ public void restore(SnapshotFiles snapshotFiles, Store store) {
* @param filesToRecover List of files to restore
* @param store Store to restore into
*/
protected abstract void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store) throws IOException;
protected abstract void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store,
ActionListener<Void> listener);

@SuppressWarnings("unchecked")
private static Iterable<StoreFileMetaData> concat(Store.RecoveryDiff diff) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
* {@link RoutingTable.Builder#addAsRestore(IndexMetaData, SnapshotRecoverySource)} method.
* <p>
* Individual shards are getting restored as part of normal recovery process in
* {@link IndexShard#restoreFromRepository(Repository)} )}
* {@link IndexShard#restoreFromRepository} )}
* method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking
* at the {@link ShardRouting#recoverySource()} property.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2338,23 +2338,24 @@ public void testRestoreShard() throws IOException {

DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
target.markAsRecovering("store", new RecoveryState(routing, localNode, null));
assertTrue(target.restoreFromRepository(new RestoreOnlyRepository("test") {
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
target.restoreFromRepository(new RestoreOnlyRepository("test") {
@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState) {
try {
RecoveryState recoveryState, ActionListener<Void> listener) {
ActionListener.completeWith(listener, () -> {
cleanLuceneIndex(targetStore.directory());
for (String file : sourceStore.directory().listAll()) {
if (file.equals("write.lock") || file.startsWith("extra")) {
continue;
}
targetStore.directory().copyFrom(sourceStore.directory(), file, file, IOContext.DEFAULT);
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
return null;
});
}
}));
}, future);
assertTrue(future.actionGet());
assertThat(target.getLocalCheckpoint(), equalTo(2L));
assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L));
assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s

@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState) {
RecoveryState recoveryState, ActionListener<Void> listener) {

}

@Override
Expand Down
Loading

0 comments on commit e58fc03

Please sign in to comment.