diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 8bd187de0b4ac..ccba064c8fbb4 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -40,6 +40,7 @@ import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotShardFailure; +import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; @@ -142,9 +143,13 @@ class S3Repository extends BlobStoreRepository { /** * Artificial delay to introduce after a snapshot finalization or delete has finished so long as the repository is still using the * backwards compatible snapshot format from before - * {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}. + * {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} ({@link org.elasticsearch.Version#V_7_6_0}). * This delay is necessary so that the eventually consistent nature of AWS S3 does not randomly result in repository corruption when - * doing repository operations in rapid succession. + * doing repository operations in rapid succession on a repository in the old metadata format. + * This setting should not be adjusted in production when working with an AWS S3 backed repository. Doing so risks the repository + * becoming silently corrupted. To get rid of this waiting period, either create a new S3 repository or remove all snapshots older than + * {@link org.elasticsearch.Version#V_7_6_0} from the repository which will trigger an upgrade of the repository metadata to the new + * format and disable the cooldown period. */ static final Setting COOLDOWN_PERIOD = Setting.timeSetting( "cooldown_period", @@ -258,6 +263,7 @@ private ActionListener delayedListener(ActionListener listener) { return new ActionListener<>() { @Override public void onResponse(T response) { + logCooldownInfo(); final Scheduler.Cancellable existing = finalizationFuture.getAndSet( threadPool.schedule(() -> wrappedListener.onResponse(response), coolDown, ThreadPool.Names.SNAPSHOT)); assert existing == null : "Already have an ongoing finalization " + finalizationFuture; @@ -265,6 +271,7 @@ public void onResponse(T response) { @Override public void onFailure(Exception e) { + logCooldownInfo(); final Scheduler.Cancellable existing = finalizationFuture.getAndSet( threadPool.schedule(() -> wrappedListener.onFailure(e), coolDown, ThreadPool.Names.SNAPSHOT)); assert existing == null : "Already have an ongoing finalization " + finalizationFuture; @@ -272,6 +279,15 @@ public void onFailure(Exception e) { }; } + private void logCooldownInfo() { + logger.info("Sleeping for [{}] after modifying repository [{}] because it contains snapshots older than version [{}]" + + " and therefore is using a backwards compatible metadata format that requires this cooldown period to avoid " + + "repository corruption. To get rid of this message and move to the new repository metadata format, either remove " + + "all snapshots older than version [{}] from the repository or create a new repository at an empty location.", + coolDown, metadata.name(), SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION, + SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION); + } + private static BlobPath buildBasePath(RepositoryMetaData metadata) { final String basePath = BASE_PATH_SETTING.get(metadata.settings()); if (Strings.hasLength(basePath)) { diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 3c26c99f8a528..9fcb08b2f0bf5 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -22,30 +22,48 @@ import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import fixture.s3.S3HttpHandler; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper; +import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; + @SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint") public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase { + private static final TimeValue TEST_COOLDOWN_PERIOD = TimeValue.timeValueSeconds(5L); + @Override protected String repositoryType() { return S3Repository.TYPE; @@ -82,6 +100,7 @@ protected Settings nodeSettings(int nodeOrdinal) { secureSettings.setString(S3ClientSettings.SECRET_KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), "secret"); return Settings.builder() + .put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // We have tests that verify an exact wait time .put(S3ClientSettings.ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl()) // Disable chunked encoding as it simplifies a lot the request parsing on the httpServer side .put(S3ClientSettings.DISABLE_CHUNKED_ENCODING.getConcreteSettingForNamespace("test").getKey(), true) @@ -92,6 +111,41 @@ protected Settings nodeSettings(int nodeOrdinal) { .build(); } + public void testEnforcedCooldownPeriod() throws IOException { + final String repoName = createRepository(randomName(), Settings.builder().put(repositorySettings()) + .put(S3Repository.COOLDOWN_PERIOD.getKey(), TEST_COOLDOWN_PERIOD).build()); + + final SnapshotId fakeOldSnapshot = client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-old") + .setWaitForCompletion(true).setIndices().get().getSnapshotInfo().snapshotId(); + final RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class); + final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName); + final RepositoryData repositoryData = + PlainActionFuture.get(f -> repository.threadPool().generic().execute(() -> repository.getRepositoryData(f))); + final RepositoryData modifiedRepositoryData = repositoryData.withVersions(Collections.singletonMap(fakeOldSnapshot, + SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion())); + final BytesReference serialized = + BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), false)); + PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> { + try (InputStream stream = serialized.streamInput()) { + repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic( + BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), stream, serialized.length(), true); + } + }))); + + final String newSnapshotName = "snapshot-new"; + final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos(); + client().admin().cluster().prepareCreateSnapshot(repoName, newSnapshotName).setWaitForCompletion(true).setIndices().get(); + assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledSnapshot, greaterThan(TEST_COOLDOWN_PERIOD.getNanos())); + + final long beforeThrottledDelete = repository.threadPool().relativeTimeInNanos(); + client().admin().cluster().prepareDeleteSnapshot(repoName, newSnapshotName).get(); + assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledDelete, greaterThan(TEST_COOLDOWN_PERIOD.getNanos())); + + final long beforeFastDelete = repository.threadPool().relativeTimeInNanos(); + client().admin().cluster().prepareDeleteSnapshot(repoName, fakeOldSnapshot.getName()).get(); + assertThat(repository.threadPool().relativeTimeInNanos() - beforeFastDelete, lessThan(TEST_COOLDOWN_PERIOD.getNanos())); + } + /** * S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload. */