Skip to content

Commit

Permalink
Refactor Inflexible Snapshot Repository BwC (#52365)
Browse files Browse the repository at this point in the history
Transport the version to use for  a snapshot instead of whether to use shard generations in the snapshots in progress entry. This allows making upcoming repository metadata changes in a flexible manner in an analogous way to how we handle serialization BwC elsewhere.
Also, exposing the version at the repository API level will make it easier to do BwC relevant changes in derived repositories like source only or encrypted.
  • Loading branch information
original-brownbear authored Feb 17, 2020
1 parent 82ede04 commit f06f9fa
Show file tree
Hide file tree
Showing 27 changed files with 227 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.metadata.MetaData;
Expand Down Expand Up @@ -235,21 +236,22 @@ class S3Repository extends BlobStoreRepository {
@Override
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
MetaData clusterMetaData, Map<String, Object> userMetadata, boolean writeShardGens,
MetaData clusterMetaData, Map<String, Object> userMetadata, Version repositoryMetaVersion,
ActionListener<SnapshotInfo> listener) {
if (writeShardGens == false) {
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
listener = delayedListener(listener);
}
super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, clusterMetaData, userMetadata, writeShardGens, listener);
includeGlobalState, clusterMetaData, userMetadata, repositoryMetaVersion, listener);
}

@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
if (writeShardGens == false) {
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
ActionListener<Void> listener) {
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
listener = delayedListener(listener);
}
super.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener);
super.deleteSnapshot(snapshotId, repositoryStateId, repositoryMetaVersion, listener);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException {
ensureSnapshotRestoreWorks(client, repoName, "snapshot-2", shards);
}
} else {
if (minimumNodeVersion().before(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
if (SnapshotsService.useShardGenerations(minimumNodeVersion()) == false) {
assertThat(TEST_STEP, is(TestStep.STEP3_OLD_CLUSTER));
final List<Class<? extends Exception>> expectedExceptions =
List.of(ResponseException.class, ElasticsearchStatusException.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
l -> blobStoreRepository.cleanup(
repositoryStateId,
newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)
&& snapshotsService.hasOldVersionSnapshots(repositoryName, repositoryData, null) == false,
snapshotsService.minCompatibleVersion(
newState.nodes().getMinNodeVersion(), repositoryName, repositoryData, null),
ActionListener.wrap(result -> after(null, result), e -> after(e, null)))
));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
* Meta data about snapshots that are currently executing
*/
public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implements Custom {

private static final Version VERSION_IN_SNAPSHOT_VERSION = Version.V_8_0_0;

public static final String TYPE = "snapshots";

@Override
Expand Down Expand Up @@ -93,13 +96,13 @@ public static class Entry implements ToXContent, RepositoryOperation {
private final long startTime;
private final long repositoryStateId;
// see #useShardGenerations
private final boolean useShardGenerations;
private final Version version;
@Nullable private final Map<String, Object> userMetadata;
@Nullable private final String failure;

public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
String failure, Map<String, Object> userMetadata, boolean useShardGenerations) {
String failure, Map<String, Object> userMetadata, Version version) {
this.state = state;
this.snapshot = snapshot;
this.includeGlobalState = includeGlobalState;
Expand All @@ -117,7 +120,7 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta
this.repositoryStateId = repositoryStateId;
this.failure = failure;
this.userMetadata = userMetadata;
this.useShardGenerations = useShardGenerations;
this.version = version;
}

private static boolean assertShardsConsistent(State state, List<IndexId> indices,
Expand All @@ -135,25 +138,25 @@ private static boolean assertShardsConsistent(State state, List<IndexId> indices

public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
Map<String, Object> userMetadata, boolean useShardGenerations) {
Map<String, Object> userMetadata, Version version) {
this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null, userMetadata,
useShardGenerations);
version);
}

public Entry(Entry entry, State state, List<IndexId> indices, long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, boolean useShardGenerations, String failure) {
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, Version version, String failure) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, indices, entry.startTime, repositoryStateId, shards,
failure, entry.userMetadata, useShardGenerations);
failure, entry.userMetadata, version);
}

public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.useShardGenerations);
entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.version);
}

public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
entry.repositoryStateId, shards, failure, entry.userMetadata, entry.useShardGenerations);
entry.repositoryStateId, shards, failure, entry.userMetadata, entry.version);
}

