From 16051c604615ef9461172e8caeb15eadb83b454a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 20 Jan 2020 10:32:21 +0100 Subject: [PATCH] Add CoolDown Period to S3 Repository (#51074) Add cool down period after snapshot finalization and delete to prevent eventually consistent AWS S3 from corrupting shard level metadata as long as the repository is using the old format metadata on the shard level. --- .../repositories/s3/S3Repository.java | 116 +++++++++++++++++- .../s3/S3BlobStoreRepositoryTests.java | 54 ++++++++ 2 files changed, 169 insertions(+), 1 deletion(-) diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 7a59463feb0e8..0e1ebcfdd7884 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -21,6 +21,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -32,11 +35,23 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.repositories.RepositoryException; +import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; - +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotShardFailure; +import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; /** @@ -142,6 +157,23 @@ class S3Repository extends BlobStoreRepository { static final Setting CLIENT_NAME = new Setting<>("client", "default", Function.identity()); + /** + * Artificial delay to introduce after a snapshot finalization or delete has finished so long as the repository is still using the + * backwards compatible snapshot format from before + * {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} ({@link org.elasticsearch.Version#V_7_6_0}). + * This delay is necessary so that the eventually consistent nature of AWS S3 does not randomly result in repository corruption when + * doing repository operations in rapid succession on a repository in the old metadata format. + * This setting should not be adjusted in production when working with an AWS S3 backed repository. Doing so risks the repository + * becoming silently corrupted. To get rid of this waiting period, either create a new S3 repository or remove all snapshots older than + * {@link org.elasticsearch.Version#V_7_6_0} from the repository which will trigger an upgrade of the repository metadata to the new + * format and disable the cooldown period. + */ + static final Setting COOLDOWN_PERIOD = Setting.timeSetting( + "cooldown_period", + new TimeValue(3, TimeUnit.MINUTES), + new TimeValue(0, TimeUnit.MILLISECONDS), + Setting.Property.Dynamic); + /** * Specifies the path within bucket to repository data. Defaults to root directory. */ @@ -165,6 +197,12 @@ class S3Repository extends BlobStoreRepository { private final RepositoryMetaData repositoryMetaData; + /** + * Time period to delay repository operations by after finalizing or deleting a snapshot. + * See {@link #COOLDOWN_PERIOD} for details. + */ + private final TimeValue coolDown; + /** * Constructs an s3 backed repository */ @@ -211,6 +249,8 @@ class S3Repository extends BlobStoreRepository { + "store these in named clients and the elasticsearch keystore for secure settings."); } + coolDown = COOLDOWN_PERIOD.get(metadata.settings()); + logger.debug( "using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], cannedACL [{}], storageClass [{}]", bucket, @@ -221,6 +261,70 @@ class S3Repository extends BlobStoreRepository { storageClass); } + /** + * Holds a reference to delayed repository operation {@link Scheduler.Cancellable} so it can be cancelled should the repository be + * closed concurrently. + */ + private final AtomicReference finalizationFuture = new AtomicReference<>(); + + @Override + public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, + List shardFailures, long repositoryStateId, boolean includeGlobalState, + MetaData clusterMetaData, Map userMetadata, boolean writeShardGens, + ActionListener listener) { + if (writeShardGens == false) { + listener = delayedListener(listener); + } + super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId, + includeGlobalState, clusterMetaData, userMetadata, writeShardGens, listener); + } + + @Override + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener listener) { + if (writeShardGens == false) { + listener = delayedListener(listener); + } + super.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener); + } + + /** + * Wraps given listener such that it is executed with a delay of {@link #coolDown} on the snapshot thread-pool after being invoked. + * See {@link #COOLDOWN_PERIOD} for details. + */ + private ActionListener delayedListener(ActionListener listener) { + final ActionListener wrappedListener = ActionListener.runBefore(listener, () -> { + final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null); + assert cancellable != null; + }); + return new ActionListener() { + @Override + public void onResponse(T response) { + logCooldownInfo(); + final Scheduler.Cancellable existing = finalizationFuture.getAndSet( + threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onResponse(response)), + coolDown, ThreadPool.Names.SNAPSHOT)); + assert existing == null : "Already have an ongoing finalization " + finalizationFuture; + } + + @Override + public void onFailure(Exception e) { + logCooldownInfo(); + final Scheduler.Cancellable existing = finalizationFuture.getAndSet( + threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onFailure(e)), coolDown, ThreadPool.Names.SNAPSHOT)); + assert existing == null : "Already have an ongoing finalization " + finalizationFuture; + } + }; + } + + private void logCooldownInfo() { + logger.info("Sleeping for [{}] after modifying repository [{}] because it contains snapshots older than version [{}]" + + " and therefore is using a backwards compatible metadata format that requires this cooldown period to avoid " + + "repository corruption. To get rid of this message and move to the new repository metadata format, either remove " + + "all snapshots older than version [{}] from the repository or create a new repository at an empty location.", + coolDown, metadata.name(), SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION, + SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION); + } + @Override protected S3BlobStore createBlobStore() { return new S3BlobStore(service, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass, repositoryMetaData); @@ -241,4 +345,14 @@ public BlobPath basePath() { protected ByteSizeValue chunkSize() { return chunkSize; } + + @Override + protected void doClose() { + final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null); + if (cancellable != null) { + logger.debug("Repository [{}] closed during cool-down period", metadata.name()); + cancellable.cancel(); + } + super.doClose(); + } } diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 3c26c99f8a528..f4f4990d1c5b4 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -22,30 +22,48 @@ import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import fixture.s3.S3HttpHandler; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper; +import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; + @SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint") public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase { + private static final TimeValue TEST_COOLDOWN_PERIOD = TimeValue.timeValueSeconds(5L); + @Override protected String repositoryType() { return S3Repository.TYPE; @@ -82,6 +100,7 @@ protected Settings nodeSettings(int nodeOrdinal) { secureSettings.setString(S3ClientSettings.SECRET_KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), "secret"); return Settings.builder() + .put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // We have tests that verify an exact wait time .put(S3ClientSettings.ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl()) // Disable chunked encoding as it simplifies a lot the request parsing on the httpServer side .put(S3ClientSettings.DISABLE_CHUNKED_ENCODING.getConcreteSettingForNamespace("test").getKey(), true) @@ -92,6 +111,41 @@ protected Settings nodeSettings(int nodeOrdinal) { .build(); } + public void testEnforcedCooldownPeriod() throws IOException { + final String repoName = createRepository(randomName(), Settings.builder().put(repositorySettings()) + .put(S3Repository.COOLDOWN_PERIOD.getKey(), TEST_COOLDOWN_PERIOD).build()); + + final SnapshotId fakeOldSnapshot = client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-old") + .setWaitForCompletion(true).setIndices().get().getSnapshotInfo().snapshotId(); + final RepositoriesService repositoriesService = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class); + final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName); + final RepositoryData repositoryData = + PlainActionFuture.get(f -> repository.threadPool().generic().execute(() -> repository.getRepositoryData(f))); + final RepositoryData modifiedRepositoryData = repositoryData.withVersions(Collections.singletonMap(fakeOldSnapshot, + SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion())); + final BytesReference serialized = + BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), false)); + PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> { + try (InputStream stream = serialized.streamInput()) { + repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic( + BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), stream, serialized.length(), true); + } + }))); + + final String newSnapshotName = "snapshot-new"; + final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos(); + client().admin().cluster().prepareCreateSnapshot(repoName, newSnapshotName).setWaitForCompletion(true).setIndices().get(); + assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledSnapshot, greaterThan(TEST_COOLDOWN_PERIOD.getNanos())); + + final long beforeThrottledDelete = repository.threadPool().relativeTimeInNanos(); + client().admin().cluster().prepareDeleteSnapshot(repoName, newSnapshotName).get(); + assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledDelete, greaterThan(TEST_COOLDOWN_PERIOD.getNanos())); + + final long beforeFastDelete = repository.threadPool().relativeTimeInNanos(); + client().admin().cluster().prepareDeleteSnapshot(repoName, fakeOldSnapshot.getName()).get(); + assertThat(repository.threadPool().relativeTimeInNanos() - beforeFastDelete, lessThan(TEST_COOLDOWN_PERIOD.getNanos())); + } + /** * S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload. */