Skip to content

Commit

Permalink
Track Shard-Snapshot Index Generation at Repository Root (#48371)
Browse files Browse the repository at this point in the history
This change adds a new field `"shards"` to `RepositoryData` that contains a mapping of `IndexId` to a `String[]`. This string array can be accessed by shard id to get the generation of a shard's shard folder (i.e. the `N` in the name of the currently valid `/indices/${indexId}/${shardId}/index-${N}` for the shard in question).

This allows for creating a new snapshot in the shard without doing any LIST operations on the shard's folder. In the case of AWS S3, this saves about 1/3 of the cost for updating an empty shard (see #45736) and removes one out of two remaining potential issues with eventually consistent blob stores (see #38941 ... now only the root `index-${N}` is determined by listing).

Also and equally if not more important, a number of possible failure modes on eventually consistent blob stores like AWS S3 are eliminated by moving all delete operations to the `master` node and moving from incremental naming of shard level index-N to uuid suffixes for these blobs.

This change moves the deleting of the previous shard level `index-${uuid}` blob to the master node instead of the data node allowing for a safe and consistent update of the shard's generation in the `RepositoryData` by first updating `RepositoryData` and then deleting the now unreferenced `index-${newUUID}` blob.
__No deletes are executed on the data nodes at all for any operation with this change.__

Note also: Previous issues with hanging data nodes interfering with master nodes are completely impossible, even on S3 (see next section for details).

This change changes the naming of the shard level `index-${N}` blobs to a uuid suffix `index-${UUID}`. The reason for this is the fact that writing a new shard-level `index-` generation blob is not atomic anymore in its effect. Not only does the blob have to be written to have an effect, it must also be referenced by the root level `index-N` (`RepositoryData`) to become an effective part of the snapshot repository.
This leads to a problem if we were to use incrementing names like we did before. If a blob `index-${N+1}` is written but due to the node/network/cluster/... crashes the root level `RepositoryData` has not been updated then a future operation will determine the shard's generation to be `N` and try to write a new `index-${N+1}` to the already existing path. Updates like that are problematic on S3 for consistency reasons, but also create numerous issues when thinking about stuck data nodes.
Previously stuck data nodes that were tasked to write `index-${N+1}` but got stuck and tried to do so after some other node had already written `index-${N+1}` were prevented form doing so (except for on S3) by us not allowing overwrites for that blob and thus no corruption could occur.
Were we to continue using incrementing names, we could not do this. The stuck node scenario would either allow for overwriting the `N+1` generation or force us to continue using a `LIST` operation to figure out the next `N` (which would make this change pointless).
With uuid naming and moving all deletes to `master` this becomes a non-issue. Data nodes write updated shard generation `index-${uuid}` and `master` makes those `index-${uuid}` part of the `RepositoryData` that it deems correct and cleans up all those `index-` that are unused.

Co-authored-by: Yannick Welsch <[email protected]>
Co-authored-by: Tanguy Leroux <[email protected]>
  • Loading branch information
3 people authored Oct 23, 2019
1 parent 50f565b commit 7215201
Show file tree
Hide file tree
Showing 30 changed files with 1,129 additions and 475 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryCleanupResult;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -200,7 +201,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
l -> blobStoreRepository.cleanup(
repositoryStateId, ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
repositoryStateId,
newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION),
ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
}

private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotsService;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -90,12 +91,14 @@ public static class Entry implements ToXContent {
private final ImmutableOpenMap<String, List<ShardId>> waitingIndices;
private final long startTime;
private final long repositoryStateId;
// see #useShardGenerations
private final boolean useShardGenerations;
@Nullable private final Map<String, Object> userMetadata;
@Nullable private final String failure;

public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
String failure, Map<String, Object> userMetadata) {
String failure, Map<String, Object> userMetadata, boolean useShardGenerations) {
this.state = state;
this.snapshot = snapshot;
this.includeGlobalState = includeGlobalState;
Expand All @@ -113,6 +116,7 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta
this.repositoryStateId = repositoryStateId;
this.failure = failure;
this.userMetadata = userMetadata;
this.useShardGenerations = useShardGenerations;
}