public Entry(Entry entry, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
Expand Down Expand Up @@ -211,13 +214,10 @@ public String failure() {
}

/**
* Whether to write to the repository in a format only understood by versions newer than
* {@link SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}.
*
* @return true if writing to repository in new format
* What version of metadata to use for the snapshot in the repository
*/
public boolean useShardGenerations() {
return useShardGenerations;
public Version version() {
return version;
}

@Override
Expand All @@ -235,7 +235,7 @@ public boolean equals(Object o) {
if (!snapshot.equals(entry.snapshot)) return false;
if (state != entry.state) return false;
if (repositoryStateId != entry.repositoryStateId) return false;
if (useShardGenerations != entry.useShardGenerations) return false;
if (version.equals(entry.version) == false) return false;

return true;
}
Expand All @@ -250,7 +250,7 @@ public int hashCode() {
result = 31 * result + indices.hashCode();
result = 31 * result + Long.hashCode(startTime);
result = 31 * result + Long.hashCode(repositoryStateId);
result = 31 * result + (useShardGenerations ? 1 : 0);
result = 31 * result + version.hashCode();
return result;
}

Expand Down Expand Up @@ -365,7 +365,7 @@ public ShardSnapshotStatus(String nodeId, ShardState state, String reason, Strin
public ShardSnapshotStatus(StreamInput in) throws IOException {
nodeId = in.readOptionalString();
state = ShardState.fromValue(in.readByte());
if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
if (SnapshotsService.useShardGenerations(in.getVersion())) {
generation = in.readOptionalString();
} else {
generation = null;
Expand All @@ -392,7 +392,7 @@ public String reason() {
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(nodeId);
out.writeByte(state.value);
if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
if (SnapshotsService.useShardGenerations(out.getVersion())) {
out.writeOptionalString(generation);
}
out.writeOptionalString(reason);
Expand Down Expand Up @@ -533,11 +533,14 @@ public SnapshotsInProgress(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
userMetadata = in.readMap();
}
final boolean useShardGenerations;
if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
useShardGenerations = in.readBoolean();
final Version version;
if (in.getVersion().onOrAfter(VERSION_IN_SNAPSHOT_VERSION)) {
version = Version.readVersion(in);
} else {
useShardGenerations = false;
// If an older master informs us that shard generations are supported we use the minimum shard generation compatible
// version. If shard generations are not supported yet we use a placeholder for a version that does not use shard
// generations.
version = in.readBoolean() ? SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION : SnapshotsService.OLD_SNAPSHOT_FORMAT;
}
entries[i] = new Entry(snapshot,
includeGlobalState,
Expand All @@ -549,7 +552,7 @@ public SnapshotsInProgress(StreamInput in) throws IOException {
builder.build(),
failure,
userMetadata,
useShardGenerations
version
);
}
this.entries = Arrays.asList(entries);
Expand Down Expand Up @@ -578,8 +581,10 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
out.writeMap(entry.userMetadata);
}
if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
out.writeBoolean(entry.useShardGenerations);
if (out.getVersion().onOrAfter(VERSION_IN_SNAPSHOT_VERSION)) {
Version.writeVersion(entry.version, out);
} else {
out.writeBoolean(SnapshotsService.useShardGenerations(entry.version));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.repositories;

import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -77,14 +78,15 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata,
boolean writeShardGens, ActionListener<SnapshotInfo> listener) {
Version repositoryMetaVersion, ActionListener<SnapshotInfo> listener) {
in.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, metaData, userMetadata, writeShardGens, listener);
includeGlobalState, metaData, userMetadata, repositoryMetaVersion, listener);
}

@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
in.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener);
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
ActionListener<Void> listener) {
in.deleteSnapshot(snapshotId, repositoryStateId, repositoryMetaVersion, listener);
}

@Override
Expand Down Expand Up @@ -119,10 +121,10 @@ public boolean isReadOnly() {

@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion,
Map<String, Object> userMetadata, ActionListener<String> listener) {
in.snapshotShard(
store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, writeShardGens, userMetadata, listener);
store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, repositoryMetaVersion, userMetadata, listener);
}
@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState,
Expand Down
Loading

0 comments on commit f06f9fa

Please sign in to comment.