Wait for snapshot completion in SLM snapshot invocation #47051

Merged 2 commits on Sep 25, 2019
@@ -53,7 +53,7 @@ public class CreateSnapshotResponse extends ActionResponse implements ToXContent

CreateSnapshotResponse() {}

- CreateSnapshotResponse(@Nullable SnapshotInfo snapshotInfo) {
+ public CreateSnapshotResponse(@Nullable SnapshotInfo snapshotInfo) {
this.snapshotInfo = snapshotInfo;
}

@@ -260,7 +260,7 @@ public CreateSnapshotRequest toRequest() {
Map<String, Object> mergedConfiguration = new HashMap<>(configuration);
mergedConfiguration.put("metadata", metadataWithAddedPolicyName);
req.source(mergedConfiguration);
- req.waitForCompletion(false);
+ req.waitForCompletion(true);
return req;
}

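The waitForCompletion change above is the core of this PR: with wait_for_completion set to true, the create-snapshot call does not return until the snapshot finishes, so the CreateSnapshotResponse carries a populated SnapshotInfo that can be checked for partial failures. A minimal sketch of how a caller might use this, reusing the classes imported in the diffs below; the client variable and the repository/snapshot names are placeholders, and error handling is trimmed:

// Issue a create-snapshot request that waits for completion, then inspect the
// returned SnapshotInfo for failed shards (mirrors what SnapshotLifecycleTask does below).
CreateSnapshotRequest request = new CreateSnapshotRequest("my-repo", "my-snapshot");
request.waitForCompletion(true); // the response will now include the full SnapshotInfo

client.admin().cluster().createSnapshot(request, ActionListener.wrap(response -> {
    SnapshotInfo info = response.getSnapshotInfo();
    if (info.failedShards() == 0) {
        // every shard was snapshotted successfully
    } else {
        // partial snapshot: some shards failed even though the request itself succeeded
    }
}, e -> {
    // the request itself failed (for example, the repository is missing)
}));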
@@ -144,7 +144,8 @@ public List<Setting<?>> getSettings() {
LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING,
RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING,
LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING,
- LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING);
+ LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING,
+ LifecycleSettings.SLM_RETENTION_DURATION_SETTING);
}

@Override
@@ -23,6 +23,8 @@
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
+ import org.elasticsearch.snapshots.SnapshotException;
+ import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.slm.SnapshotInvocationRecord;
@@ -91,16 +93,32 @@ public static Optional<String> maybeTakeSnapshot(final String jobId, final Clien
public void onResponse(CreateSnapshotResponse createSnapshotResponse) {
logger.debug("snapshot response for [{}]: {}",
policyMetadata.getPolicy().getId(), Strings.toString(createSnapshotResponse));
- final long timestamp = Instant.now().toEpochMilli();
- clusterService.submitStateUpdateTask("slm-record-success-" + policyMetadata.getPolicy().getId(),
- WriteJobStatus.success(policyMetadata.getPolicy().getId(), request.snapshot(), timestamp));
- historyStore.putAsync(SnapshotHistoryItem.creationSuccessRecord(timestamp, policyMetadata.getPolicy(),
- request.snapshot()));
+ final SnapshotInfo snapInfo = createSnapshotResponse.getSnapshotInfo();
+
+ // Check that there are no failed shards, since the request may not entirely
+ // fail, but may still have failures (such as in the case of an aborted snapshot)
+ if (snapInfo.failedShards() == 0) {
+ final long timestamp = Instant.now().toEpochMilli();
+ clusterService.submitStateUpdateTask("slm-record-success-" + policyMetadata.getPolicy().getId(),
+ WriteJobStatus.success(policyMetadata.getPolicy().getId(), request.snapshot(), timestamp));
+ historyStore.putAsync(SnapshotHistoryItem.creationSuccessRecord(timestamp, policyMetadata.getPolicy(),
+ request.snapshot()));
+ } else {
+ int failures = snapInfo.failedShards();
+ int total = snapInfo.totalShards();
+ final SnapshotException e = new SnapshotException(request.repository(), request.snapshot(),
+ "failed to create snapshot successfully, " + failures + " out of " + total + " total shards failed");
+ // Add each failed shard's exception as suppressed, the exception contains
+ // information about which shard failed
+ snapInfo.shardFailures().forEach(failure -> e.addSuppressed(failure.getCause()));
+ // Call the failure handler to register this as a failure and persist it
+ onFailure(e);
Contributor: Given what Armin was saying on #46988 about partial snapshots sometimes being more like successes, treating them here as a failure may be awkward for some users. That said, I think we can go with it for now and tweak the behavior later if we get strong feedback about it.

Member (author): I think in order to better handle it, we may have to change the history store to store all the states that a snapshot can be in; that way we could still tell if there were errors (PARTIAL snapshots). For now, though, I think it's safer to treat PARTIAL snapshots as failures.

(A hypothetical sketch of recording snapshot states follows this file's diff.)
}
}

@Override
public void onFailure(Exception e) {
logger.error("failed to issue create snapshot request for snapshot lifecycle policy [{}]: {}",
logger.error("failed to create snapshot for snapshot lifecycle policy [{}]: {}",
policyMetadata.getPolicy().getId(), e);
final long timestamp = Instant.now().toEpochMilli();
clusterService.submitStateUpdateTask("slm-record-failure-" + policyMetadata.getPolicy().getId(),
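Purely as a hypothetical illustration of the follow-up idea in the review thread above (recording the snapshot's end state in the history store rather than collapsing everything into success or failure), a sketch could branch on SnapshotInfo.state(). SnapshotState is a real enum with SUCCESS, PARTIAL, and FAILED values, but the commented-out creationRecordWithState factory is invented for this sketch and does not exist in SnapshotHistoryItem:

// Hypothetical only: branch on the snapshot's end state instead of treating
// PARTIAL as a plain failure. Variable names reuse those from the diff above.
final SnapshotInfo snapInfo = createSnapshotResponse.getSnapshotInfo();
switch (snapInfo.state()) {
    case SUCCESS:
        // unambiguous success: record as before
        break;
    case PARTIAL:
        // completed, but some shards failed; currently recorded as a failure
        break;
    default:
        // FAILED and other states
        break;
}
// historyStore.putAsync(SnapshotHistoryItem.creationRecordWithState(timestamp, policy, snapshotName, snapInfo.state())); // invented factory method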
@@ -324,6 +324,7 @@ void deleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDelete,
List<SnapshotInfo> snapshots = entry.getValue();
for (SnapshotInfo info : snapshots) {
final String policyId = getPolicyId(info);
+ final long deleteStartTime = nowNanoSupplier.getAsLong();
deleteSnapshot(policyId, repo, info.snapshotId(), slmStats, ActionListener.wrap(acknowledgedResponse -> {
deleted.incrementAndGet();
if (acknowledgedResponse.isAcknowledged()) {
@@ -349,13 +350,15 @@ void deleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDelete,
}));
// Check whether we have exceeded the maximum time allowed to spend deleting
// snapshots, if we have, short-circuit the rest of the deletions
- TimeValue elapsedDeletionTime = TimeValue.timeValueNanos(nowNanoSupplier.getAsLong() - startTime);
- logger.debug("elapsed time for deletion of [{}] snapshot: {}", info.snapshotId(), elapsedDeletionTime);
- if (elapsedDeletionTime.compareTo(maximumTime) > 0) {
+ long finishTime = nowNanoSupplier.getAsLong();
+ TimeValue deletionTime = TimeValue.timeValueNanos(finishTime - deleteStartTime);
+ logger.debug("elapsed time for deletion of [{}] snapshot: {}", info.snapshotId(), deletionTime);
+ TimeValue totalDeletionTime = TimeValue.timeValueNanos(finishTime - startTime);
+ if (totalDeletionTime.compareTo(maximumTime) > 0) {
logger.info("maximum snapshot retention deletion time reached, time spent: [{}]," +
" maximum allowed time: [{}], deleted [{}] out of [{}] snapshots scheduled for deletion, failed to delete [{}]",
- elapsedDeletionTime, maximumTime, deleted, count, failed);
- slmStats.deletionTime(elapsedDeletionTime);
+ totalDeletionTime, maximumTime, deleted, count, failed);
+ slmStats.deletionTime(totalDeletionTime);
Contributor: These changes aren't really related to the main purpose of this PR. They're very minor so I think it's fine, but going forward I think we should try to keep PRs focused on one conceptual change - this case bugs me a little because this PR otherwise doesn't really touch retention.

Member (author): Yeah, it bugs me a little too; I'll separate them in the future.
slmStats.retentionTimedOut();
return;
}
@@ -387,8 +390,8 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) {
} else {
logger.warn("[{}] snapshot [{}] delete issued but the request was not acknowledged", repo, snapshot);
}
- listener.onResponse(acknowledgedResponse);
slmStats.snapshotDeleted(slmPolicy);
+ listener.onResponse(acknowledgedResponse);
}

@Override
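The retention changes above separate two measurements: deletionTime, how long a single snapshot deletion took (logged per snapshot), and totalDeletionTime, how long the whole deletion run has taken so far (compared against the configured maximum). A small standalone sketch of that bookkeeping pattern, assuming a synchronous deleteSnapshot placeholder and a pendingDeletions list rather than the real SnapshotRetentionTask fields:

// Track per-snapshot elapsed time for logging and total elapsed time for the
// retention time budget; stop deleting once the total exceeds the maximum.
long startTime = System.nanoTime();
for (String snapshot : pendingDeletions) {            // placeholder collection
    long deleteStartTime = System.nanoTime();
    deleteSnapshot(snapshot);                         // placeholder for the real (async) delete
    long finishTime = System.nanoTime();

    TimeValue deletionTime = TimeValue.timeValueNanos(finishTime - deleteStartTime); // this snapshot only
    TimeValue totalDeletionTime = TimeValue.timeValueNanos(finishTime - startTime);  // since the run began
    if (totalDeletionTime.compareTo(maximumTime) > 0) {
        break; // time budget exhausted: short-circuit the remaining deletions
    }
}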
@@ -23,6 +23,10 @@
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.json.JsonXContent;
+ import org.elasticsearch.index.shard.ShardId;
+ import org.elasticsearch.snapshots.SnapshotId;
+ import org.elasticsearch.snapshots.SnapshotInfo;
+ import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
@@ -47,6 +51,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

+ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.startsWith;
@@ -196,6 +201,83 @@ public void testCreateSnapshotOnTrigger() {
threadPool.shutdownNow();
}

public void testPartialFailureSnapshot() throws Exception {
final String id = randomAlphaOfLength(4);
final SnapshotLifecyclePolicyMetadata slpm = makePolicyMeta(id);
final SnapshotLifecycleMetadata meta =
new SnapshotLifecycleMetadata(Collections.singletonMap(id, slpm), OperationMode.RUNNING, new SnapshotLifecycleStats());

final ClusterState state = ClusterState.builder(new ClusterName("test"))
.metaData(MetaData.builder()
.putCustom(SnapshotLifecycleMetadata.TYPE, meta)
.build())
.build();

final ThreadPool threadPool = new TestThreadPool("test");
final AtomicBoolean clientCalled = new AtomicBoolean(false);
final SetOnce<String> snapshotName = new SetOnce<>();
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(state, threadPool);
VerifyingClient client = new VerifyingClient(threadPool,
(action, request, listener) -> {
assertFalse(clientCalled.getAndSet(true));
assertThat(action, instanceOf(CreateSnapshotAction.class));
assertThat(request, instanceOf(CreateSnapshotRequest.class));

CreateSnapshotRequest req = (CreateSnapshotRequest) request;

SnapshotLifecyclePolicy policy = slpm.getPolicy();
assertThat(req.snapshot(), startsWith(policy.getName() + "-"));
assertThat(req.repository(), equalTo(policy.getRepository()));
snapshotName.set(req.snapshot());
if (req.indices().length > 0) {
assertThat(Arrays.asList(req.indices()), equalTo(policy.getConfig().get("indices")));
}
boolean globalState = policy.getConfig().get("include_global_state") == null ||
Boolean.parseBoolean((String) policy.getConfig().get("include_global_state"));
assertThat(req.includeGlobalState(), equalTo(globalState));

return new CreateSnapshotResponse(
new SnapshotInfo(
new SnapshotId(req.snapshot(), "uuid"),
Arrays.asList(req.indices()),
randomNonNegativeLong(),
"snapshot started",
randomNonNegativeLong(),
3,
Collections.singletonList(
new SnapshotShardFailure("nodeId", new ShardId("index", "uuid", 0), "forced failure")),
req.includeGlobalState(),
req.userMetadata()
));
})) {
final AtomicBoolean historyStoreCalled = new AtomicBoolean(false);
SnapshotHistoryStore historyStore = new VerifyingHistoryStore(null, ZoneOffset.UTC,
item -> {
assertFalse(historyStoreCalled.getAndSet(true));
final SnapshotLifecyclePolicy policy = slpm.getPolicy();
assertEquals(policy.getId(), item.getPolicyId());
assertEquals(policy.getRepository(), item.getRepository());
assertEquals(policy.getConfig(), item.getSnapshotConfiguration());
assertEquals(snapshotName.get(), item.getSnapshotName());
assertFalse("item should be a failure", item.isSuccess());
assertThat(item.getErrorDetails(),
containsString("failed to create snapshot successfully, 1 out of 3 total shards failed"));
assertThat(item.getErrorDetails(),
containsString("forced failure"));
});

SnapshotLifecycleTask task = new SnapshotLifecycleTask(client, clusterService, historyStore);
// Trigger the event with a matching job name for the policy
task.triggered(new SchedulerEngine.Event(SnapshotLifecycleService.getJobId(slpm),
System.currentTimeMillis(), System.currentTimeMillis()));

assertTrue("snapshot should be triggered once", clientCalled.get());
assertTrue("history store should be called once", historyStoreCalled.get());
}

threadPool.shutdownNow();
}

/**
* A client that delegates to a verifying function for action/request/listener
*/