Add CoolDown Period to S3 Repository #51074
S3Repository.java

@@ -21,6 +21,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
@@ -29,11 +31,23 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
/**

@@ -126,6 +140,23 @@ class S3Repository extends BlobStoreRepository {

    static final Setting<String> CLIENT_NAME = new Setting<>("client", "default", Function.identity());

    /**
     * Artificial delay to introduce after a snapshot finalization or delete has finished so long as the repository is still using the
     * backwards compatible snapshot format from before
     * {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} ({@link org.elasticsearch.Version#V_7_6_0}).
     * This delay is necessary so that the eventually consistent nature of AWS S3 does not randomly result in repository corruption when
     * doing repository operations in rapid succession on a repository in the old metadata format.
     * This setting should not be adjusted in production when working with an AWS S3 backed repository. Doing so risks the repository
     * becoming silently corrupted. To get rid of this waiting period, either create a new S3 repository or remove all snapshots older
     * than {@link org.elasticsearch.Version#V_7_6_0} from the repository, which will trigger an upgrade of the repository metadata to
     * the new format and disable the cooldown period.
     */
    static final Setting<TimeValue> COOLDOWN_PERIOD = Setting.timeSetting(
        "cooldown_period",
        new TimeValue(3, TimeUnit.MINUTES),
        new TimeValue(0, TimeUnit.MILLISECONDS),
        Setting.Property.Dynamic);

    /**
     * Specifies the path within bucket to repository data. Defaults to root directory.
     */
@@ -145,6 +176,12 @@ class S3Repository extends BlobStoreRepository {

    private final String cannedACL;

    /**
     * Time period to delay repository operations by after finalizing or deleting a snapshot.
     * See {@link #COOLDOWN_PERIOD} for details.
     */
    private final TimeValue coolDown;

    /**
     * Constructs an s3 backed repository
     */
@@ -176,6 +213,8 @@ class S3Repository extends BlobStoreRepository {
        this.storageClass = STORAGE_CLASS_SETTING.get(metadata.settings());
        this.cannedACL = CANNED_ACL_SETTING.get(metadata.settings());

        coolDown = COOLDOWN_PERIOD.get(metadata.settings());

        logger.debug(
            "using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], cannedACL [{}], storageClass [{}]",
            bucket,

@@ -186,6 +225,69 @@ class S3Repository extends BlobStoreRepository {
            storageClass);
    }
    /**
     * Holds a reference to the delayed repository operation {@link Scheduler.Cancellable} so it can be cancelled should the repository
     * be closed concurrently.
     */
    private final AtomicReference<Scheduler.Cancellable> finalizationFuture = new AtomicReference<>();

    @Override
    public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards,
                                 List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
                                 MetaData clusterMetaData, Map<String, Object> userMetadata, boolean writeShardGens,
                                 ActionListener<SnapshotInfo> listener) {
        if (writeShardGens == false) {
            listener = delayedListener(listener);
        }
        super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
            includeGlobalState, clusterMetaData, userMetadata, writeShardGens, listener);
    }

    @Override
    public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
        if (writeShardGens == false) {
            listener = delayedListener(listener);
        }
        super.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener);
    }
    /**
     * Wraps the given listener such that it is executed with a delay of {@link #coolDown} on the snapshot thread-pool after being
     * invoked. See {@link #COOLDOWN_PERIOD} for details.
     */
    private <T> ActionListener<T> delayedListener(ActionListener<T> listener) {
        final ActionListener<T> wrappedListener = ActionListener.runBefore(listener, () -> {
            final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
            assert cancellable != null;
        });
        return new ActionListener<>() {
            @Override
            public void onResponse(T response) {
                logCooldownInfo();
                final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
                    threadPool.schedule(() -> wrappedListener.onResponse(response), coolDown, ThreadPool.Names.SNAPSHOT));

Review comment: What if we are rejected from the snapshot threadpool? Let's force it onto the threadpool (use AbstractRunnable), and notify the listener as well in AbstractRunnable.onFailure. (A sketch of this suggestion follows the method below.)

Reply: I think that's impossible unless the snapshot pool is shutting down (in which case it's kind of irrelevant what we do anyway, I guess). No other action will go onto the snapshot pool until this listener is resolved, because the snapshot or delete in progress in the cluster state will prevent anything else from running, and we specifically made it so that no steps in the snapshot operations run on the …

Reply: I wouldn't bet my life on that (think e.g. about master failover). Let's make this safe.

                assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
            }

            @Override
            public void onFailure(Exception e) {
                logCooldownInfo();
                final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
                    threadPool.schedule(() -> wrappedListener.onFailure(e), coolDown, ThreadPool.Names.SNAPSHOT));
                assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
            }
        };
    }
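The following is a rough sketch of one reading of the reviewer's suggestion above (force the delayed task onto the snapshot pool via AbstractRunnable and notify the listener on failure or rejection). It is illustrative only, not the code merged in the later commits; wrappedListener, response, coolDown, threadPool and finalizationFuture refer to the surrounding delayedListener context.

// Sketch only: use org.elasticsearch.common.util.concurrent.AbstractRunnable so that a rejection
// from the snapshot pool (e.g. during shutdown or master failover) still resolves the listener
// instead of silently dropping it.
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
    threadPool.schedule(new AbstractRunnable() {
        @Override
        public boolean isForceExecution() {
            return true; // try to force the task onto the snapshot pool even if its queue is full
        }

        @Override
        public void onRejection(Exception e) {
            onFailure(e); // rejected => still notify the listener, just exceptionally
        }

        @Override
        public void onFailure(Exception e) {
            wrappedListener.onFailure(e);
        }

        @Override
        protected void doRun() {
            wrappedListener.onResponse(response);
        }
    }, coolDown, ThreadPool.Names.SNAPSHOT));
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;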
    private void logCooldownInfo() {
        logger.info("Sleeping for [{}] after modifying repository [{}] because it contains snapshots older than version [{}]" +
                " and therefore is using a backwards compatible metadata format that requires this cooldown period to avoid " +
                "repository corruption. To get rid of this message and move to the new repository metadata format, either remove " +
                "all snapshots older than version [{}] from the repository or create a new repository at an empty location.",
            coolDown, metadata.name(), SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION,
            SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION);
    }

    private static BlobPath buildBasePath(RepositoryMetaData metadata) {
        final String basePath = BASE_PATH_SETTING.get(metadata.settings());
        if (Strings.hasLength(basePath)) {
@@ -210,4 +312,14 @@ protected BlobStore getBlobStore() {
    protected ByteSizeValue chunkSize() {
        return chunkSize;
    }

    @Override
    protected void doClose() {
        final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
        if (cancellable != null) {
            logger.warn("Repository closed during cooldown period");

Review comment: Do we need to log this at warn level?

Reply: ... retracted, see below.

Reply: Actually the situation here is better than I described above, since if we're running into this we're always re-running the last step of the delete or snapshot operation on the next master (and it will fail there because the repository generation has already moved), which will trigger another wait period. So this isn't a bad spot at all :) => moving this to …

            cancellable.cancel();
        }
        super.doClose();
    }
}
S3BlobStoreRepositoryTests.java

@@ -22,30 +22,48 @@
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import fixture.s3.S3HttpHandler;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {

    private static final TimeValue TEST_COOLDOWN_PERIOD = TimeValue.timeValueSeconds(5L);

    @Override
    protected String repositoryType() {
        return S3Repository.TYPE;
@@ -82,6 +100,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
        secureSettings.setString(S3ClientSettings.SECRET_KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), "secret");

        return Settings.builder()
            .put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // We have tests that verify an exact wait time
            .put(S3ClientSettings.ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl())
            // Disable chunked encoding as it simplifies a lot the request parsing on the httpServer side
            .put(S3ClientSettings.DISABLE_CHUNKED_ENCODING.getConcreteSettingForNamespace("test").getKey(), true)

@@ -92,6 +111,41 @@ protected Settings nodeSettings(int nodeOrdinal) {
            .build();
    }
    public void testEnforcedCooldownPeriod() throws IOException {

Review comment: This is admittedly quite the hacky test and it takes 2x 5s of hard waits to verify the behaviour. We could create a cleaner test by adding some BwC test infrastructure to the S3 plugin tests, but I'm not sure it's worth the complexity. Also, running a real REST test to verify the timing here makes the test even more prone to run into random CI slowness and fail in the last step, which verifies that no waiting happens when moving to a repo without any old-version snapshot => this seemed like the least bad option to me.

Reply: Can we have a …

Reply: Not trivial but doable => On it :)

Reply: Argh, never mind ... then we'd have to move the cooldown logic to … Think this is worth it, given that this is a stop-gap solution?

Reply: bummer :/

        final String repoName = createRepository(randomName(), Settings.builder().put(repositorySettings())
            .put(S3Repository.COOLDOWN_PERIOD.getKey(), TEST_COOLDOWN_PERIOD).build());

        final SnapshotId fakeOldSnapshot = client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-old")
            .setWaitForCompletion(true).setIndices().get().getSnapshotInfo().snapshotId();
        final RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class);
        final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName);
        final RepositoryData repositoryData =
            PlainActionFuture.get(f -> repository.threadPool().generic().execute(() -> repository.getRepositoryData(f)));
        final RepositoryData modifiedRepositoryData = repositoryData.withVersions(Collections.singletonMap(fakeOldSnapshot,
            SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion()));
        final BytesReference serialized =
            BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), false));
        PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> {
            try (InputStream stream = serialized.streamInput()) {
                repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic(
                    BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), stream, serialized.length(), true);
            }
        })));

        final String newSnapshotName = "snapshot-new";
        final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos();
        client().admin().cluster().prepareCreateSnapshot(repoName, newSnapshotName).setWaitForCompletion(true).setIndices().get();
        assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledSnapshot, greaterThan(TEST_COOLDOWN_PERIOD.getNanos()));

        final long beforeThrottledDelete = repository.threadPool().relativeTimeInNanos();
        client().admin().cluster().prepareDeleteSnapshot(repoName, newSnapshotName).get();
        assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledDelete, greaterThan(TEST_COOLDOWN_PERIOD.getNanos()));

        final long beforeFastDelete = repository.threadPool().relativeTimeInNanos();
        client().admin().cluster().prepareDeleteSnapshot(repoName, fakeOldSnapshot.getName()).get();
        assertThat(repository.threadPool().relativeTimeInNanos() - beforeFastDelete, lessThan(TEST_COOLDOWN_PERIOD.getNanos()));

Review comment: I wonder if there are situations where …

Reply: I figured that's impossible since I turned off the timestamp cache? I think otherwise the underlying primitives in … And rightfully so? => that said :) ... let me see about the suggested test via the resiliency tests.

    }
    /**
     * S3RepositoryPlugin that allows disabling chunked encoding and setting a low threshold between single upload and multipart upload.
     */
Review comment: @ywelsch I went with doing it this way instead of just keeping track of a timestamp and then failing a new snapshot if it's started too close to the last timestamp. I'm afraid having random failures from concurrent snapshot exceptions when no running snapshot is visible to the APIs could mess with Cloud orchestration (not necessarily breaking it, but causing an unreasonable amount of _status requests).
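For illustration only, a rough sketch of the rejected alternative described above; the names lastFinalizationNanos and ensureCooldownElapsed are hypothetical and do not appear in this PR.

// Hypothetical alternative (NOT what this PR does): record when the last finalization or delete
// completed and fail any repository operation that starts before the cooldown has elapsed.
private volatile long lastFinalizationNanos = -1L;

private void ensureCooldownElapsed() {
    if (lastFinalizationNanos == -1L) {
        return; // no repository operation has completed yet
    }
    final long elapsedNanos = threadPool.relativeTimeInNanos() - lastFinalizationNanos;
    if (elapsedNanos < coolDown.getNanos()) {
        // The caller would see a failure even though no snapshot appears to be running via the APIs,
        // which is the orchestration-unfriendly behaviour described above.
        throw new RepositoryException(metadata.name(), "cooldown period after the last repository operation has not elapsed");
    }
}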
Reply: I had to think a bit about this, and consulted @DaveCTurner as well. We both agree that this is the right path forward (simpler to explain to users and simpler for existing orchestration tools). In short, this artificially extends the duration of the snapshot, i.e., taking or deleting a snapshot takes 3 minutes longer. Can we add a log message that details why we are doing this (and that we are in a repo with legacy snapshots)? Let's also document this somewhere (with the setting). This gives users the choice, e.g., to move to a different repo.

Reply: Should we really document this? It seems to me that if you're on AWS S3, not having the cooldown is a risk in 100% of cases. If we document it, those that this functionality is intended to protect might opt to turn it off to "speed things up"? Maybe just document the waiting but not the setting?

Reply: I think we must document how users can safely speed it up (i.e. by moving to a new repo or deleting all their legacy snapshots). I'm ok with not documenting the setting itself - we already have form for leaving dangerous settings undocumented (see MergePolicyConfig for instance). Let's add this reasoning to its Javadoc along with explicit instructions not to adjust it and instead to move to a new repo or delete all the legacy snapshots, to deal with the inevitable user who comes across it in the source code. Can we also mention {@link Version#V_7_6_0} in the Javadoc so we get a reminder to remove this in v9?

Reply: Alright, added docs to the setting, a link to 7.6, an explanatory log message and a test in f9047d7 :)