Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add API to execute SLM policy on demand #41038

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@
import org.elasticsearch.xpack.core.security.authz.privilege.ConditionalClusterPrivilege;
import org.elasticsearch.xpack.core.security.authz.privilege.ConditionalClusterPrivileges;
import org.elasticsearch.xpack.core.security.transport.netty4.SecurityNetty4Transport;
import org.elasticsearch.xpack.core.snapshotlifecycle.action.ExecuteSnapshotLifecycleAction;
import org.elasticsearch.xpack.core.sql.SqlFeatureSetUsage;
import org.elasticsearch.xpack.core.ssl.SSLService;
import org.elasticsearch.xpack.core.ssl.action.GetCertificateInfoAction;
Expand Down Expand Up @@ -370,6 +371,7 @@ public List<Action<? extends ActionResponse>> getClientActions() {
PutSnapshotLifecycleAction.INSTANCE,
GetSnapshotLifecycleAction.INSTANCE,
DeleteSnapshotLifecycleAction.INSTANCE,
ExecuteSnapshotLifecycleAction.INSTANCE,
// Freeze
TransportFreezeIndexAction.FreezeIndexAction.INSTANCE
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.snapshotlifecycle.action;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

/**
* Action used to manually invoke a create snapshot request for a given
* snapshot lifecycle policy regardless of schedule.
*/
public class ExecuteSnapshotLifecycleAction extends Action<ExecuteSnapshotLifecycleAction.Response> {
public static final ExecuteSnapshotLifecycleAction INSTANCE = new ExecuteSnapshotLifecycleAction();
public static final String NAME = "cluster:admin/ilm/snapshot/execute";

protected ExecuteSnapshotLifecycleAction() {
super(NAME);
}

@Override
public ExecuteSnapshotLifecycleAction.Response newResponse() {
throw new UnsupportedOperationException();
}

@Override
public Writeable.Reader<ExecuteSnapshotLifecycleAction.Response> getResponseReader() {
return Response::new;
}

public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {

private String lifecycleId;

public Request(String lifecycleId) {
this.lifecycleId = lifecycleId;
}

public Request() { }

public String getLifecycleId() {
return this.lifecycleId;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
lifecycleId = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(lifecycleId);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.endObject();
return builder;
}

@Override
public int hashCode() {
return Objects.hash(lifecycleId);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj.getClass() != getClass()) {
return false;
}
Request other = (Request) obj;
return lifecycleId.equals(other.lifecycleId);
}

@Override
public String toString() {
return Strings.toString(this);
}
}

public static class Response extends ActionResponse implements ToXContentObject {

private final String snapshotName;

public Response(String snapshotName) {
this.snapshotName = snapshotName;
}

public String getSnapshotName() {
return this.snapshotName;
}

public Response(StreamInput in) throws IOException {
this(in.readString());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(this.snapshotName);
}

@Override
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("snapshot_name", getSnapshotName());
builder.endObject();
return builder;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.rest.ESRestTestCase;
Expand Down Expand Up @@ -152,6 +155,57 @@ public void testPolicyFailure() throws Exception {
assertOK(client().performRequest(delReq));
}

public void testPolicyManualExecution() throws Exception {
final String indexName = "test";
final String policyName = "test-policy";
final String repoId = "my-repo";
int docCount = randomIntBetween(10, 50);
List<IndexRequestBuilder> indexReqs = new ArrayList<>();
for (int i = 0; i < docCount; i++) {
index(client(), indexName, "" + i, "foo", "bar");
}

// Create a snapshot repo
inializeRepo(repoId);

createSnapshotPolicy(policyName, "snap", "1 2 3 4 5 ?", repoId, indexName, true);

ResponseException badResp = expectThrows(ResponseException.class,
() -> client().performRequest(new Request("PUT", "/_ilm/snapshot/" + policyName + "-bad/_execute")));
assertThat(EntityUtils.toString(badResp.getResponse().getEntity()),
containsString("no such snapshot lifecycle policy [" + policyName + "-bad]"));

Response goodResp = client().performRequest(new Request("PUT", "/_ilm/snapshot/" + policyName + "/_execute"));

try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, EntityUtils.toByteArray(goodResp.getEntity()))) {
final String snapshotName = parser.mapStrings().get("snapshot_name");

// Check that the executed snapshot is created
assertBusy(() -> {
try {
Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName));
Map<String, Object> snapshotResponseMap;
try (InputStream is = response.getEntity().getContent()) {
snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
}
assertThat(snapshotResponseMap.size(), greaterThan(0));
} catch (ResponseException e) {
fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity()));
}
});
}