private static boolean assertShardsConsistent(State state, List<IndexId> indices,
Expand All @@ -127,20 +131,22 @@ private static boolean assertShardsConsistent(State state, List<IndexId> indices
: "Indices in shards " + indexNamesInShards + " differ from expected indices " + indexNames + " for state [" + state + "]";
return true;
}

public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
Map<String, Object> userMetadata) {
this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null, userMetadata);
Map<String, Object> userMetadata, boolean useShardGenerations) {
this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null, userMetadata,
useShardGenerations);
}

public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
entry.repositoryStateId, shards, entry.failure, entry.userMetadata);
entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.useShardGenerations);
}

public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
entry.repositoryStateId, shards, failure, entry.userMetadata);
entry.repositoryStateId, shards, failure, entry.userMetadata, entry.useShardGenerations);
}

public Entry(Entry entry, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
Expand Down Expand Up @@ -191,6 +197,16 @@ public String failure() {
return failure;
}

/**
* Whether to write to the repository in a format only understood by versions newer than
* {@link SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}.
*
* @return true if writing to repository in new format
*/
public boolean useShardGenerations() {
return useShardGenerations;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -206,6 +222,7 @@ public boolean equals(Object o) {
if (!snapshot.equals(entry.snapshot)) return false;
if (state != entry.state) return false;
if (repositoryStateId != entry.repositoryStateId) return false;
if (useShardGenerations != entry.useShardGenerations) return false;

return true;
}
Expand All @@ -220,6 +237,7 @@ public int hashCode() {
result = 31 * result + indices.hashCode();
result = 31 * result + Long.hashCode(startTime);
result = 31 * result + Long.hashCode(repositoryStateId);
result = 31 * result + (useShardGenerations ? 1 : 0);
return result;
}

Expand Down Expand Up @@ -307,27 +325,39 @@ public static boolean completed(ObjectContainer<ShardSnapshotStatus> shards) {
public static class ShardSnapshotStatus {
private final ShardState state;
private final String nodeId;

@Nullable
private final String generation;

@Nullable
private final String reason;

public ShardSnapshotStatus(String nodeId) {
this(nodeId, ShardState.INIT);
public ShardSnapshotStatus(String nodeId, String generation) {
this(nodeId, ShardState.INIT, generation);
}

public ShardSnapshotStatus(String nodeId, ShardState state) {
this(nodeId, state, null);
public ShardSnapshotStatus(String nodeId, ShardState state, String generation) {
this(nodeId, state, null, generation);
}

public ShardSnapshotStatus(String nodeId, ShardState state, String reason) {
public ShardSnapshotStatus(String nodeId, ShardState state, String reason, String generation) {
this.nodeId = nodeId;
this.state = state;
this.reason = reason;
this.generation = generation;
// If the state is failed we have to have a reason for this failure
assert state.failed() == false || reason != null;
}

public ShardSnapshotStatus(StreamInput in) throws IOException {
nodeId = in.readOptionalString();
state = ShardState.fromValue(in.readByte());
if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
generation = in.readOptionalString();
assert generation != null || state != ShardState.SUCCESS : "Received null generation for shard state [" + state + "]";
} else {
generation = null;
}
reason = in.readOptionalString();
}

Expand All @@ -339,13 +369,20 @@ public String nodeId() {
return nodeId;
}

public String generation() {
return this.generation;
}

public String reason() {
return reason;
}

public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(nodeId);
out.writeByte(state.value);
if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
out.writeOptionalString(generation);
}
out.writeOptionalString(reason);
}

Expand All @@ -354,21 +391,22 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShardSnapshotStatus status = (ShardSnapshotStatus) o;
return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason) && state == status.state;

return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason)
&& Objects.equals(generation, status.generation) && state == status.state;
}

@Override
public int hashCode() {
int result = state != null ? state.hashCode() : 0;
result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0);
result = 31 * result + (reason != null ? reason.hashCode() : 0);
result = 31 * result + (generation != null ? generation.hashCode() : 0);
return result;
}

@Override
public String toString() {
return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + "]";
return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + ", generation=" + generation + "]";
}
}

Expand Down Expand Up @@ -497,6 +535,12 @@ public SnapshotsInProgress(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
userMetadata = in.readMap();
}
final boolean useShardGenerations;
if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
useShardGenerations = in.readBoolean();
} else {
useShardGenerations = false;
}
entries[i] = new Entry(snapshot,
includeGlobalState,
partial,
Expand All @@ -506,7 +550,8 @@ public SnapshotsInProgress(StreamInput in) throws IOException {
repositoryStateId,
builder.build(),
failure,
userMetadata
userMetadata,
useShardGenerations
);
}
this.entries = Arrays.asList(entries);
Expand Down Expand Up @@ -542,6 +587,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
out.writeMap(entry.userMetadata);
}
if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
out.writeBoolean(entry.useShardGenerations);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public enum Stage {
}

