Skip to content

Commit

Permalink
Remove IndexShard dependency from Repository (#42213)
Browse files Browse the repository at this point in the history
* Remove IndexShard dependency from Repository

In order to simplify repository testing especially for BlobStoreRepository
it's important to remove the dependency on IndexShard and reduce it to
Store and MapperService (in the snapshot case). This significantly reduces
the dependcy footprint for Repository and allows unittesting without starting
nodes or instantiate entire shard instances. This change deprecates the old
method signatures and adds a unittest for FileRepository to show the advantage
of this change.
In addition, the unittesting surfaced a bug where the internal file names that
are private to the repository were used in the recovery stats instead of the
target file names which makes it impossible to relate to the actual lucene files
in the recovery stats.

* don't delegate deprecated methods

* apply comments

* test
  • Loading branch information
s1monw committed May 22, 2019
1 parent 3a20ff7 commit a79cd77
Show file tree
Hide file tree
Showing 16 changed files with 353 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,8 @@ private void restore(final IndexShard indexShard, final Repository repository, f
snapshotShardId = new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
}
final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(),
assert indexShard.getEngineOrNull() == null;
repository.restoreShard(indexShard, indexShard.store(), restoreSource.snapshot().getSnapshotId(),
restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState());
final Store store = indexShard.store();
store.bootstrapNewHistory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
Expand Down Expand Up @@ -119,16 +119,17 @@ public boolean isReadOnly() {
return in.isReadOnly();
}


@Override
public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus) {
in.snapshotShard(shard, store, snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
}

@Override
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState) {
in.restoreShard(shard, snapshotId, version, indexId, snapshotShardId, recoveryState);
public void restoreShard(Store store, SnapshotId snapshotId,
Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
in.restoreShard(store, snapshotId, version, indexId, snapshotShardId, recoveryState);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
Expand All @@ -49,7 +50,7 @@
* <ul>
* <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
* with list of indices that will be included into the snapshot</li>
* <li>Data nodes call {@link Repository#snapshotShard(IndexShard, Store, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
* <li>Data nodes call {@link Repository#snapshotShard(Store, MapperService, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
* for each shard</li>
* <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
* </ul>
Expand Down Expand Up @@ -196,30 +197,69 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long
* <p>
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
* {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted.
* @param shard shard to be snapshotted
* @param indexShard the shard to be snapshotted
* @param snapshotId snapshot id
* @param indexId id for the index being snapshotted
* @param snapshotIndexCommit commit point
* @param snapshotStatus snapshot status
* @deprecated use {@link #snapshotShard(Store, MapperService, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)} instead
*/
@Deprecated
default void snapshotShard(IndexShard indexShard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus) {
snapshotShard(indexShard.store(), indexShard.mapperService(), snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
}

/**
* Creates a snapshot of the shard based on the index commit point.
* <p>
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireLastIndexCommit} method.
* Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
* <p>
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
* {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted.
* @param store store to be snapshotted
* @param mapperService the shards mapper service
* @param snapshotId snapshot id
* @param indexId id for the index being snapshotted
* @param snapshotIndexCommit commit point
* @param snapshotStatus snapshot status
*/
void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus);

/**
* Restores snapshot of the shard.
* <p>
* The index can be renamed on restore, hence different {@code shardId} and {@code snapshotShardId} are supplied.
*
* @param shard the shard to restore the index into
* @param store the store to restore the index into
* @param snapshotId snapshot id
* @param version version of elasticsearch that created this snapshot
* @param indexId id of the index in the repository from which the restore is occurring
* @param snapshotShardId shard id (in the snapshot)
* @param recoveryState recovery state
* @deprecated use {@link #restoreShard(Store, SnapshotId, Version, IndexId, ShardId, RecoveryState)} instead
*/
@Deprecated
default void restoreShard(IndexShard shard, Store store, SnapshotId snapshotId, Version version, IndexId indexId,
ShardId snapshotShardId, RecoveryState recoveryState) {
restoreShard(store, snapshotId, version, indexId, snapshotShardId, recoveryState);
}

/**
* Restores snapshot of the shard.
* <p>
* The index can be renamed on restore, hence different {@code shardId} and {@code snapshotShardId} are supplied.
* @param store the store to restore the index into
* @param snapshotId snapshot id
* @param version version of elasticsearch that created this snapshot
* @param indexId id of the index in the repository from which the restore is occurring
* @param snapshotShardId shard id (in the snapshot)
* @param recoveryState recovery state
*/
void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId,
ShardId snapshotShardId, RecoveryState recoveryState);
void restoreShard(Store store, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState);

/**
* Retrieve shard snapshot status for the stored snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.snapshots.IndexShardSnapshotException;
Expand Down Expand Up @@ -830,8 +830,8 @@ private void writeAtomic(final String blobName, final BytesReference bytesRef, b
}

@Override
public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus) {
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
SnapshotContext snapshotContext = new SnapshotContext(store, snapshotId, indexId, snapshotStatus, System.currentTimeMillis());
try {
snapshotContext.snapshot(snapshotIndexCommit);
Expand All @@ -846,18 +846,19 @@ public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId,
}

@Override
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState) {
final Context context = new Context(snapshotId, indexId, shard.shardId(), snapshotShardId);
public void restoreShard(Store store, SnapshotId snapshotId,
Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
ShardId shardId = store.shardId();
final Context context = new Context(snapshotId, indexId, shardId, snapshotShardId);
BlobPath path = basePath().add("indices").add(indexId.getId()).add(Integer.toString(snapshotShardId.getId()));
BlobContainer blobContainer = blobStore().blobContainer(path);
final RestoreContext snapshotContext = new RestoreContext(shard, snapshotId, recoveryState, blobContainer);
final RestoreContext snapshotContext = new RestoreContext(shardId, snapshotId, recoveryState, blobContainer);
try {
BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot();
SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
snapshotContext.restore(snapshotFiles);
snapshotContext.restore(snapshotFiles, store);
} catch (Exception e) {
throw new IndexShardRestoreFailedException(shard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e);
throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e);
}
}

Expand Down Expand Up @@ -1403,13 +1404,13 @@ private class RestoreContext extends FileRestoreContext {

/**
* Constructs new restore context
* @param indexShard shard to restore into
* @param shardId shard id to restore into
* @param snapshotId snapshot id
* @param recoveryState recovery state to report progress
* @param blobContainer the blob container to read the files from
*/
RestoreContext(IndexShard indexShard, SnapshotId snapshotId, RecoveryState recoveryState, BlobContainer blobContainer) {
super(metadata.name(), indexShard, snapshotId, recoveryState, BUFFER_SIZE);
RestoreContext(ShardId shardId, SnapshotId snapshotId, RecoveryState recoveryState, BlobContainer blobContainer) {
super(metadata.name(), shardId, snapshotId, recoveryState, BUFFER_SIZE);
this.blobContainer = blobContainer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
Expand Down Expand Up @@ -64,7 +63,6 @@ public abstract class FileRestoreContext {
protected static final Logger logger = LogManager.getLogger(FileRestoreContext.class);

protected final String repositoryName;
protected final IndexShard indexShard;
protected final RecoveryState recoveryState;
protected final SnapshotId snapshotId;
protected final ShardId shardId;
Expand All @@ -73,26 +71,24 @@ public abstract class FileRestoreContext {
/**
* Constructs new restore context
*
* @param indexShard shard to restore into
* @param shardId shard id to restore into
* @param snapshotId snapshot id
* @param recoveryState recovery state to report progress
* @param bufferSize buffer size for restore
*/
protected FileRestoreContext(String repositoryName, IndexShard indexShard, SnapshotId snapshotId, RecoveryState recoveryState,
protected FileRestoreContext(String repositoryName, ShardId shardId, SnapshotId snapshotId, RecoveryState recoveryState,
int bufferSize) {
this.repositoryName = repositoryName;
this.recoveryState = recoveryState;
this.indexShard = indexShard;
this.snapshotId = snapshotId;
this.shardId = indexShard.shardId();
this.shardId = shardId;
this.bufferSize = bufferSize;
}

/**
* Performs restore operation
*/
public void restore(SnapshotFiles snapshotFiles) throws IOException {
final Store store = indexShard.store();
public void restore(SnapshotFiles snapshotFiles, Store store) throws IOException {
store.incRef();
try {
logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
Expand All @@ -108,7 +104,7 @@ public void restore(SnapshotFiles snapshotFiles) throws IOException {
// version number and no checksum, even though the index itself is perfectly fine to restore, this
// empty shard would cause exceptions to be thrown. Since there is no data to restore from an empty
// shard anyway, we just create the empty shard here and then exit.
store.createEmpty(indexShard.indexSettings().getIndexMetaData().getCreationVersion().luceneVersion);
store.createEmpty(store.indexSettings().getIndexVersionCreated().luceneVersion);
return;
}

Expand All @@ -117,7 +113,7 @@ public void restore(SnapshotFiles snapshotFiles) throws IOException {
// this will throw an IOException if the store has no segments infos file. The
// store can still have existing files but they will be deleted just before being
// restored.
recoveryTargetMetadata = indexShard.snapshotStoreMetadata();
recoveryTargetMetadata = store.getMetadata(null, true);
} catch (org.apache.lucene.index.IndexNotFoundException e) {
// happens when restore to an empty shard, not a big deal
logger.trace("[{}] [{}] restoring from to an empty shard", shardId, snapshotId);
Expand All @@ -127,7 +123,6 @@ public void restore(SnapshotFiles snapshotFiles) throws IOException {
shardId, snapshotId), e);
recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY;
}

final List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover = new ArrayList<>();
final Map<String, StoreFileMetaData> snapshotMetaData = new HashMap<>();
final Map<String, BlobStoreIndexShardSnapshot.FileInfo> fileInfos = new HashMap<>();
Expand Down Expand Up @@ -157,7 +152,7 @@ public void restore(SnapshotFiles snapshotFiles) throws IOException {
final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata);
for (StoreFileMetaData md : diff.identical) {
BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name());
recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), true);
recoveryState.getIndex().addFileDetail(fileInfo.physicalName(), fileInfo.length(), true);
if (logger.isTraceEnabled()) {
logger.trace("[{}] [{}] not_recovering file [{}] from [{}], exists in local store and is same", shardId, snapshotId,
fileInfo.physicalName(), fileInfo.name());
Expand All @@ -167,7 +162,7 @@ public void restore(SnapshotFiles snapshotFiles) throws IOException {
for (StoreFileMetaData md : concat(diff)) {
BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name());
filesToRecover.add(fileInfo);
recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), false);
recoveryState.getIndex().addFileDetail(fileInfo.physicalName(), fileInfo.length(), false);
if (logger.isTraceEnabled()) {
logger.trace("[{}] [{}] recovering [{}] from [{}]", shardId, snapshotId,
fileInfo.physicalName(), fileInfo.name());
Expand Down Expand Up @@ -260,7 +255,7 @@ private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, fi
int length;
while ((length = stream.read(buffer)) > 0) {
indexOutput.writeBytes(buffer, 0, length);
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length);
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length);
}
Store.verify(indexOutput);
indexOutput.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,7 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina
try {
// we flush first to make sure we get the latest writes snapshotted
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireLastIndexCommit(true)) {
repository.snapshotShard(indexShard, indexShard.store(), snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(),
snapshotStatus);
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus);
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
logger.debug("snapshot ({}) completed to {} with {}", snapshot, repository, lastSnapshotStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2308,8 +2308,8 @@ public void testRestoreShard() throws IOException {
target.markAsRecovering("store", new RecoveryState(routing, localNode, null));
assertTrue(target.restoreFromRepository(new RestoreOnlyRepository("test") {
@Override
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState) {
public void restoreShard(Store store, SnapshotId snapshotId,
Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
try {
cleanLuceneIndex(targetStore.directory());
for (String file : sourceStore.directory().listAll()) {
Expand Down
Loading

0 comments on commit a79cd77

Please sign in to comment.