diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotResponse.java index a63f2cf810475..b16959e9d18de 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/CreateSnapshotResponse.java @@ -53,7 +53,7 @@ public class CreateSnapshotResponse extends ActionResponse implements ToXContent CreateSnapshotResponse() {} - CreateSnapshotResponse(@Nullable SnapshotInfo snapshotInfo) { + public CreateSnapshotResponse(@Nullable SnapshotInfo snapshotInfo) { this.snapshotInfo = snapshotInfo; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/SnapshotLifecyclePolicy.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/SnapshotLifecyclePolicy.java index e038d3bb6e3bf..0ef7912c58017 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/SnapshotLifecyclePolicy.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/SnapshotLifecyclePolicy.java @@ -260,7 +260,7 @@ public CreateSnapshotRequest toRequest() { Map mergedConfiguration = new HashMap<>(configuration); mergedConfiguration.put("metadata", metadataWithAddedPolicyName); req.source(mergedConfiguration); - req.waitForCompletion(false); + req.waitForCompletion(true); return req; } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java index 2fc2033ab441a..ebf7c3b42822f 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java @@ -158,7 +158,8 @@ public List> getSettings() { LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING, RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING, LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING, - LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING); + LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING, + LifecycleSettings.SLM_RETENTION_DURATION_SETTING); } @Override diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java index 4c740f4278604..ec3baaf2be583 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java @@ -23,6 +23,8 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.snapshots.SnapshotException; +import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.slm.SnapshotInvocationRecord; @@ -91,16 +93,32 @@ public static Optional maybeTakeSnapshot(final String jobId, final Clien public void onResponse(CreateSnapshotResponse createSnapshotResponse) { logger.debug("snapshot response for [{}]: {}", policyMetadata.getPolicy().getId(), Strings.toString(createSnapshotResponse)); - final long timestamp = Instant.now().toEpochMilli(); - clusterService.submitStateUpdateTask("slm-record-success-" + policyMetadata.getPolicy().getId(), - WriteJobStatus.success(policyMetadata.getPolicy().getId(), request.snapshot(), timestamp)); - historyStore.putAsync(SnapshotHistoryItem.creationSuccessRecord(timestamp, policyMetadata.getPolicy(), - request.snapshot())); + final SnapshotInfo snapInfo = createSnapshotResponse.getSnapshotInfo(); + + // Check that there are no failed shards, since the request may not entirely + // fail, but may still have failures (such as in the case of an aborted snapshot) + if (snapInfo.failedShards() == 0) { + final long timestamp = Instant.now().toEpochMilli(); + clusterService.submitStateUpdateTask("slm-record-success-" + policyMetadata.getPolicy().getId(), + WriteJobStatus.success(policyMetadata.getPolicy().getId(), request.snapshot(), timestamp)); + historyStore.putAsync(SnapshotHistoryItem.creationSuccessRecord(timestamp, policyMetadata.getPolicy(), + request.snapshot())); + } else { + int failures = snapInfo.failedShards(); + int total = snapInfo.totalShards(); + final SnapshotException e = new SnapshotException(request.repository(), request.snapshot(), + "failed to create snapshot successfully, " + failures + " out of " + total + " total shards failed"); + // Add each failed shard's exception as suppressed, the exception contains + // information about which shard failed + snapInfo.shardFailures().forEach(failure -> e.addSuppressed(failure.getCause())); + // Call the failure handler to register this as a failure and persist it + onFailure(e); + } } @Override public void onFailure(Exception e) { - logger.error("failed to issue create snapshot request for snapshot lifecycle policy [{}]: {}", + logger.error("failed to create snapshot for snapshot lifecycle policy [{}]: {}", policyMetadata.getPolicy().getId(), e); final long timestamp = Instant.now().toEpochMilli(); clusterService.submitStateUpdateTask("slm-record-failure-" + policyMetadata.getPolicy().getId(), diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index 368dbcae6789e..f09b793a1a654 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -339,6 +339,7 @@ void deleteSnapshots(Map> snapshotsToDelete, List snapshots = entry.getValue(); for (SnapshotInfo info : snapshots) { final String policyId = getPolicyId(info); + final long deleteStartTime = nowNanoSupplier.getAsLong(); deleteSnapshot(policyId, repo, info.snapshotId(), slmStats, ActionListener.wrap(acknowledgedResponse -> { deleted.incrementAndGet(); if (acknowledgedResponse.isAcknowledged()) { @@ -364,13 +365,15 @@ void deleteSnapshots(Map> snapshotsToDelete, })); // Check whether we have exceeded the maximum time allowed to spend deleting // snapshots, if we have, short-circuit the rest of the deletions - TimeValue elapsedDeletionTime = TimeValue.timeValueNanos(nowNanoSupplier.getAsLong() - startTime); - logger.debug("elapsed time for deletion of [{}] snapshot: {}", info.snapshotId(), elapsedDeletionTime); - if (elapsedDeletionTime.compareTo(maximumTime) > 0) { + long finishTime = nowNanoSupplier.getAsLong(); + TimeValue deletionTime = TimeValue.timeValueNanos(finishTime - deleteStartTime); + logger.debug("elapsed time for deletion of [{}] snapshot: {}", info.snapshotId(), deletionTime); + TimeValue totalDeletionTime = TimeValue.timeValueNanos(finishTime - startTime); + if (totalDeletionTime.compareTo(maximumTime) > 0) { logger.info("maximum snapshot retention deletion time reached, time spent: [{}]," + " maximum allowed time: [{}], deleted [{}] out of [{}] snapshots scheduled for deletion, failed to delete [{}]", - elapsedDeletionTime, maximumTime, deleted, count, failed); - slmStats.deletionTime(elapsedDeletionTime); + totalDeletionTime, maximumTime, deleted, count, failed); + slmStats.deletionTime(totalDeletionTime); slmStats.retentionTimedOut(); return; } @@ -402,8 +405,8 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) { } else { logger.warn("[{}] snapshot [{}] delete issued but the request was not acknowledged", repo, snapshot); } - listener.onResponse(acknowledgedResponse); slmStats.snapshotDeleted(slmPolicy); + listener.onResponse(acknowledgedResponse); } @Override diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTaskTests.java index 84c1d12cce65e..5474602cdfda6 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTaskTests.java @@ -23,6 +23,10 @@ import org.elasticsearch.common.TriFunction; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotShardFailure; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; @@ -47,6 +51,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.startsWith; @@ -196,6 +201,83 @@ public void testCreateSnapshotOnTrigger() { threadPool.shutdownNow(); } + public void testPartialFailureSnapshot() throws Exception { + final String id = randomAlphaOfLength(4); + final SnapshotLifecyclePolicyMetadata slpm = makePolicyMeta(id); + final SnapshotLifecycleMetadata meta = + new SnapshotLifecycleMetadata(Collections.singletonMap(id, slpm), OperationMode.RUNNING, new SnapshotLifecycleStats()); + + final ClusterState state = ClusterState.builder(new ClusterName("test")) + .metaData(MetaData.builder() + .putCustom(SnapshotLifecycleMetadata.TYPE, meta) + .build()) + .build(); + + final ThreadPool threadPool = new TestThreadPool("test"); + final AtomicBoolean clientCalled = new AtomicBoolean(false); + final SetOnce snapshotName = new SetOnce<>(); + try (ClusterService clusterService = ClusterServiceUtils.createClusterService(state, threadPool); + VerifyingClient client = new VerifyingClient(threadPool, + (action, request, listener) -> { + assertFalse(clientCalled.getAndSet(true)); + assertThat(action, instanceOf(CreateSnapshotAction.class)); + assertThat(request, instanceOf(CreateSnapshotRequest.class)); + + CreateSnapshotRequest req = (CreateSnapshotRequest) request; + + SnapshotLifecyclePolicy policy = slpm.getPolicy(); + assertThat(req.snapshot(), startsWith(policy.getName() + "-")); + assertThat(req.repository(), equalTo(policy.getRepository())); + snapshotName.set(req.snapshot()); + if (req.indices().length > 0) { + assertThat(Arrays.asList(req.indices()), equalTo(policy.getConfig().get("indices"))); + } + boolean globalState = policy.getConfig().get("include_global_state") == null || + Boolean.parseBoolean((String) policy.getConfig().get("include_global_state")); + assertThat(req.includeGlobalState(), equalTo(globalState)); + + return new CreateSnapshotResponse( + new SnapshotInfo( + new SnapshotId(req.snapshot(), "uuid"), + Arrays.asList(req.indices()), + randomNonNegativeLong(), + "snapshot started", + randomNonNegativeLong(), + 3, + Collections.singletonList( + new SnapshotShardFailure("nodeId", new ShardId("index", "uuid", 0), "forced failure")), + req.includeGlobalState(), + req.userMetadata() + )); + })) { + final AtomicBoolean historyStoreCalled = new AtomicBoolean(false); + SnapshotHistoryStore historyStore = new VerifyingHistoryStore(null, ZoneOffset.UTC, + item -> { + assertFalse(historyStoreCalled.getAndSet(true)); + final SnapshotLifecyclePolicy policy = slpm.getPolicy(); + assertEquals(policy.getId(), item.getPolicyId()); + assertEquals(policy.getRepository(), item.getRepository()); + assertEquals(policy.getConfig(), item.getSnapshotConfiguration()); + assertEquals(snapshotName.get(), item.getSnapshotName()); + assertFalse("item should be a failure", item.isSuccess()); + assertThat(item.getErrorDetails(), + containsString("failed to create snapshot successfully, 1 out of 3 total shards failed")); + assertThat(item.getErrorDetails(), + containsString("forced failure")); + }); + + SnapshotLifecycleTask task = new SnapshotLifecycleTask(client, clusterService, historyStore); + // Trigger the event with a matching job name for the policy + task.triggered(new SchedulerEngine.Event(SnapshotLifecycleService.getJobId(slpm), + System.currentTimeMillis(), System.currentTimeMillis())); + + assertTrue("snapshot should be triggered once", clientCalled.get()); + assertTrue("history store should be called once", historyStoreCalled.get()); + } + + threadPool.shutdownNow(); + } + /** * A client that delegates to a verifying function for action/request/listener */