Track Shard-Snapshot Index Generation at Repository Root (elastic#46250)
This change adds a new field `"shards"` to `RepositoryData` that contains a mapping of `IndexId` to a `String[]`. This string array can be accessed by shard id to get the generation of a shard's shard folder (i.e. the `N` in the name of the currently valid `/indices/${indexId}/${shardId}/index-${N}` blob for the shard in question).
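The following is a minimal sketch of that lookup in plain Java, with hypothetical names (the real code lives in `RepositoryData` and uses Elasticsearch's own types): given the per-index generation arrays, resolving a shard's current generation is a map lookup plus an array access, with no blob-store `LIST` involved.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the new "shards" mapping in RepositoryData.
final class ShardGenerationsSketch {

    // index id -> shard generations, indexed by shard id
    private final Map<String, String[]> shardGenerations = new HashMap<>();

    // Returns the generation token of the currently valid
    // /indices/${indexId}/${shardId}/index-${generation} blob,
    // or null if no generation is tracked for this shard.
    String generation(String indexId, int shardId) {
        final String[] generations = shardGenerations.get(indexId);
        return generations == null ? null : generations[shardId];
    }
}
```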

This allows a new snapshot to be created in a shard without any LIST operations on the shard's folder. In the case of AWS S3, this saves about one third of the cost of updating an empty shard (see elastic#45736) and removes one of the two remaining potential issues with eventually consistent blob stores (see elastic#38941 ... now only the root `index-${N}` is determined by listing).

Equally important, if not more so, a number of possible failure modes on eventually consistent blob stores like AWS S3 are eliminated by moving all delete operations to the `master` node and by switching the shard-level `index-${N}` blobs from incremental naming to UUID suffixes.

This change moves the deletion of the previous shard-level `index-${uuid}` blob from the data node to the master node, allowing for a safe and consistent update of the shard's generation in the `RepositoryData`: the master first updates `RepositoryData` and only then deletes the now-unreferenced previous `index-${uuid}` blob (see the sketch after this paragraph).
__No deletes are executed on the data nodes at all for any operation with this change.__
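
A minimal sketch of that ordering, assuming hypothetical method names for the underlying blob-store operations: the key point is that the root-level reference is committed before the old blob is deleted, so a crash between the two steps leaves only an unreferenced leftover blob, never a `RepositoryData` entry pointing at a missing blob.

```java
// Illustrative only: the two-step shard-generation update performed on the master node.
final class MasterSideGenerationUpdateSketch {

    void updateShardGeneration(String indexId, int shardId, String newUuid, String previousUuid) {
        // Step 1: commit the new root-level RepositoryData referencing index-${newUuid}.
        commitRootRepositoryData(indexId, shardId, newUuid);
        // Step 2: only now delete the previous blob, which nothing references anymore.
        deleteShardIndexBlob(indexId, shardId, previousUuid);
    }

    private void commitRootRepositoryData(String indexId, int shardId, String uuid) {
        // stand-in for writing the root-level index-${N} (RepositoryData) blob
    }

    private void deleteShardIndexBlob(String indexId, int shardId, String uuid) {
        // stand-in for deleting /indices/${indexId}/${shardId}/index-${uuid}
    }
}
```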

Note also: the issues previously possible when hanging data nodes interfered with master nodes are now completely ruled out, even on S3 (see the next section for details).

This change switches the naming of the shard-level `index-${N}` blobs to a UUID suffix, `index-${UUID}`. The reason is that writing a new shard-level `index-` generation blob no longer takes effect atomically: the blob must not only be written, it must also be referenced by the root-level `index-${N}` (`RepositoryData`) to become an effective part of the snapshot repository.
This leads to a problem if we were to keep using incrementing names as before: if a blob `index-${N+1}` is written but the root-level `RepositoryData` is not updated because the node/network/cluster/... crashes, a future operation will determine the shard's generation to be `N` and try to write a new `index-${N+1}` to the already existing path. Such updates are problematic on S3 for consistency reasons, and they also create numerous issues once stuck data nodes are taken into account.
Previously, a data node tasked with writing `index-${N+1}` that got stuck and attempted the write only after some other node had already written `index-${N+1}` was prevented from doing so (except on S3) because we disallowed overwrites of that blob, so no corruption could occur.
Were we to continue using incrementing names, we could not do this: the stuck-node scenario would either allow the `N+1` generation to be overwritten or force us to keep using a `LIST` operation to figure out the next `N` (which would make this change pointless).
With UUID naming and all deletes moved to `master`, this becomes a non-issue: data nodes write updated shard generations as `index-${uuid}` blobs, and `master` makes those `index-${uuid}` blobs it deems correct part of the `RepositoryData` and cleans up all `index-${uuid}` blobs that are unused, as sketched below.
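
As a sketch of the naming scheme, assuming `java.util.UUID` as the token source (the helper below is hypothetical, not the actual implementation): a fresh UUID per write means a stuck or retried writer can never collide with a blob that another node already wrote, which incrementing `index-${N+1}` names cannot guarantee.

```java
import java.util.UUID;

// Hypothetical helper: generate a collision-free shard-level index blob name.
final class ShardIndexBlobNaming {

    static String newShardIndexBlobName() {
        // e.g. "index-2f1d3b8e-..." -- unique per write, never reused
        return "index-" + UUID.randomUUID();
    }
}
```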

Co-authored-by: Yannick Welsch <[email protected]>
Co-authored-by: Tanguy Leroux <[email protected]>
3 people committed Oct 23, 2019
1 parent 0f84675 commit 0b6fe77
Showing 29 changed files with 1,002 additions and 358 deletions.
@@ -42,6 +42,7 @@
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryCleanupResult;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

@@ -200,7 +201,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
l -> blobStoreRepository.cleanup(
repositoryStateId, ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
repositoryStateId,
newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION),
ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
}

private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) {
@@ -91,12 +91,14 @@ public static class Entry implements ToXContent {
private final ImmutableOpenMap<String, List<ShardId>> waitingIndices;
private final long startTime;
private final long repositoryStateId;
// see #useShardGenerations
private final boolean useShardGenerations;
@Nullable private final Map<String, Object> userMetadata;
@Nullable private final String failure;

public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
String failure, Map<String, Object> userMetadata) {
String failure, Map<String, Object> userMetadata, boolean useShardGenerations) {
this.state = state;
this.snapshot = snapshot;
this.includeGlobalState = includeGlobalState;
@@ -114,6 +116,7 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta
this.repositoryStateId = repositoryStateId;
this.failure = failure;
this.userMetadata = userMetadata;
this.useShardGenerations = useShardGenerations;
}

private static boolean assertShardsConsistent(State state, List<IndexId> indices,
@@ -128,20 +131,22 @@ private static boolean assertShardsConsistent(State state, List<IndexId> indices
: "Indices in shards " + indexNamesInShards + " differ from expected indices " + indexNames + " for state [" + state + "]";
return true;
}

public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
Map<String, Object> userMetadata) {
this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null, userMetadata);
Map<String, Object> userMetadata, boolean useShardGenerations) {
this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null, userMetadata,
useShardGenerations);
}

public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
entry.repositoryStateId, shards, entry.failure, entry.userMetadata);
entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.useShardGenerations);
}

public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
entry.repositoryStateId, shards, failure, entry.userMetadata);
entry.repositoryStateId, shards, failure, entry.userMetadata, entry.useShardGenerations);
}

public Entry(Entry entry, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
@@ -192,6 +197,16 @@ public String failure() {
return failure;
}

/**
* Whether to write to the repository in a format only understood by versions newer than
* {@link SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}.
*
* @return true if writing to repository in new format
*/
public boolean useShardGenerations() {
return useShardGenerations;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -207,6 +222,7 @@ public boolean equals(Object o) {
if (!snapshot.equals(entry.snapshot)) return false;
if (state != entry.state) return false;
if (repositoryStateId != entry.repositoryStateId) return false;
if (useShardGenerations != entry.useShardGenerations) return false;

return true;
}
@@ -221,6 +237,7 @@ public int hashCode() {
result = 31 * result + indices.hashCode();
result = 31 * result + Long.hashCode(startTime);
result = 31 * result + Long.hashCode(repositoryStateId);
result = 31 * result + (useShardGenerations ? 1 : 0);
return result;
}

@@ -518,6 +535,12 @@ public SnapshotsInProgress(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
userMetadata = in.readMap();
}
final boolean useShardGenerations;
if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
useShardGenerations = in.readBoolean();
} else {
useShardGenerations = false;
}
entries[i] = new Entry(snapshot,
includeGlobalState,
partial,
@@ -527,7 +550,8 @@ public SnapshotsInProgress(StreamInput in) throws IOException {
repositoryStateId,
builder.build(),
failure,
userMetadata
userMetadata,
useShardGenerations
);
}
this.entries = Arrays.asList(entries);
@@ -563,6 +587,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
out.writeMap(entry.userMetadata);
}
if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
out.writeBoolean(entry.useShardGenerations);
}
}
}

@@ -78,16 +78,17 @@ public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, Met
}

@Override
public void finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
MetaData metaData, Map<String, Object> userMetadata, ActionListener<SnapshotInfo> listener) {
in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, metaData, userMetadata, listener);
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata,
boolean writeShardGens, ActionListener<SnapshotInfo> listener) {
in.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, metaData, userMetadata, writeShardGens, listener);
}

@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener<Void> listener) {
in.deleteSnapshot(snapshotId, repositoryStateId, listener);
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
in.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener);
}

@Override
@@ -122,8 +123,9 @@ public boolean isReadOnly() {

@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener) {
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, listener);
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
ActionListener<String> listener) {
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, writeShardGens, listener);
}
@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
33 changes: 19 additions & 14 deletions server/src/main/java/org/elasticsearch/repositories/Repository.java
@@ -125,28 +125,33 @@ default Repository create(RepositoryMetaData metaData, Function<String, Reposito
* <p>
* This method is called on master after all shards are snapshotted.
*
* @param snapshotId snapshot id
* @param indices list of indices in the snapshot
* @param startTime start time of the snapshot
* @param failure global failure reason or null
* @param totalShards total number of shards
* @param shardFailures list of shard failures
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
* @param snapshotId snapshot id
* @param shardGenerations updated shard generations
* @param startTime start time of the snapshot
* @param failure global failure reason or null
* @param totalShards total number of shards
* @param shardFailures list of shard failures
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
* @param includeGlobalState include cluster global state
* @param clusterMetaData cluster metadata
* @param userMetadata user metadata
* @param writeShardGens if shard generations should be written to the repository
* @param listener listener to be called on completion of the snapshot
*/
void finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
MetaData clusterMetaData, Map<String, Object> userMetadata, ActionListener<SnapshotInfo> listener);
void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, MetaData clusterMetaData, Map<String, Object> userMetadata,
boolean writeShardGens, ActionListener<SnapshotInfo> listener);

/**
* Deletes snapshot
*
* @param snapshotId snapshot id
* @param snapshotId snapshot id
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began
* @param listener completion listener
* @param writeShardGens if shard generations should be written to the repository
* @param listener completion listener
*/
void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener<Void> listener);
void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener);

/**
* Returns snapshot throttle time in nanoseconds
@@ -208,7 +213,7 @@ void finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTi
* @param listener listener invoked on completion
*/
void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener);
IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, ActionListener<String> listener);

/**
* Restores snapshot of the shard.