Request delReq = new Request("DELETE", "/_ilm/snapshot/" + policyName);
assertOK(client().performRequest(delReq));

// It's possible there could have been a snapshot in progress when the
// policy is deleted, so wait for it to be finished
assertBusy(() -> {
assertThat(wipeSnapshots().size(), equalTo(0));
});
}

private void createSnapshotPolicy(String policyName, String snapshotNamePattern, String schedule, String repoId,
String indexPattern, boolean ignoreUnavailable) throws IOException {
Map<String, Object> snapConfig = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction;
import org.elasticsearch.xpack.core.indexlifecycle.action.StartILMAction;
import org.elasticsearch.xpack.core.indexlifecycle.action.StopILMAction;
import org.elasticsearch.xpack.core.snapshotlifecycle.action.ExecuteSnapshotLifecycleAction;
import org.elasticsearch.xpack.indexlifecycle.action.RestDeleteLifecycleAction;
import org.elasticsearch.xpack.indexlifecycle.action.RestExplainLifecycleAction;
import org.elasticsearch.xpack.indexlifecycle.action.RestGetLifecycleAction;
Expand Down Expand Up @@ -87,9 +88,11 @@
import org.elasticsearch.xpack.core.snapshotlifecycle.action.GetSnapshotLifecycleAction;
import org.elasticsearch.xpack.core.snapshotlifecycle.action.PutSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.RestDeleteSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.RestExecuteSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.RestGetSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.RestPutSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.TransportDeleteSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.TransportExecuteSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.TransportGetSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.TransportPutSnapshotLifecycleAction;

Expand Down Expand Up @@ -208,7 +211,8 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
// Snapshot lifecycle actions
new RestPutSnapshotLifecycleAction(settings, restController),
new RestDeleteSnapshotLifecycleAction(settings, restController),
new RestGetSnapshotLifecycleAction(settings, restController)
new RestGetSnapshotLifecycleAction(settings, restController),
new RestExecuteSnapshotLifecycleAction(settings, restController)
);
}

