[Remove] Legacy Version support from Snapshot/Restore Service (#4728)
Removes all stale snapshot / restore code for legacy versions prior to
OpenSearch 2.0. This includes handling of null ShardGenerations, partial
concurrency, and null index generations, none of which are supported in
snapshot versions after legacy 7.5.

Signed-off-by: Nicholas Walter Knize <[email protected]>
nknize authored Oct 19, 2022
1 parent eb33015 commit ec34737
Showing 20 changed files with 160 additions and 972 deletions.
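
As the commit message says, most of this diff deletes version-gated fallback paths rather than changing behavior for supported versions. Below is a condensed, illustrative sketch of the dominant pattern, reconstructed from the S3Repository hunks further down; it is not a verbatim copy of the upstream file, and the leading parameters of the signature are inferred from the super call in the hunk.

    // Before this commit: repositories still holding pre-7.6 ("legacy 7.5") snapshots were
    // detected via SnapshotsService.useShardGenerations, and their listeners were wrapped in an
    // artificial cooldown delay to guard against S3's formerly eventually consistent semantics.
    @Override
    public void deleteSnapshots(
        Collection<SnapshotId> snapshotIds,
        long repositoryStateId,
        Version repositoryMetaVersion,
        ActionListener<RepositoryData> listener
    ) {
        if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
            listener = delayedListener(listener); // reschedules completion after COOLDOWN_PERIOD
        }
        super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener);
    }

    // After this commit: legacy formats are unsupported, so the override delegates directly and
    // COOLDOWN_PERIOD, delayedListener, and logCooldownInfo are deleted outright.
    // finalizeSnapshot follows the same before/after shape.
    @Override
    public void deleteSnapshots(
        Collection<SnapshotId> snapshotIds,
        long repositoryStateId,
        Version repositoryMetaVersion,
        ActionListener<RepositoryData> listener
    ) {
        super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener);
    }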
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -99,6 +99,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Remove LegacyESVersion.V_7_2_ and V_7_3_ Constants ([#4702](https://github.com/opensearch-project/OpenSearch/pull/4702))
- Always auto release the flood stage block ([#4703](https://github.com/opensearch-project/OpenSearch/pull/4703))
- Remove LegacyESVersion.V_7_4_ and V_7_5_ Constants ([#4704](https://github.com/opensearch-project/OpenSearch/pull/4704))
- Remove Legacy Version support from Snapshot/Restore Service ([#4728](https://github.com/opensearch-project/OpenSearch/pull/4728))

### Fixed
- `opensearch-service.bat start` and `opensearch-service.bat manager` failing to run ([#4289](https://github.com/opensearch-project/OpenSearch/pull/4289))
- PR reference to checkout code for changelog verifier ([#4296](https://github.com/opensearch-project/OpenSearch/pull/4296))
@@ -36,56 +36,42 @@
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import fixture.s3.S3HttpHandler;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.support.PlainActionFuture;

import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.regex.Regex;
import org.opensearch.common.settings.MockSecureSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.RepositoryData;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.OpenSearchMockAPIBasedRepositoryIntegTestCase;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.snapshots.mockstore.BlobStoreWrapper;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.startsWith;

@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
// Need to set up a new cluster for each test because cluster settings use randomized authentication settings
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
public class S3BlobStoreRepositoryTests extends OpenSearchMockAPIBasedRepositoryIntegTestCase {

private static final TimeValue TEST_COOLDOWN_PERIOD = TimeValue.timeValueSeconds(10L);

private String region;
private String signerOverride;

@@ -158,56 +144,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
return builder.build();
}

public void testEnforcedCooldownPeriod() throws IOException {
final String repoName = createRepository(
randomName(),
Settings.builder().put(repositorySettings()).put(S3Repository.COOLDOWN_PERIOD.getKey(), TEST_COOLDOWN_PERIOD).build()
);

final SnapshotId fakeOldSnapshot = client().admin()
.cluster()
.prepareCreateSnapshot(repoName, "snapshot-old")
.setWaitForCompletion(true)
.setIndices()
.get()
.getSnapshotInfo()
.snapshotId();
final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(RepositoriesService.class);
final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName);
final RepositoryData repositoryData = getRepositoryData(repository);
final RepositoryData modifiedRepositoryData = repositoryData.withVersions(
Collections.singletonMap(fakeOldSnapshot, SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion())
);
final BytesReference serialized = BytesReference.bytes(
modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), SnapshotsService.OLD_SNAPSHOT_FORMAT)
);
PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> {
try (InputStream stream = serialized.streamInput()) {
repository.blobStore()
.blobContainer(repository.basePath())
.writeBlobAtomic(
BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(),
stream,
serialized.length(),
true
);
}
})));

final String newSnapshotName = "snapshot-new";
final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos();
client().admin().cluster().prepareCreateSnapshot(repoName, newSnapshotName).setWaitForCompletion(true).setIndices().get();
assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledSnapshot, greaterThan(TEST_COOLDOWN_PERIOD.getNanos()));

final long beforeThrottledDelete = repository.threadPool().relativeTimeInNanos();
client().admin().cluster().prepareDeleteSnapshot(repoName, newSnapshotName).get();
assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledDelete, greaterThan(TEST_COOLDOWN_PERIOD.getNanos()));

final long beforeFastDelete = repository.threadPool().relativeTimeInNanos();
client().admin().cluster().prepareDeleteSnapshot(repoName, fakeOldSnapshot.getName()).get();
assertThat(repository.threadPool().relativeTimeInNanos() - beforeFastDelete, lessThan(TEST_COOLDOWN_PERIOD.getNanos()));
}

/**
* S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
*/
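The deleted testEnforcedCooldownPeriod above was the only coverage of the old throttling behavior, so it goes away with the feature. A hypothetical sanity check, not part of this commit (the test name and the 30-second bound are invented; it reuses the client, repository, and timing calls from the deleted test), could assert that deletes are no longer artificially delayed:

    public void testSnapshotDeleteIsNotDelayed() {
        // Reuses helpers from the surrounding test class; keeping this would require retaining the
        // TimeValue, RepositoriesService, BlobStoreRepository, and lessThan imports removed above.
        final String repoName = createRepository(randomName(), Settings.builder().put(repositorySettings()).build());
        client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-fast").setWaitForCompletion(true).setIndices().get();

        final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(
            RepositoriesService.class
        );
        final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName);

        // Invented bound for illustration: far below the old 3-minute default cooldown.
        final TimeValue maxExpectedDelay = TimeValue.timeValueSeconds(30L);
        final long beforeDelete = repository.threadPool().relativeTimeInNanos();
        client().admin().cluster().prepareDeleteSnapshot(repoName, "snapshot-fast").get();
        assertThat(repository.threadPool().relativeTimeInNanos() - beforeDelete, lessThan(maxExpectedDelay.getNanos()));
    }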
@@ -35,10 +35,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
@@ -52,7 +50,6 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.monitor.jvm.JvmInfo;
@@ -62,13 +59,10 @@
import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

@@ -182,24 +176,6 @@ class S3Repository extends MeteredBlobStoreRepository {

static final Setting<String> CLIENT_NAME = new Setting<>("client", "default", Function.identity());

/**
* Artificial delay to introduce after a snapshot finalization or delete has finished so long as the repository is still using the
* backwards compatible snapshot format from before
* {@link org.opensearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} ({@link LegacyESVersion#V_7_6_0}).
* This delay is necessary so that the eventually consistent nature of AWS S3 does not randomly result in repository corruption when
* doing repository operations in rapid succession on a repository in the old metadata format.
* This setting should not be adjusted in production when working with an AWS S3 backed repository. Doing so risks the repository
* becoming silently corrupted. To get rid of this waiting period, either create a new S3 repository or remove all snapshots older than
* {@link LegacyESVersion#V_7_6_0} from the repository which will trigger an upgrade of the repository metadata to the new
* format and disable the cooldown period.
*/
static final Setting<TimeValue> COOLDOWN_PERIOD = Setting.timeSetting(
"cooldown_period",
new TimeValue(3, TimeUnit.MINUTES),
new TimeValue(0, TimeUnit.MILLISECONDS),
Setting.Property.Dynamic
);

/**
* Specifies the path within bucket to repository data. Defaults to root directory.
*/
@@ -223,12 +199,6 @@ class S3Repository extends MeteredBlobStoreRepository {

private final RepositoryMetadata repositoryMetadata;

/**
* Time period to delay repository operations by after finalizing or deleting a snapshot.
* See {@link #COOLDOWN_PERIOD} for details.
*/
private final TimeValue coolDown;

/**
* Constructs an s3 backed repository
*/
@@ -296,8 +266,6 @@ class S3Repository extends MeteredBlobStoreRepository {
);
}

coolDown = COOLDOWN_PERIOD.get(metadata.settings());

logger.debug(
"using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], cannedACL [{}], storageClass [{}]",
bucket,
@@ -334,9 +302,6 @@ public void finalizeSnapshot(
Function<ClusterState, ClusterState> stateTransformer,
ActionListener<RepositoryData> listener
) {
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
listener = delayedListener(listener);
}
super.finalizeSnapshot(
shardGenerations,
repositoryStateId,
@@ -355,59 +320,9 @@ public void deleteSnapshots(
Version repositoryMetaVersion,
ActionListener<RepositoryData> listener
) {
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
listener = delayedListener(listener);
}
super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener);
}

/**
* Wraps given listener such that it is executed with a delay of {@link #coolDown} on the snapshot thread-pool after being invoked.
* See {@link #COOLDOWN_PERIOD} for details.
*/
private <T> ActionListener<T> delayedListener(ActionListener<T> listener) {
final ActionListener<T> wrappedListener = ActionListener.runBefore(listener, () -> {
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
assert cancellable != null;
});
return new ActionListener<T>() {
@Override
public void onResponse(T response) {
logCooldownInfo();
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
threadPool.schedule(
ActionRunnable.wrap(wrappedListener, l -> l.onResponse(response)),
coolDown,
ThreadPool.Names.SNAPSHOT
)
);
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}

@Override
public void onFailure(Exception e) {
logCooldownInfo();
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onFailure(e)), coolDown, ThreadPool.Names.SNAPSHOT)
);
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}
};
}

private void logCooldownInfo() {
logger.info(
"Sleeping for [{}] after modifying repository [{}] because it contains snapshots older than version [{}]"
+ " and therefore is using a backwards compatible metadata format that requires this cooldown period to avoid "
+ "repository corruption. To get rid of this message and move to the new repository metadata format, either remove "
+ "all snapshots older than version [{}] from the repository or create a new repository at an empty location.",
coolDown,
metadata.name(),
SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION,
SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION
);
}

@Override
protected S3BlobStore createBlobStore() {
return new S3BlobStore(service, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass, repositoryMetadata);
@@ -32,7 +32,6 @@

package org.opensearch.upgrades;

import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus;
@@ -42,20 +41,17 @@
import org.opensearch.client.Request;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.DeprecationHandler;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.test.rest.OpenSearchRestTestCase;

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -231,20 +227,10 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException {
ensureSnapshotRestoreWorks(repoName, "snapshot-2", shards);
}
} else {
if (SnapshotsService.useIndexGenerations(minimumNodeVersion()) == false) {
assertThat(TEST_STEP, is(TestStep.STEP3_OLD_CLUSTER));
final List<Class<? extends Exception>> expectedExceptions =
Arrays.asList(ResponseException.class, OpenSearchStatusException.class);
expectThrowsAnyOf(expectedExceptions, () -> listSnapshots(repoName));
expectThrowsAnyOf(expectedExceptions, () -> deleteSnapshot(client, repoName, "snapshot-1"));
expectThrowsAnyOf(expectedExceptions, () -> deleteSnapshot(client, repoName, "snapshot-2"));
expectThrowsAnyOf(expectedExceptions, () -> createSnapshot(client, repoName, "snapshot-impossible", index));
} else {
assertThat(listSnapshots(repoName), hasSize(2));
if (TEST_STEP == TestStep.STEP4_NEW_CLUSTER) {
ensureSnapshotRestoreWorks(repoName, "snapshot-1", shards);
ensureSnapshotRestoreWorks(repoName, "snapshot-2", shards);
}
assertThat(listSnapshots(repoName), hasSize(2));
if (TEST_STEP == TestStep.STEP4_NEW_CLUSTER) {
ensureSnapshotRestoreWorks(repoName, "snapshot-1", shards);
ensureSnapshotRestoreWorks(repoName, "snapshot-2", shards);
}
}
} finally {
@@ -78,14 +78,6 @@ public void testShardClone() throws Exception {
final Path repoPath = randomRepoPath();
createRepository(repoName, "fs", repoPath);

final boolean useBwCFormat = randomBoolean();
if (useBwCFormat) {
initWithSnapshotVersion(repoName, repoPath, SnapshotsService.OLD_SNAPSHOT_FORMAT);
// Re-create repo to clear repository data cache
assertAcked(clusterAdmin().prepareDeleteRepository(repoName).get());
createRepository(repoName, "fs", repoPath);
}

final String indexName = "test-index";
createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));
final String sourceSnapshot = "source-snapshot";
@@ -101,21 +93,11 @@

final SnapshotId targetSnapshotId = new SnapshotId("target-snapshot", UUIDs.randomBase64UUID(random()));

final String currentShardGen;
if (useBwCFormat) {
currentShardGen = null;
} else {
currentShardGen = repositoryData.shardGenerations().getShardGen(indexId, shardId);
}
final String currentShardGen = repositoryData.shardGenerations().getShardGen(indexId, shardId);
final String newShardGeneration = PlainActionFuture.get(
f -> repository.cloneShardSnapshot(sourceSnapshotInfo.snapshotId(), targetSnapshotId, repositoryShardId, currentShardGen, f)
);

if (useBwCFormat) {
final long gen = Long.parseLong(newShardGeneration);
assertEquals(gen, 1L); // Initial snapshot brought it to 0, clone increments it to 1
}

final BlobStoreIndexShardSnapshot targetShardSnapshot = readShardSnapshot(repository, repositoryShardId, targetSnapshotId);
final BlobStoreIndexShardSnapshot sourceShardSnapshot = readShardSnapshot(
repository,