Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove IndexShard dependency from Repository #42213

Merged
merged 10 commits into from
May 22, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,8 @@ private void restore(final IndexShard indexShard, final Repository repository, f
snapshotShardId = new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
}
final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(),
assert indexShard.getEngineOrNull() == null;
repository.restoreShard(indexShard, indexShard.store(), restoreSource.snapshot().getSnapshotId(),
restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState());
final Store store = indexShard.store();
store.bootstrapNewHistory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
Expand Down Expand Up @@ -119,16 +119,17 @@ public boolean isReadOnly() {
return in.isReadOnly();
}


@Override
public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus) {
in.snapshotShard(shard, store, snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
}

@Override
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState) {
in.restoreShard(shard, snapshotId, version, indexId, snapshotShardId, recoveryState);
public void restoreShard(Store store, SnapshotId snapshotId,
Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
in.restoreShard(store, snapshotId, version, indexId, snapshotShardId, recoveryState);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
Expand All @@ -49,7 +50,7 @@
* <ul>
* <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
* with list of indices that will be included into the snapshot</li>
* <li>Data nodes call {@link Repository#snapshotShard(IndexShard, Store, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
* <li>Data nodes call {@link Repository#snapshotShard(Store, MapperService, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
* for each shard</li>
* <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
* </ul>
Expand Down Expand Up @@ -196,30 +197,69 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long
* <p>
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
* {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted.
* @param shard shard to be snapshotted
* @param indexShard the shard to be snapshotted
* @param snapshotId snapshot id
* @param indexId id for the index being snapshotted
* @param snapshotIndexCommit commit point
* @param snapshotStatus snapshot status
* @deprecated use {@link #snapshotShard(Store, MapperService, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)} instead
Copy link
Member

@original-brownbear original-brownbear May 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we even need this deprecation (instead of just outright changing the interface)? We will need to make changes to this interface anyway to fix #41581 so maybe it's not worth being careful here when we have to break 3rd party implementations (not sure if there even are any) to fix functionality anyway?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not drop the remaining deprecations as well 4a1bd35 ? :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well I can drop it in a followup but I want to backport this change to 7.x and there the deprecation is mandatory IMO

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting :) I don't wanna take this one off-topic but I have to ask:

If we don't want to break the Repository interface in 7.x that makes fixing #41581 in 7.x effectively impossible. We simply need to change what initialize snapshot and finalize snapshot do to get that one right.
So if we really can't break the contract for that one, we'd have to add new versions of initialize and finalize and keep the deprecated ones with broken behavior around?
(note that we also made a number of breaking changes to BlobStoreRepository itself lately and are planning to make even bigger ones in #42189. Those would break any 3rd party implementation of BlobStoreRepository as well. Should we tread more carefully there than planned?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally see this being a different reason. If you need to break an interface for the sake of fixing a bug or a correctness issue we should and can break it in a minor release. If we just factor stuff out to make it more testable I think being nice to a user is a wise decision. I will for sure remove the deprecated methods from master once it's backported.

*/
@Deprecated
default void snapshotShard(IndexShard indexShard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus) {
snapshotShard(indexShard.store(), indexShard.mapperService(), snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
}

/**
* Creates a snapshot of the shard based on the index commit point.
* <p>
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireLastIndexCommit} method.
* Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
* <p>
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
* {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted.
* @param store store to be snapshotted
* @param mapperService the shards mapper service
* @param snapshotId snapshot id
* @param indexId id for the index being snapshotted
* @param snapshotIndexCommit commit point
* @param snapshotStatus snapshot status
*/
void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus);

/**
* Restores snapshot of the shard.
* <p>
* The index can be renamed on restore, hence different {@code shardId} and {@code snapshotShardId} are supplied.
*
* @param shard the shard to restore the index into
* @param store the store to restore the index into
* @param snapshotId snapshot id
* @param version version of elasticsearch that created this snapshot
* @param indexId id of the index in the repository from which the restore is occurring
* @param snapshotShardId shard id (in the snapshot)
* @param recoveryState recovery state
* @deprecated use {@link #restoreShard(Store, SnapshotId, Version, IndexId, ShardId, RecoveryState)} instead
*/
@Deprecated
default void restoreShard(IndexShard shard, Store store, SnapshotId snapshotId, Version version, IndexId indexId,
ShardId snapshotShardId, RecoveryState recoveryState) {
restoreShard(store, snapshotId, version, indexId, snapshotShardId, recoveryState);
}

/**
* Restores snapshot of the shard.
* <p>
* The index can be renamed on restore, hence different {@code shardId} and {@code snapshotShardId} are supplied.
* @param store the store to restore the index into
* @param snapshotId snapshot id
* @param version version of elasticsearch that created this snapshot
* @param indexId id of the index in the repository from which the restore is occurring
* @param snapshotShardId shard id (in the snapshot)
* @param recoveryState recovery state
*/
void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId,
ShardId snapshotShardId, RecoveryState recoveryState);
void restoreShard(Store store, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState);

/**
* Retrieve shard snapshot status for the stored snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.snapshots.IndexShardSnapshotException;
Expand Down Expand Up @@ -829,8 +829,8 @@ private void writeAtomic(final String blobName, final BytesReference bytesRef, b
}

@Override
public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus) {
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
SnapshotContext snapshotContext = new SnapshotContext(store, snapshotId, indexId, snapshotStatus, System.currentTimeMillis());
try {
snapshotContext.snapshot(snapshotIndexCommit);
Expand All @@ -845,18 +845,19 @@ public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId,
}

@Override
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState) {
final Context context = new Context(snapshotId, indexId, shard.shardId(), snapshotShardId);
public void restoreShard(Store store, SnapshotId snapshotId,
Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
ShardId shardId = store.shardId();
final Context context = new Context(snapshotId, indexId, shardId, snapshotShardId);
BlobPath path = basePath().add("indices").add(indexId.getId()).add(Integer.toString(snapshotShardId.getId()));
BlobContainer blobContainer = blobStore().blobContainer(path);
final RestoreContext snapshotContext = new RestoreContext(shard, snapshotId, recoveryState, blobContainer);
final RestoreContext snapshotContext = new RestoreContext(shardId, snapshotId, recoveryState, blobContainer);
try {
BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot();
SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
snapshotContext.restore(snapshotFiles);
snapshotContext.restore(snapshotFiles, store);
} catch (Exception e) {
throw new IndexShardRestoreFailedException(shard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e);
throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e);
}
}

Expand Down Expand Up @@ -1402,13 +1403,13 @@ private class RestoreContext extends FileRestoreContext {

/**
* Constructs new restore context
* @param indexShard shard to restore into
* @param shardId shard id to restore into
* @param snapshotId snapshot id
* @param recoveryState recovery state to report progress
* @param blobContainer the blob container to read the files from
*/
RestoreContext(IndexShard indexShard, SnapshotId snapshotId, RecoveryState recoveryState, BlobContainer blobContainer) {
super(metadata.name(), indexShard, snapshotId, recoveryState, BUFFER_SIZE);
RestoreContext(ShardId shardId, SnapshotId snapshotId, RecoveryState recoveryState, BlobContainer blobContainer) {
super(metadata.name(), shardId, snapshotId, recoveryState, BUFFER_SIZE);
this.blobContainer = blobContainer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
Expand Down Expand Up @@ -64,7 +63,6 @@ public abstract class FileRestoreContext {
protected static final Logger logger = LogManager.getLogger(FileRestoreContext.class);

protected final String repositoryName;
protected final IndexShard indexShard;
protected final RecoveryState recoveryState;
protected final SnapshotId snapshotId;
protected final ShardId shardId;
Expand All @@ -73,26 +71,24 @@ public abstract class FileRestoreContext {
/**
* Constructs new restore context
*
* @param indexShard shard to restore into
* @param shardId shard id to restore into
* @param snapshotId snapshot id
* @param recoveryState recovery state to report progress
* @param bufferSize buffer size for restore
*/
protected FileRestoreContext(String repositoryName, IndexShard indexShard, SnapshotId snapshotId, RecoveryState recoveryState,
protected FileRestoreContext(String repositoryName, ShardId shardId, SnapshotId snapshotId, RecoveryState recoveryState,
int bufferSize) {
this.repositoryName = repositoryName;
this.recoveryState = recoveryState;
this.indexShard = indexShard;
this.snapshotId = snapshotId;
this.shardId = indexShard.shardId();
this.shardId = shardId;
this.bufferSize = bufferSize;
}

/**
* Performs restore operation
*/
public void restore(SnapshotFiles snapshotFiles) throws IOException {
final Store store = indexShard.store();
public void restore(SnapshotFiles snapshotFiles, Store store) throws IOException {
store.incRef();
try {
logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
Expand All @@ -108,7 +104,7 @@ public void restore(SnapshotFiles snapshotFiles) throws IOException {
// version number and no checksum, even though the index itself is perfectly fine to restore, this
// empty shard would cause exceptions to be thrown. Since there is no data to restore from an empty
// shard anyway, we just create the empty shard here and then exit.
store.createEmpty(indexShard.indexSettings().getIndexMetaData().getCreationVersion().luceneVersion);
store.createEmpty(store.indexSettings().getIndexVersionCreated().luceneVersion);
return;
}

Expand All @@ -117,7 +113,7 @@ public void restore(SnapshotFiles snapshotFiles) throws IOException {
// this will throw an IOException if the store has no segments infos file. The
// store can still have existing files but they will be deleted just before being
// restored.
recoveryTargetMetadata = indexShard.snapshotStoreMetadata();
recoveryTargetMetadata = store.getMetadata(null, true);
} catch (org.apache.lucene.index.IndexNotFoundException e) {
// happens when restore to an empty shard, not a big deal
logger.trace("[{}] [{}] restoring from to an empty shard", shardId, snapshotId);
Expand All @@ -127,7 +123,6 @@ public void restore(SnapshotFiles snapshotFiles) throws IOException {
shardId, snapshotId), e);
recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY;
}

final List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover = new ArrayList<>();
final Map<String, StoreFileMetaData> snapshotMetaData = new HashMap<>();
final Map<String, BlobStoreIndexShardSnapshot.FileInfo> fileInfos = new HashMap<>();
Expand Down Expand Up @@ -157,7 +152,7 @@ public void restore(SnapshotFiles snapshotFiles) throws IOException {
final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata);
for (StoreFileMetaData md : diff.identical) {
BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name());
recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), true);
recoveryState.getIndex().addFileDetail(fileInfo.physicalName(), fileInfo.length(), true);
if (logger.isTraceEnabled()) {
logger.trace("[{}] [{}] not_recovering file [{}] from [{}], exists in local store and is same", shardId, snapshotId,
fileInfo.physicalName(), fileInfo.name());
Expand All @@ -167,7 +162,7 @@ public void restore(SnapshotFiles snapshotFiles) throws IOException {
for (StoreFileMetaData md : concat(diff)) {
BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name());
filesToRecover.add(fileInfo);
recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), false);
recoveryState.getIndex().addFileDetail(fileInfo.physicalName(), fileInfo.length(), false);
if (logger.isTraceEnabled()) {
logger.trace("[{}] [{}] recovering [{}] from [{}]", shardId, snapshotId,
fileInfo.physicalName(), fileInfo.name());
Expand Down Expand Up @@ -260,7 +255,7 @@ private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, fi
int length;
while ((length = stream.read(buffer)) > 0) {
indexOutput.writeBytes(buffer, 0, length);
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length);
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length);
}
Store.verify(indexOutput);
indexOutput.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,7 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina
try {
// we flush first to make sure we get the latest writes snapshotted
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireLastIndexCommit(true)) {
repository.snapshotShard(indexShard, indexShard.store(), snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(),
snapshotStatus);
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus);
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
logger.debug("snapshot ({}) completed to {} with {}", snapshot, repository, lastSnapshotStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2197,8 +2197,8 @@ public void testRestoreShard() throws IOException {
target.markAsRecovering("store", new RecoveryState(routing, localNode, null));
assertTrue(target.restoreFromRepository(new RestoreOnlyRepository("test") {
@Override
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState) {
public void restoreShard(Store store, SnapshotId snapshotId,
Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
try {
cleanLuceneIndex(targetStore.directory());
for (String file : sourceStore.directory().listAll()) {
Expand Down
Loading