diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index db6968ea71059..91b5dfeb5f546 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -21,6 +21,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -29,11 +32,23 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.repositories.RepositoryException; +import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; - +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotShardFailure; +import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; /** @@ -126,6 +141,23 @@ class S3Repository extends BlobStoreRepository { static final Setting CLIENT_NAME = new Setting<>("client", "default", Function.identity()); + /** + * Artificial delay to introduce after a snapshot finalization or delete has finished so long as the repository is still using the + * backwards compatible snapshot format from before + * {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} ({@link org.elasticsearch.Version#V_7_6_0}). + * This delay is necessary so that the eventually consistent nature of AWS S3 does not randomly result in repository corruption when + * doing repository operations in rapid succession on a repository in the old metadata format. + * This setting should not be adjusted in production when working with an AWS S3 backed repository. Doing so risks the repository + * becoming silently corrupted. To get rid of this waiting period, either create a new S3 repository or remove all snapshots older than + * {@link org.elasticsearch.Version#V_7_6_0} from the repository which will trigger an upgrade of the repository metadata to the new + * format and disable the cooldown period. + */ + static final Setting COOLDOWN_PERIOD = Setting.timeSetting( + "cooldown_period", + new TimeValue(3, TimeUnit.MINUTES), + new TimeValue(0, TimeUnit.MILLISECONDS), + Setting.Property.Dynamic); + /** * Specifies the path within bucket to repository data. Defaults to root directory. */ @@ -145,6 +177,12 @@ class S3Repository extends BlobStoreRepository { private final String cannedACL; + /** + * Time period to delay repository operations by after finalizing or deleting a snapshot. + * See {@link #COOLDOWN_PERIOD} for details. + */ + private final TimeValue coolDown; + /** * Constructs an s3 backed repository */ @@ -176,6 +214,8 @@ class S3Repository extends BlobStoreRepository { this.storageClass = STORAGE_CLASS_SETTING.get(metadata.settings()); this.cannedACL = CANNED_ACL_SETTING.get(metadata.settings()); + coolDown = COOLDOWN_PERIOD.get(metadata.settings()); + logger.debug( "using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], cannedACL [{}], storageClass [{}]", bucket, @@ -186,6 +226,70 @@ class S3Repository extends BlobStoreRepository { storageClass); } + /** + * Holds a reference to delayed repository operation {@link Scheduler.Cancellable} so it can be cancelled should the repository be + * closed concurrently. + */ + private final AtomicReference finalizationFuture = new AtomicReference<>(); + + @Override + public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, + List shardFailures, long repositoryStateId, boolean includeGlobalState, + MetaData clusterMetaData, Map userMetadata, boolean writeShardGens, + ActionListener listener) { + if (writeShardGens == false) { + listener = delayedListener(listener); + } + super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId, + includeGlobalState, clusterMetaData, userMetadata, writeShardGens, listener); + } + + @Override + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener listener) { + if (writeShardGens == false) { + listener = delayedListener(listener); + } + super.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener); + } + + /** + * Wraps given listener such that it is executed with a delay of {@link #coolDown} on the snapshot thread-pool after being invoked. + * See {@link #COOLDOWN_PERIOD} for details. + */ + private ActionListener delayedListener(ActionListener listener) { + final ActionListener wrappedListener = ActionListener.runBefore(listener, () -> { + final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null); + assert cancellable != null; + }); + return new ActionListener<>() { + @Override + public void onResponse(T response) { + logCooldownInfo(); + final Scheduler.Cancellable existing = finalizationFuture.getAndSet( + threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onResponse(response)), + coolDown, ThreadPool.Names.SNAPSHOT)); + assert existing == null : "Already have an ongoing finalization " + finalizationFuture; + } + + @Override + public void onFailure(Exception e) { + logCooldownInfo(); + final Scheduler.Cancellable existing = finalizationFuture.getAndSet( + threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onFailure(e)), coolDown, ThreadPool.Names.SNAPSHOT)); + assert existing == null : "Already have an ongoing finalization " + finalizationFuture; + } + }; + } + + private void logCooldownInfo() { + logger.info("Sleeping for [{}] after modifying repository [{}] because it contains snapshots older than version [{}]" + + " and therefore is using a backwards compatible metadata format that requires this cooldown period to avoid " + + "repository corruption. To get rid of this message and move to the new repository metadata format, either remove " + + "all snapshots older than version [{}] from the repository or create a new repository at an empty location.", + coolDown, metadata.name(), SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION, + SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION); + } + private static BlobPath buildBasePath(RepositoryMetaData metadata) { final String basePath = BASE_PATH_SETTING.get(metadata.settings()); if (Strings.hasLength(basePath)) { @@ -210,4 +314,14 @@ protected BlobStore getBlobStore() { protected ByteSizeValue chunkSize() { return chunkSize; } + + @Override + protected void doClose() { + final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null); + if (cancellable != null) { + logger.debug("Repository [{}] closed during cool-down period", metadata.name()); + cancellable.cancel(); + } + super.doClose(); + } } diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 3c26c99f8a528..f4f4990d1c5b4 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -22,30 +22,48 @@ import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import fixture.s3.S3HttpHandler; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper; +import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; + @SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint") public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase { + private static final TimeValue TEST_COOLDOWN_PERIOD = TimeValue.timeValueSeconds(5L); + @Override protected String repositoryType() { return S3Repository.TYPE; @@ -82,6 +100,7 @@ protected Settings nodeSettings(int nodeOrdinal) { secureSettings.setString(S3ClientSettings.SECRET_KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), "secret"); return Settings.builder() + .put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // We have tests that verify an exact wait time .put(S3ClientSettings.ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl()) // Disable chunked encoding as it simplifies a lot the request parsing on the httpServer side .put(S3ClientSettings.DISABLE_CHUNKED_ENCODING.getConcreteSettingForNamespace("test").getKey(), true) @@ -92,6 +111,41 @@ protected Settings nodeSettings(int nodeOrdinal) { .build(); } + public void testEnforcedCooldownPeriod() throws IOException { + final String repoName = createRepository(randomName(), Settings.builder().put(repositorySettings()) + .put(S3Repository.COOLDOWN_PERIOD.getKey(), TEST_COOLDOWN_PERIOD).build()); + + final SnapshotId fakeOldSnapshot = client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-old") + .setWaitForCompletion(true).setIndices().get().getSnapshotInfo().snapshotId(); + final RepositoriesService repositoriesService = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class); + final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName); + final RepositoryData repositoryData = + PlainActionFuture.get(f -> repository.threadPool().generic().execute(() -> repository.getRepositoryData(f))); + final RepositoryData modifiedRepositoryData = repositoryData.withVersions(Collections.singletonMap(fakeOldSnapshot, + SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion())); + final BytesReference serialized = + BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), false)); + PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> { + try (InputStream stream = serialized.streamInput()) { + repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic( + BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), stream, serialized.length(), true); + } + }))); + + final String newSnapshotName = "snapshot-new"; + final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos(); + client().admin().cluster().prepareCreateSnapshot(repoName, newSnapshotName).setWaitForCompletion(true).setIndices().get(); + assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledSnapshot, greaterThan(TEST_COOLDOWN_PERIOD.getNanos())); + + final long beforeThrottledDelete = repository.threadPool().relativeTimeInNanos(); + client().admin().cluster().prepareDeleteSnapshot(repoName, newSnapshotName).get(); + assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledDelete, greaterThan(TEST_COOLDOWN_PERIOD.getNanos())); + + final long beforeFastDelete = repository.threadPool().relativeTimeInNanos(); + client().admin().cluster().prepareDeleteSnapshot(repoName, fakeOldSnapshot.getName()).get(); + assertThat(repository.threadPool().relativeTimeInNanos() - beforeFastDelete, lessThan(TEST_COOLDOWN_PERIOD.getNanos())); + } + /** * S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload. */