From de5a65ca7725bd983d6b389205f71a0ecb139b1f Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 15 Jan 2020 14:49:41 +0100 Subject: [PATCH 1/6] bck --- .../repositories/s3/S3Repository.java | 23 +++++++++++++++++++ .../repositories/Repository.java | 6 +++++ 2 files changed, 29 insertions(+) diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index db6968ea71059..2d9064df27bd3 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -29,11 +30,15 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.monitor.jvm.JvmInfo; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; /** @@ -126,6 +131,12 @@ class S3Repository extends BlobStoreRepository { static final Setting CLIENT_NAME = new Setting<>("client", "default", Function.identity()); + static final Setting COOLDOWN_PERIOD = Setting.timeSetting( + "cooldown_period", + new TimeValue(3, TimeUnit.MINUTES), + new TimeValue(0, TimeUnit.MILLISECONDS), + Setting.Property.Dynamic); + /** * Specifies the path within bucket to repository data. Defaults to root directory. */ @@ -145,6 +156,10 @@ class S3Repository extends BlobStoreRepository { private final String cannedACL; + private final long coolDown; + + private final AtomicLong lastWriteIndexN = new AtomicLong(-1L); + /** * Constructs an s3 backed repository */ @@ -176,6 +191,8 @@ class S3Repository extends BlobStoreRepository { this.storageClass = STORAGE_CLASS_SETTING.get(metadata.settings()); this.cannedACL = CANNED_ACL_SETTING.get(metadata.settings()); + coolDown = COOLDOWN_PERIOD.get(metadata.settings()).millis(); + logger.debug( "using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], cannedACL [{}], storageClass [{}]", bucket, @@ -186,6 +203,12 @@ class S3Repository extends BlobStoreRepository { storageClass); } + @Override + protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, boolean writeShardGens, ActionListener listener) { + lastWriteIndexN.set(threadPool.relativeTimeInMillis()); + super.writeIndexGen(repositoryData, expectedGen, writeShardGens, listener); + } + private static BlobPath buildBasePath(RepositoryMetaData metadata) { final String basePath = BASE_PATH_SETTING.get(metadata.settings()); if (Strings.hasLength(basePath)) { diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index fab0a448007e8..f3701af563323 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -234,4 +234,10 @@ void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId s * @param state new cluster state */ void updateState(ClusterState state); + + /** + * Hook invoked on the master node before it delegates data nodes to start snapshotting shards via {@link #snapshotShard}. + */ + default void beforeSnapshotShards() { + } } From 5c25c52802e704a64aca0558229f243721ab1d3f Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 15 Jan 2020 22:58:24 +0100 Subject: [PATCH 2/6] Add Cooldown Period to S3 Repository WIP, still missing tests but would like to confirm we agree on the approach taken here first. --- .../repositories/s3/S3Repository.java | 93 +++++++++++++++++-- 1 file changed, 83 insertions(+), 10 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 2d9064df27bd3..8bd187de0b4ac 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -33,12 +34,19 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.monitor.jvm.JvmInfo; -import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; +import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; - +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotShardFailure; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; /** @@ -131,6 +139,13 @@ class S3Repository extends BlobStoreRepository { static final Setting CLIENT_NAME = new Setting<>("client", "default", Function.identity()); + /** + * Artificial delay to introduce after a snapshot finalization or delete has finished so long as the repository is still using the + * backwards compatible snapshot format from before + * {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}. + * This delay is necessary so that the eventually consistent nature of AWS S3 does not randomly result in repository corruption when + * doing repository operations in rapid succession. + */ static final Setting COOLDOWN_PERIOD = Setting.timeSetting( "cooldown_period", new TimeValue(3, TimeUnit.MINUTES), @@ -156,9 +171,11 @@ class S3Repository extends BlobStoreRepository { private final String cannedACL; - private final long coolDown; - - private final AtomicLong lastWriteIndexN = new AtomicLong(-1L); + /** + * Time period to delay repository operations by after finalizing or deleting a snapshot. + * See {@link #COOLDOWN_PERIOD} for details. + */ + private final TimeValue coolDown; /** * Constructs an s3 backed repository @@ -191,7 +208,7 @@ class S3Repository extends BlobStoreRepository { this.storageClass = STORAGE_CLASS_SETTING.get(metadata.settings()); this.cannedACL = CANNED_ACL_SETTING.get(metadata.settings()); - coolDown = COOLDOWN_PERIOD.get(metadata.settings()).millis(); + coolDown = COOLDOWN_PERIOD.get(metadata.settings()); logger.debug( "using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], cannedACL [{}], storageClass [{}]", @@ -203,10 +220,56 @@ class S3Repository extends BlobStoreRepository { storageClass); } + /** + * Holds a reference to delayed repository operation {@link Scheduler.Cancellable} so it can be cancelled should the repository be + * closed concurrently. + */ + private final AtomicReference finalizationFuture = new AtomicReference<>(); + + @Override + public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, + List shardFailures, long repositoryStateId, boolean includeGlobalState, + MetaData clusterMetaData, Map userMetadata, boolean writeShardGens, + ActionListener listener) { + if (writeShardGens == false) { + listener = delayedListener(listener); + } + super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId, + includeGlobalState, clusterMetaData, userMetadata, writeShardGens, listener); + } + @Override - protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, boolean writeShardGens, ActionListener listener) { - lastWriteIndexN.set(threadPool.relativeTimeInMillis()); - super.writeIndexGen(repositoryData, expectedGen, writeShardGens, listener); + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener listener) { + if (writeShardGens == false) { + listener = delayedListener(listener); + } + super.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener); + } + + /** + * Wraps given listener such that it is executed with a delay of {@link #coolDown} on the snapshot thread-pool after being invoked. + * See {@link #COOLDOWN_PERIOD} for details. + */ + private ActionListener delayedListener(ActionListener listener) { + final ActionListener wrappedListener = ActionListener.runBefore(listener, () -> { + final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null); + assert cancellable != null; + }); + return new ActionListener<>() { + @Override + public void onResponse(T response) { + final Scheduler.Cancellable existing = finalizationFuture.getAndSet( + threadPool.schedule(() -> wrappedListener.onResponse(response), coolDown, ThreadPool.Names.SNAPSHOT)); + assert existing == null : "Already have an ongoing finalization " + finalizationFuture; + } + + @Override + public void onFailure(Exception e) { + final Scheduler.Cancellable existing = finalizationFuture.getAndSet( + threadPool.schedule(() -> wrappedListener.onFailure(e), coolDown, ThreadPool.Names.SNAPSHOT)); + assert existing == null : "Already have an ongoing finalization " + finalizationFuture; + } + }; } private static BlobPath buildBasePath(RepositoryMetaData metadata) { @@ -233,4 +296,14 @@ protected BlobStore getBlobStore() { protected ByteSizeValue chunkSize() { return chunkSize; } + + @Override + protected void doClose() { + final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null); + if (cancellable != null) { + logger.warn("Repository closed during cooldown period"); + cancellable.cancel(); + } + super.doClose(); + } } From f4a8e098bd9952c8949ac017e72d657f4b5f7ad2 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 15 Jan 2020 23:00:07 +0100 Subject: [PATCH 3/6] cleanup --- .../java/org/elasticsearch/repositories/Repository.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index f3701af563323..fab0a448007e8 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -234,10 +234,4 @@ void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId s * @param state new cluster state */ void updateState(ClusterState state); - - /** - * Hook invoked on the master node before it delegates data nodes to start snapshotting shards via {@link #snapshotShard}. - */ - default void beforeSnapshotShards() { - } } From f9047d7738f432dea04cd21efb1128d1686fb1be Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 16 Jan 2020 22:53:20 +0100 Subject: [PATCH 4/6] test + docs --- .../repositories/s3/S3Repository.java | 20 ++++++- .../s3/S3BlobStoreRepositoryTests.java | 54 +++++++++++++++++++ 2 files changed, 72 insertions(+), 2 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 8bd187de0b4ac..ccba064c8fbb4 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -40,6 +40,7 @@ import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotShardFailure; +import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; @@ -142,9 +143,13 @@ class S3Repository extends BlobStoreRepository { /** * Artificial delay to introduce after a snapshot finalization or delete has finished so long as the repository is still using the * backwards compatible snapshot format from before - * {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}. + * {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} ({@link org.elasticsearch.Version#V_7_6_0}). * This delay is necessary so that the eventually consistent nature of AWS S3 does not randomly result in repository corruption when - * doing repository operations in rapid succession. + * doing repository operations in rapid succession on a repository in the old metadata format. + * This setting should not be adjusted in production when working with an AWS S3 backed repository. Doing so risks the repository + * becoming silently corrupted. To get rid of this waiting period, either create a new S3 repository or remove all snapshots older than + * {@link org.elasticsearch.Version#V_7_6_0} from the repository which will trigger an upgrade of the repository metadata to the new + * format and disable the cooldown period. */ static final Setting COOLDOWN_PERIOD = Setting.timeSetting( "cooldown_period", @@ -258,6 +263,7 @@ private ActionListener delayedListener(ActionListener listener) { return new ActionListener<>() { @Override public void onResponse(T response) { + logCooldownInfo(); final Scheduler.Cancellable existing = finalizationFuture.getAndSet( threadPool.schedule(() -> wrappedListener.onResponse(response), coolDown, ThreadPool.Names.SNAPSHOT)); assert existing == null : "Already have an ongoing finalization " + finalizationFuture; @@ -265,6 +271,7 @@ public void onResponse(T response) { @Override public void onFailure(Exception e) { + logCooldownInfo(); final Scheduler.Cancellable existing = finalizationFuture.getAndSet( threadPool.schedule(() -> wrappedListener.onFailure(e), coolDown, ThreadPool.Names.SNAPSHOT)); assert existing == null : "Already have an ongoing finalization " + finalizationFuture; @@ -272,6 +279,15 @@ public void onFailure(Exception e) { }; } + private void logCooldownInfo() { + logger.info("Sleeping for [{}] after modifying repository [{}] because it contains snapshots older than version [{}]" + + " and therefore is using a backwards compatible metadata format that requires this cooldown period to avoid " + + "repository corruption. To get rid of this message and move to the new repository metadata format, either remove " + + "all snapshots older than version [{}] from the repository or create a new repository at an empty location.", + coolDown, metadata.name(), SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION, + SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION); + } + private static BlobPath buildBasePath(RepositoryMetaData metadata) { final String basePath = BASE_PATH_SETTING.get(metadata.settings()); if (Strings.hasLength(basePath)) { diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 3c26c99f8a528..9fcb08b2f0bf5 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -22,30 +22,48 @@ import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import fixture.s3.S3HttpHandler; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper; +import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; + @SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint") public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase { + private static final TimeValue TEST_COOLDOWN_PERIOD = TimeValue.timeValueSeconds(5L); + @Override protected String repositoryType() { return S3Repository.TYPE; @@ -82,6 +100,7 @@ protected Settings nodeSettings(int nodeOrdinal) { secureSettings.setString(S3ClientSettings.SECRET_KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), "secret"); return Settings.builder() + .put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // We have tests that verify an exact wait time .put(S3ClientSettings.ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl()) // Disable chunked encoding as it simplifies a lot the request parsing on the httpServer side .put(S3ClientSettings.DISABLE_CHUNKED_ENCODING.getConcreteSettingForNamespace("test").getKey(), true) @@ -92,6 +111,41 @@ protected Settings nodeSettings(int nodeOrdinal) { .build(); } + public void testEnforcedCooldownPeriod() throws IOException { + final String repoName = createRepository(randomName(), Settings.builder().put(repositorySettings()) + .put(S3Repository.COOLDOWN_PERIOD.getKey(), TEST_COOLDOWN_PERIOD).build()); + + final SnapshotId fakeOldSnapshot = client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-old") + .setWaitForCompletion(true).setIndices().get().getSnapshotInfo().snapshotId(); + final RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class); + final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName); + final RepositoryData repositoryData = + PlainActionFuture.get(f -> repository.threadPool().generic().execute(() -> repository.getRepositoryData(f))); + final RepositoryData modifiedRepositoryData = repositoryData.withVersions(Collections.singletonMap(fakeOldSnapshot, + SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion())); + final BytesReference serialized = + BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), false)); + PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> { + try (InputStream stream = serialized.streamInput()) { + repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic( + BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), stream, serialized.length(), true); + } + }))); + + final String newSnapshotName = "snapshot-new"; + final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos(); + client().admin().cluster().prepareCreateSnapshot(repoName, newSnapshotName).setWaitForCompletion(true).setIndices().get(); + assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledSnapshot, greaterThan(TEST_COOLDOWN_PERIOD.getNanos())); + + final long beforeThrottledDelete = repository.threadPool().relativeTimeInNanos(); + client().admin().cluster().prepareDeleteSnapshot(repoName, newSnapshotName).get(); + assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledDelete, greaterThan(TEST_COOLDOWN_PERIOD.getNanos())); + + final long beforeFastDelete = repository.threadPool().relativeTimeInNanos(); + client().admin().cluster().prepareDeleteSnapshot(repoName, fakeOldSnapshot.getName()).get(); + assertThat(repository.threadPool().relativeTimeInNanos() - beforeFastDelete, lessThan(TEST_COOLDOWN_PERIOD.getNanos())); + } + /** * S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload. */ From 4f5816701cd953fa16bcec12f14e5834037c6c98 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 17 Jan 2020 06:43:30 +0100 Subject: [PATCH 5/6] fix test --- .../repositories/s3/S3BlobStoreRepositoryTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 9fcb08b2f0bf5..f4f4990d1c5b4 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -117,7 +117,7 @@ public void testEnforcedCooldownPeriod() throws IOException { final SnapshotId fakeOldSnapshot = client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-old") .setWaitForCompletion(true).setIndices().get().getSnapshotInfo().snapshotId(); - final RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class); + final RepositoriesService repositoriesService = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class); final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName); final RepositoryData repositoryData = PlainActionFuture.get(f -> repository.threadPool().generic().execute(() -> repository.getRepositoryData(f))); From 9f1c00d56d2af7fd72338fee56d48870f81f42f3 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 17 Jan 2020 23:58:50 +0100 Subject: [PATCH 6/6] CR comments --- .../org/elasticsearch/repositories/s3/S3Repository.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index ccba064c8fbb4..91b5dfeb5f546 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.service.ClusterService; @@ -265,7 +266,8 @@ private ActionListener delayedListener(ActionListener listener) { public void onResponse(T response) { logCooldownInfo(); final Scheduler.Cancellable existing = finalizationFuture.getAndSet( - threadPool.schedule(() -> wrappedListener.onResponse(response), coolDown, ThreadPool.Names.SNAPSHOT)); + threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onResponse(response)), + coolDown, ThreadPool.Names.SNAPSHOT)); assert existing == null : "Already have an ongoing finalization " + finalizationFuture; } @@ -273,7 +275,7 @@ public void onResponse(T response) { public void onFailure(Exception e) { logCooldownInfo(); final Scheduler.Cancellable existing = finalizationFuture.getAndSet( - threadPool.schedule(() -> wrappedListener.onFailure(e), coolDown, ThreadPool.Names.SNAPSHOT)); + threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onFailure(e)), coolDown, ThreadPool.Names.SNAPSHOT)); assert existing == null : "Already have an ongoing finalization " + finalizationFuture; } }; @@ -317,7 +319,7 @@ protected ByteSizeValue chunkSize() { protected void doClose() { final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null); if (cancellable != null) { - logger.warn("Repository closed during cooldown period"); + logger.debug("Repository [{}] closed during cool-down period", metadata.name()); cancellable.cancel(); } super.doClose();