Skip to content

Commit

Permalink
Check all actions preventing snapshot delete during retention (#45992)
Browse files Browse the repository at this point in the history
* Check all actions preventing snapshot delete during retention run

Previously we only checked whether a snapshot was currently running,
but it turns out that other operations can also block snapshot deletion.
The retention run now checks for all of the following:

- a snapshot currently running
- a deletion already in progress
- a repo cleanup in progress
- a restore currently running

This was found by CI where a third party delete in a test caused SLM
retention deletion to throw an exception.

Relates to #43663

* Add unit test for okayToDeleteSnapshots

* Fix bug where SLM retention task would be scheduled on every node

* Enhance test logging

* Ignore if snapshot is already deleted

* Missing import

* Fix SnapshotRetentionServiceTests
  • Loading branch information
dakrone authored Sep 3, 2019
1 parent 06e2d69 commit 7129832
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private Entry(StreamInput in) throws IOException {
repositoryStateId = in.readLong();
}

/**
 * Creates an entry for the given repository and repository state id.
 * The constructor is public so that entries can be built from outside this class.
 *
 * @param repository        name of the repository this entry refers to
 * @param repositoryStateId repository state id stored on this entry
 */
public Entry(String repository, long repositoryStateId) {
    this.repository = repository;
    this.repositoryStateId = repositoryStateId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
// the list of snapshot deletion request entries
private final List<Entry> entries;

/**
 * Creates an instance holding the given snapshot deletion request entries.
 * The list is wrapped in an unmodifiable view, not copied, so the caller
 * should not keep mutating the list it passes in.
 *
 * @param entries the snapshot deletion request entries
 */
public SnapshotDeletionsInProgress(List<Entry> entries) {
    this.entries = Collections.unmodifiableList(entries);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem.DELETE_OPERATION;
import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryStore.SLM_HISTORY_INDEX_PREFIX;
import static org.elasticsearch.xpack.ilm.TimeSeriesLifecycleActionsIT.getStepKeyForIndex;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -383,7 +384,11 @@ public void testSnapshotInProgress() throws Exception {
});

// Cancel the snapshot since it is not going to complete quickly
assertOK(client().performRequest(new Request("DELETE", "/_snapshot/" + repoId + "/" + snapshotName)));
try {
client().performRequest(new Request("DELETE", "/_snapshot/" + repoId + "/" + snapshotName));
} catch (Exception e) {
// ignore
}
}
}