Expand All @@ -231,7 +235,8 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
// Snapshot lifecycle actions
new ActionHandler<>(PutSnapshotLifecycleAction.INSTANCE, TransportPutSnapshotLifecycleAction.class),
new ActionHandler<>(DeleteSnapshotLifecycleAction.INSTANCE, TransportDeleteSnapshotLifecycleAction.class),
new ActionHandler<>(GetSnapshotLifecycleAction.INSTANCE, TransportGetSnapshotLifecycleAction.class));
new ActionHandler<>(GetSnapshotLifecycleAction.INSTANCE, TransportGetSnapshotLifecycleAction.class),
new ActionHandler<>(ExecuteSnapshotLifecycleAction.INSTANCE, TransportExecuteSnapshotLifecycleAction.class));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public void maybeScheduleSnapshot(final SnapshotLifecyclePolicyMetadata snapshot
/**
* Generate the job id for a given policy metadata. The job id is {@code <policyid>-<version>}
*/
static String getJobId(SnapshotLifecyclePolicyMetadata policyMeta) {
public static String getJobId(SnapshotLifecyclePolicyMetadata policyMeta) {
return policyMeta.getPolicy().getId() + "-" + policyMeta.getVersion();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,37 @@ public SnapshotLifecycleTask(final Client client, final ClusterService clusterSe
@Override
public void triggered(SchedulerEngine.Event event) {
logger.debug("snapshot lifecycle policy task triggered from job [{}]", event.getJobName());
Optional<SnapshotLifecyclePolicyMetadata> maybeMetadata = getSnapPolicyMetadata(event.getJobName(), clusterService.state());
// If we were on JDK 9 and could use ifPresentOrElse this would be simpler.
boolean successful = maybeMetadata.map(policyMetadata -> {

final Optional<String> snapshotName = maybeTakeSnapshot(event.getJobName(), client, clusterService);

// Would be cleaner if we could use Optional#ifPresentOrElse
snapshotName.ifPresent(name ->
logger.info("snapshot lifecycle policy job [{}] issued new snapshot creation for [{}] successfully",
event.getJobName(), name));

if (snapshotName.isPresent() == false) {
logger.warn("snapshot lifecycle policy for job [{}] no longer exists, snapshot not created", event.getJobName());
}
}

/**
* For the given job id (a combination of policy id and version), issue a create snapshot
* request. On a successful or failed create snapshot issuing the state is stored in the cluster
* state in the policy's metadata
* @return An optional snapshot name if the request was issued successfully
*/
public static Optional<String> maybeTakeSnapshot(final String jobId, final Client client, final ClusterService clusterService) {
Optional<SnapshotLifecyclePolicyMetadata> maybeMetadata = getSnapPolicyMetadata(jobId, clusterService.state());
String snapshotName = maybeMetadata.map(policyMetadata -> {
CreateSnapshotRequest request = policyMetadata.getPolicy().toRequest();
final LifecyclePolicySecurityClient clientWithHeaders = new LifecyclePolicySecurityClient(this.client,
final LifecyclePolicySecurityClient clientWithHeaders = new LifecyclePolicySecurityClient(client,
ClientHelper.INDEX_LIFECYCLE_ORIGIN, policyMetadata.getHeaders());
logger.info("triggering periodic snapshot for policy [{}]", policyMetadata.getPolicy().getId());
clientWithHeaders.admin().cluster().createSnapshot(request, new ActionListener<CreateSnapshotResponse>() {
logger.info("snapshot lifecycle policy [{}] issuing create snapshot [{}]",
policyMetadata.getPolicy().getId(), request.snapshot());
clientWithHeaders.admin().cluster().createSnapshot(request, new ActionListener<>() {
@Override
public void onResponse(CreateSnapshotResponse createSnapshotResponse) {
logger.info("snapshot response for [{}]: {}",
logger.debug("snapshot response for [{}]: {}",
policyMetadata.getPolicy().getId(), Strings.toString(createSnapshotResponse));
clusterService.submitStateUpdateTask("slm-record-success-" + policyMetadata.getPolicy().getId(),
WriteJobStatus.success(policyMetadata.getPolicy().getId(), request.snapshot(), Instant.now().toEpochMilli()));
Expand All @@ -77,12 +97,10 @@ public void onFailure(Exception e) {
WriteJobStatus.failure(policyMetadata.getPolicy().getId(), request.snapshot(), Instant.now().toEpochMilli(), e));
}
});
return true;
}).orElse(false);
return request.snapshot();
}).orElse(null);

if (successful == false) {
logger.warn("snapshot lifecycle policy for job [{}] no longer exists, snapshot not created", event.getJobName());
}
return Optional.ofNullable(snapshotName);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.snapshotlifecycle.action;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.snapshotlifecycle.action.ExecuteSnapshotLifecycleAction;

import java.io.IOException;

public class RestExecuteSnapshotLifecycleAction extends BaseRestHandler {

public RestExecuteSnapshotLifecycleAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.PUT, "/_ilm/snapshot/{name}/_execute", this);
}

@Override
public String getName() {
return "ilm_execute_snapshot_lifecycle";
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String snapLifecycleId = request.param("name");
ExecuteSnapshotLifecycleAction.Request req = new ExecuteSnapshotLifecycleAction.Request(snapLifecycleId);
req.timeout(request.paramAsTime("timeout", req.timeout()));
req.masterNodeTimeout(request.paramAsTime("master_timeout", req.masterNodeTimeout()));
return channel -> client.execute(ExecuteSnapshotLifecycleAction.INSTANCE, req, new RestToXContentListener<>(channel));
}
}
Loading