private final AtomicReference<Stage> stage;
private final AtomicReference<String> generation;
private long startTime;
private long totalTime;
private int incrementalFileCount;
Expand All @@ -71,9 +72,10 @@ public enum Stage {

private IndexShardSnapshotStatus(final Stage stage, final long startTime, final long totalTime,
final int incrementalFileCount, final int totalFileCount, final int processedFileCount,
final long incrementalSize, final long totalSize, final long processedSize,
final long indexVersion, final String failure) {
final long incrementalSize, final long totalSize, final long processedSize, final String failure,
final String generation) {
this.stage = new AtomicReference<>(Objects.requireNonNull(stage));
this.generation = new AtomicReference<>(generation);
this.startTime = startTime;
this.totalTime = totalTime;
this.incrementalFileCount = incrementalFileCount;
Expand All @@ -82,7 +84,6 @@ private IndexShardSnapshotStatus(final Stage stage, final long startTime, final
this.totalSize = totalSize;
this.processedSize = processedSize;
this.incrementalSize = incrementalSize;
this.indexVersion = indexVersion;
this.failure = failure;
}

Expand Down Expand Up @@ -111,9 +112,11 @@ public synchronized Copy moveToFinalize(final long indexVersion) {
return asCopy();
}

public synchronized void moveToDone(final long endTime) {
public synchronized void moveToDone(final long endTime, final String newGeneration) {
assert newGeneration != null;
if (stage.compareAndSet(Stage.FINALIZE, Stage.DONE)) {
this.totalTime = Math.max(0L, endTime - startTime);
this.generation.set(newGeneration);
} else {
throw new IllegalStateException("Unable to move the shard snapshot status to [DONE]: " +
"expecting [FINALIZE] but got [" + stage.get() + "]");
Expand All @@ -133,6 +136,10 @@ public synchronized void moveToFailed(final long endTime, final String failure)
}
}

public String generation() {
return generation.get();
}

public boolean isAborted() {
return stage.get() == Stage.ABORTED;
}
Expand All @@ -158,24 +165,24 @@ public synchronized IndexShardSnapshotStatus.Copy asCopy() {
indexVersion, failure);
}

public static IndexShardSnapshotStatus newInitializing() {
return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, 0, null);
public static IndexShardSnapshotStatus newInitializing(String generation) {
return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null, generation);
}

public static IndexShardSnapshotStatus newFailed(final String failure) {
assert failure != null : "expecting non null failure for a failed IndexShardSnapshotStatus";
if (failure == null) {
throw new IllegalArgumentException("A failure description is required for a failed IndexShardSnapshotStatus");
}
return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, 0, failure);
return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure, null);
}

public static IndexShardSnapshotStatus newDone(final long startTime, final long totalTime,
final int incrementalFileCount, final int fileCount,
final long incrementalSize, final long size) {
final long incrementalSize, final long size, String generation) {
// The snapshot is done which means the number of processed files is the same as total
return new IndexShardSnapshotStatus(Stage.DONE, startTime, totalTime, incrementalFileCount, fileCount, incrementalFileCount,
incrementalSize, size, incrementalSize, 0, null);
incrementalSize, size, incrementalSize, null, generation);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,17 @@ public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, Met
}

@Override
public void finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
MetaData metaData, Map<String, Object> userMetadata, ActionListener<SnapshotInfo> listener) {
in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, metaData, userMetadata, listener);
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata,
boolean writeShardGens, ActionListener<SnapshotInfo> listener) {
in.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, metaData, userMetadata, writeShardGens, listener);
}

@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener<Void> listener) {
in.deleteSnapshot(snapshotId, repositoryStateId, listener);
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
in.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener);
}

@Override
Expand Down Expand Up @@ -122,8 +123,9 @@ public boolean isReadOnly() {

@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, listener);
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
ActionListener<String> listener) {
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, writeShardGens, listener);
}
@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
Expand Down
Loading

0 comments on commit 7215201

Please sign in to comment.