Expand All @@ -403,9 +408,9 @@ public void testRetentionWhileSnapshotInProgress() throws Exception {
initializeRepo(slowRepo, "1b");
initializeRepo(fastRepo, "10mb");

createSnapshotPolicy(slowPolicy, "snap", "1 2 3 4 5 ?", slowRepo, indexName, true,
createSnapshotPolicy(slowPolicy, "slow-snap", "1 2 3 4 5 ?", slowRepo, indexName, true,
new SnapshotRetentionConfiguration(TimeValue.timeValueSeconds(0), null, null));
createSnapshotPolicy(fastPolicy, "snap", "1 2 3 4 5 ?", fastRepo, indexName, true,
createSnapshotPolicy(fastPolicy, "fast-snap", "1 2 3 4 5 ?", fastRepo, indexName, true,
new SnapshotRetentionConfiguration(TimeValue.timeValueSeconds(0), null, null));

// Create a snapshot and wait for it to be complete (need something that can be deleted)
Expand All @@ -419,6 +424,19 @@ public void testRetentionWhileSnapshotInProgress() throws Exception {
logger.info("--> waiting for snapshot {} to be successful, got: {}", completedSnapshotName, snaps);
List<Map<String, Object>> snaps2 = (List<Map<String, Object>>) snaps.get("snapshots");
assertThat(snaps2.get(0).get("state"), equalTo("SUCCESS"));

// Check that no in_progress snapshots show up
Response response = client().performRequest(new Request("GET", "/_slm/policy"));
Map<String, Object> policyResponseMap;
try (InputStream content2 = response.getEntity().getContent()) {
policyResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), content2, true);
}
assertThat(policyResponseMap.size(), greaterThan(0));
Optional<Map<String, Object>> inProgress = Optional.ofNullable((Map<String, Object>) policyResponseMap.get(slowPolicy))
.map(policy -> (Map<String, Object>) policy.get("in_progress"));

// Ensure no snapshots are running
assertFalse("expected no in progress snapshots but got " + inProgress.orElse(null), inProgress.isPresent());
}
} catch (NullPointerException | ResponseException e) {
fail("unable to retrieve completed snapshot: " + e);
Expand All @@ -431,11 +449,12 @@ public void testRetentionWhileSnapshotInProgress() throws Exception {
// Check that the executed snapshot shows up in the SLM output as in_progress
assertBusy(() -> {
try {
Response response = client().performRequest(new Request("GET", "/_slm/policy" + (randomBoolean() ? "" : "?human")));
Response response = client().performRequest(new Request("GET", "/_slm/policy"));
Map<String, Object> policyResponseMap;
try (InputStream content = response.getEntity().getContent()) {
policyResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), content, true);
}
logger.info("--> checking for 'slow-*' snapshot to show up in policy response, got: " + policyResponseMap);
assertThat(policyResponseMap.size(), greaterThan(0));
Optional<Map<String, Object>> inProgress = Optional.ofNullable((Map<String, Object>) policyResponseMap.get(slowPolicy))
.map(policy -> (Map<String, Object>) policy.get("in_progress"));
Expand All @@ -444,7 +463,7 @@ public void testRetentionWhileSnapshotInProgress() throws Exception {
Map<String, Object> inProgressMap = inProgress.get();
assertThat(inProgressMap.get("name"), equalTo(slowSnapshotName));
assertNotNull(inProgressMap.get("uuid"));
assertThat(inProgressMap.get("state"), equalTo("STARTED"));
assertThat(inProgressMap.get("state"), anyOf(equalTo("STARTED"), equalTo("INIT")));
assertThat((long) inProgressMap.get("start_time_millis"), greaterThan(0L));
assertNull(inProgressMap.get("failure"));
} else {
Expand Down Expand Up @@ -481,9 +500,10 @@ public void testRetentionWhileSnapshotInProgress() throws Exception {
assertBusy(() -> {
// We expect a failed response because the snapshot should not exist
try {
logger.info("--> checking to see if snapshot has been deleted...");
Response response = client().performRequest(new Request("GET", "/_snapshot/" + slowRepo + "/" + completedSnapshotName));
assertThat(EntityUtils.toString(response.getEntity()), containsString("snapshot_missing_exception"));
String resp = EntityUtils.toString(response.getEntity());
logger.info("--> checking to see if snapshot has been deleted, got: " + resp);
assertThat(resp, containsString("snapshot_missing_exception"));
} catch (ResponseException e) {
assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class SnapshotRetentionService implements LocalNodeMasterListener, Closea
private final SchedulerEngine scheduler;

private volatile String slmRetentionSchedule;
private volatile boolean isMaster = false;

public SnapshotRetentionService(Settings settings,
Supplier<SnapshotRetentionTask> taskSupplier,
Expand All @@ -63,17 +64,19 @@ SchedulerEngine getScheduler() {

/**
 * {@code onMaster} callback: records that this service is running on the master
 * and (re)schedules the SLM retention job.
 */
@Override
public void onMaster() {
// Set the flag before rescheduling: rescheduleRetentionJob() only schedules when isMaster is true.
this.isMaster = true;
rescheduleRetentionJob();
}

/**
 * {@code offMaster} callback: records that this service is no longer running on the master
 * and cancels the SLM retention job.
 */
@Override
public void offMaster() {
// Clear the flag first so that a later rescheduleRetentionJob() call does not schedule the job again.
this.isMaster = false;
cancelRetentionJob();
}

private void rescheduleRetentionJob() {
final String schedule = this.slmRetentionSchedule;
if (Strings.hasText(schedule)) {
if (this.isMaster && Strings.hasText(schedule)) {
final SchedulerEngine.Job retentionJob = new SchedulerEngine.Job(SLM_RETENTION_JOB_ID,
new CronSchedule(schedule));
logger.debug("scheduling SLM retention job for [{}]", schedule);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -280,19 +283,21 @@ private void maybeDeleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDel
}

ClusterState state = clusterService.state();
if (snapshotInProgress(state)) {
if (okayToDeleteSnapshots(state)) {
deleteSnapshots(snapshotsToDelete, maximumTime, slmStats);
} else {
logger.debug("a snapshot is currently running, rescheduling SLM retention for after snapshot has completed");
ClusterStateObserver observer = new ClusterStateObserver(clusterService, maximumTime, logger, threadPool.getThreadContext());
CountDownLatch latch = new CountDownLatch(1);
observer.waitForNextChange(
new NoSnapshotRunningListener(observer,
newState -> threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(() -> {
try {
deleteSnapshots(snapshotsToDelete, maximumTime, slmStats);
} finally {
latch.countDown();
}
}),
try {
deleteSnapshots(snapshotsToDelete, maximumTime, slmStats);
} finally {
latch.countDown();
}
}),
e -> {
latch.countDown();
throw new ElasticsearchException(e);
Expand All @@ -302,8 +307,6 @@ private void maybeDeleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDel
} catch (InterruptedException e) {
throw new ElasticsearchException(e);
}
} else {
deleteSnapshots(snapshotsToDelete, maximumTime, slmStats);
}
}

Expand Down Expand Up @@ -412,14 +415,32 @@ void updateStateWithStats(SnapshotLifecycleStats newStats) {
clusterService.submitStateUpdateTask("update_slm_stats", new UpdateSnapshotLifecycleStatsTask(newStats));
}

public static boolean snapshotInProgress(ClusterState state) {
SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) {
// No snapshots are running, new state is acceptable to proceed
public static boolean okayToDeleteSnapshots(ClusterState state) {
// Cannot delete during a snapshot
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress != null && snapshotsInProgress.entries().size() > 0) {
return false;
}

// Cannot delete during an existing delete
final SnapshotDeletionsInProgress deletionsInProgress = state.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
return false;
}

// There are snapshots
// Cannot delete while a repository is being cleaned
final RepositoryCleanupInProgress repositoryCleanupInProgress = state.custom(RepositoryCleanupInProgress.TYPE);
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) {
return false;
}

// Cannot delete during a restore
final RestoreInProgress restoreInProgress = state.custom(RestoreInProgress.TYPE);
if (restoreInProgress != null) {
return false;
}

// It's okay to delete snapshots
return true;
}

Expand All @@ -445,11 +466,11 @@ class NoSnapshotRunningListener implements ClusterStateObserver.Listener {
@Override
public void onNewClusterState(ClusterState state) {
try {
if (snapshotInProgress(state)) {
observer.waitForNextChange(this);
} else {
logger.debug("retrying SLM snapshot retention deletion after snapshot has completed");
if (okayToDeleteSnapshots(state)) {
logger.debug("retrying SLM snapshot retention deletion after snapshot operation has completed");
reRun.accept(state);
} else {
observer.waitForNextChange(this);
}
} catch (Exception e) {
exceptionConsumer.accept(e);
Expand All @@ -464,7 +485,7 @@ public void onClusterServiceClose() {
@Override
public void onTimeout(TimeValue timeout) {
exceptionConsumer.accept(
new IllegalStateException("slm retention snapshot deletion out while waiting for ongoing snapshots to complete"));
new IllegalStateException("slm retention snapshot deletion out while waiting for ongoing snapshot operations to complete"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public void testJobsAreScheduled() {
FakeRetentionTask::new, clusterService, clock)) {
assertThat(service.getScheduler().jobCount(), equalTo(0));

service.onMaster();
service.setUpdateSchedule(SnapshotLifecycleServiceTests.randomSchedule());
assertThat(service.getScheduler().scheduledJobIds(), containsInAnyOrder(SnapshotRetentionService.SLM_RETENTION_JOB_ID));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,18 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.test.ClusterServiceUtils;
Expand Down Expand Up @@ -57,6 +65,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import static org.mockito.Mockito.mock;

public class SnapshotRetentionTaskTests extends ESTestCase {

Expand Down Expand Up @@ -317,6 +326,43 @@ private void timeBoundedDeletion(final boolean deletionSuccess) throws Exception
}
}

/**
 * Verifies {@code SnapshotRetentionTask.okayToDeleteSnapshots}: deletion is allowed for a cluster
 * state without any snapshot-related customs, and refused when the state carries a running
 * snapshot, a snapshot deletion, a repository cleanup or a restore.
 */
public void testOkToDeleteSnapshots() {
    final Snapshot snapshot = new Snapshot("repo", new SnapshotId("name", "uuid"));

    // Positive case: nothing is in progress, so deletion must be allowed. Without this assertion
    // an implementation that always returns false would pass the test.
    ClusterState state = ClusterState.builder(new ClusterName("cluster")).build();
    assertThat(SnapshotRetentionTask.okayToDeleteSnapshots(state), equalTo(true));

    // A snapshot is running
    SnapshotsInProgress inProgress = new SnapshotsInProgress(
        new SnapshotsInProgress.Entry(
            snapshot, true, false, SnapshotsInProgress.State.INIT,
            Collections.singletonList(new IndexId("name", "id")), 0, 0,
            ImmutableOpenMap.<ShardId, SnapshotsInProgress.ShardSnapshotStatus>builder().build(), Collections.emptyMap()));
    state = ClusterState.builder(new ClusterName("cluster"))
        .putCustom(SnapshotsInProgress.TYPE, inProgress)
        .build();

    assertThat(SnapshotRetentionTask.okayToDeleteSnapshots(state), equalTo(false));

    // A snapshot deletion is already in progress
    SnapshotDeletionsInProgress delInProgress = new SnapshotDeletionsInProgress(
        Collections.singletonList(new SnapshotDeletionsInProgress.Entry(snapshot, 0, 0)));
    state = ClusterState.builder(new ClusterName("cluster"))
        .putCustom(SnapshotDeletionsInProgress.TYPE, delInProgress)
        .build();

    assertThat(SnapshotRetentionTask.okayToDeleteSnapshots(state), equalTo(false));

    // A repository cleanup is in progress
    RepositoryCleanupInProgress cleanupInProgress = new RepositoryCleanupInProgress(new RepositoryCleanupInProgress.Entry("repo", 0));
    state = ClusterState.builder(new ClusterName("cluster"))
        .putCustom(RepositoryCleanupInProgress.TYPE, cleanupInProgress)
        .build();

    assertThat(SnapshotRetentionTask.okayToDeleteSnapshots(state), equalTo(false));

    // A restore is in progress (a mock is enough: only the presence of the custom is checked)
    RestoreInProgress restoreInProgress = mock(RestoreInProgress.class);
    state = ClusterState.builder(new ClusterName("cluster"))
        .putCustom(RestoreInProgress.TYPE, restoreInProgress)
        .build();

    assertThat(SnapshotRetentionTask.okayToDeleteSnapshots(state), equalTo(false));
}

// Delegates to doTestSkipDuringMode with OperationMode.STOPPING.
public void testSkipWhileStopping() throws Exception {
doTestSkipDuringMode(OperationMode.STOPPING);
}
Expand Down

0 comments on commit 7129832

Please sign in to comment.