Skip to content

Commit

Permalink
Hook SLM into ILM's start and stop APIs (#40871)
Browse files Browse the repository at this point in the history
(This pull request is for the `snapshot-lifecycle-management` branch)

This change allows the existing `/_ilm/stop` and `/_ilm/start` APIs to also
manage snapshot lifecycle scheduling. When ILM is stopped all scheduled jobs are
cancelled.

Relates to #38461
  • Loading branch information
dakrone authored Apr 8, 2019
1 parent 3a19a30 commit 68b4387
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.XPackPlugin.XPackMetaDataCustom;
import org.elasticsearch.xpack.core.indexlifecycle.OperationMode;

import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -37,13 +39,17 @@
public class SnapshotLifecycleMetadata implements XPackMetaDataCustom {

public static final String TYPE = "snapshot_lifecycle";
public static final ParseField OPERATION_MODE_FIELD = new ParseField("operation_mode");
public static final ParseField POLICIES_FIELD = new ParseField("policies");

public static final SnapshotLifecycleMetadata EMPTY = new SnapshotLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING);

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<SnapshotLifecycleMetadata, Void> PARSER = new ConstructingObjectParser<>(TYPE,
a -> new SnapshotLifecycleMetadata(
((List<SnapshotLifecyclePolicyMetadata>) a[0]).stream()
.collect(Collectors.toMap(m -> m.getPolicy().getId(), Function.identity()))));
.collect(Collectors.toMap(m -> m.getPolicy().getId(), Function.identity())),
OperationMode.valueOf((String) a[1])));

static {
PARSER.declareNamedObjects(ConstructingObjectParser.constructorArg(), (p, c, n) -> SnapshotLifecyclePolicyMetadata.parse(p, n),
Expand All @@ -53,20 +59,26 @@ public class SnapshotLifecycleMetadata implements XPackMetaDataCustom {
}

private final Map<String, SnapshotLifecyclePolicyMetadata> snapshotConfigurations;
private final OperationMode operationMode;

public SnapshotLifecycleMetadata(Map<String, SnapshotLifecyclePolicyMetadata> snapshotConfigurations) {
public SnapshotLifecycleMetadata(Map<String, SnapshotLifecyclePolicyMetadata> snapshotConfigurations, OperationMode operationMode) {
this.snapshotConfigurations = new HashMap<>(snapshotConfigurations);
// TODO: maybe operation mode here so it can be disabled/re-enabled separately like ILM is
this.operationMode = operationMode;
}

public SnapshotLifecycleMetadata(StreamInput in) throws IOException {
this.snapshotConfigurations = in.readMap(StreamInput::readString, SnapshotLifecyclePolicyMetadata::new);
this.operationMode = in.readEnum(OperationMode.class);
}

public Map<String, SnapshotLifecyclePolicyMetadata> getSnapshotConfigurations() {
return Collections.unmodifiableMap(this.snapshotConfigurations);
}

public OperationMode getOperationMode() {
return operationMode;
}

@Override
public EnumSet<MetaData.XContentContext> context() {
return MetaData.ALL_CONTEXTS;
Expand All @@ -90,11 +102,13 @@ public Version getMinimalSupportedVersion() {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(this.snapshotConfigurations, StreamOutput::writeString, (out1, value) -> value.writeTo(out1));
out.writeEnum(this.operationMode);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(POLICIES_FIELD.getPreferredName(), this.snapshotConfigurations);
builder.field(OPERATION_MODE_FIELD.getPreferredName(), operationMode);
return builder;
}

Expand All @@ -103,26 +117,47 @@ public String toString() {
return Strings.toString(this);
}

@Override
public int hashCode() {
return Objects.hash(this.snapshotConfigurations, this.operationMode);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj.getClass() != getClass()) {
return false;
}
SnapshotLifecycleMetadata other = (SnapshotLifecycleMetadata) obj;
return this.snapshotConfigurations.equals(other.snapshotConfigurations) &&
this.operationMode.equals(other.operationMode);
}

public static class SnapshotLifecycleMetadataDiff implements NamedDiff<MetaData.Custom> {

final Diff<Map<String, SnapshotLifecyclePolicyMetadata>> lifecycles;
final OperationMode operationMode;

SnapshotLifecycleMetadataDiff(SnapshotLifecycleMetadata before, SnapshotLifecycleMetadata after) {
this.lifecycles = DiffableUtils.diff(before.snapshotConfigurations, after.snapshotConfigurations,
DiffableUtils.getStringKeySerializer());
this.operationMode = after.operationMode;
}

public SnapshotLifecycleMetadataDiff(StreamInput in) throws IOException {
this.lifecycles = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(),
SnapshotLifecyclePolicyMetadata::new,
SnapshotLifecycleMetadataDiff::readLifecyclePolicyDiffFrom);
this.operationMode = in.readEnum(OperationMode.class);
}

@Override
public MetaData.Custom apply(MetaData.Custom part) {
TreeMap<String, SnapshotLifecyclePolicyMetadata> newLifecycles = new TreeMap<>(
lifecycles.apply(((SnapshotLifecycleMetadata) part).snapshotConfigurations));
return new SnapshotLifecycleMetadata(newLifecycles);
return new SnapshotLifecycleMetadata(newLifecycles, this.operationMode);
}

@Override
Expand All @@ -133,6 +168,7 @@ public String getWriteableName() {
@Override
public void writeTo(StreamOutput out) throws IOException {
lifecycles.writeTo(out);
out.writeEnum(this.operationMode);
}

static Diff<SnapshotLifecyclePolicyMetadata> readLifecyclePolicyDiffFrom(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.xpack.core.indexlifecycle.OperationMode;
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecycleMetadata;

public class OperationModeUpdateTask extends ClusterStateUpdateTask {
private static final Logger logger = LogManager.getLogger(OperationModeUpdateTask.class);
Expand All @@ -27,6 +28,13 @@ OperationMode getOperationMode() {

@Override
public ClusterState execute(ClusterState currentState) {
ClusterState newState = currentState;
newState = updateILMState(newState);
newState = updateSLMState(newState);
return newState;
}

private ClusterState updateILMState(final ClusterState currentState) {
IndexLifecycleMetadata currentMetadata = currentState.metaData().custom(IndexLifecycleMetadata.TYPE);
if (currentMetadata != null && currentMetadata.getOperationMode().isValidChange(mode) == false) {
return currentState;
Expand All @@ -41,12 +49,33 @@ public ClusterState execute(ClusterState currentState) {
newMode = currentMetadata.getOperationMode();
}

ClusterState.Builder builder = new ClusterState.Builder(currentState);
MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
metadataBuilder.putCustom(IndexLifecycleMetadata.TYPE,
new IndexLifecycleMetadata(currentMetadata.getPolicyMetadatas(), newMode));
builder.metaData(metadataBuilder.build());
return builder.build();
return ClusterState.builder(currentState)
.metaData(MetaData.builder(currentState.metaData())
.putCustom(IndexLifecycleMetadata.TYPE,
new IndexLifecycleMetadata(currentMetadata.getPolicyMetadatas(), newMode)))
.build();
}

private ClusterState updateSLMState(final ClusterState currentState) {
SnapshotLifecycleMetadata currentMetadata = currentState.metaData().custom(SnapshotLifecycleMetadata.TYPE);
if (currentMetadata != null && currentMetadata.getOperationMode().isValidChange(mode) == false) {
return currentState;
} else if (currentMetadata == null) {
currentMetadata = SnapshotLifecycleMetadata.EMPTY;
}

final OperationMode newMode;
if (currentMetadata.getOperationMode().isValidChange(mode)) {
newMode = mode;
} else {
newMode = currentMetadata.getOperationMode();
}

return ClusterState.builder(currentState)
.metaData(MetaData.builder(currentState.metaData())
.putCustom(SnapshotLifecycleMetadata.TYPE,
new SnapshotLifecycleMetadata(currentMetadata.getSnapshotConfigurations(), newMode)))
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.indexlifecycle.OperationMode;
import org.elasticsearch.xpack.core.scheduler.CronSchedule;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecycleMetadata;
Expand Down Expand Up @@ -63,6 +64,14 @@ public SnapshotLifecycleService(Settings settings,
public void clusterChanged(final ClusterChangedEvent event) {
if (this.isMaster) {
final ClusterState state = event.state();

if (ilmStoppedOrStopping(state)) {
if (scheduler.scheduledJobIds().size() > 0) {
cancelSnapshotJobs();
}
return;
}

scheduleSnapshotJobs(state);
cleanupDeletedPolicies(state);
}
Expand All @@ -72,7 +81,12 @@ public void clusterChanged(final ClusterChangedEvent event) {
public void onMaster() {
this.isMaster = true;
scheduler.register(snapshotTask);
scheduleSnapshotJobs(clusterService.state());
final ClusterState state = clusterService.state();
if (ilmStoppedOrStopping(state)) {
// ILM is currently stopped, so don't schedule jobs
return;
}
scheduleSnapshotJobs(state);
}

@Override
Expand All @@ -87,6 +101,16 @@ SchedulerEngine getScheduler() {
return this.scheduler;
}

/**
* Returns true if ILM is in the stopped or stopped state
*/
private static boolean ilmStoppedOrStopping(ClusterState state) {
return Optional.ofNullable((SnapshotLifecycleMetadata) state.metaData().custom(SnapshotLifecycleMetadata.TYPE))
.map(SnapshotLifecycleMetadata::getOperationMode)
.map(mode -> OperationMode.STOPPING == mode || OperationMode.STOPPED == mode)
.orElse(false);
}

/**
* Schedule all non-scheduled snapshot jobs contained in the cluster state
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
}

snapLifecycles.put(policyName, newPolicyMetadata.build());
SnapshotLifecycleMetadata lifecycleMetadata = new SnapshotLifecycleMetadata(snapLifecycles);
SnapshotLifecycleMetadata lifecycleMetadata = new SnapshotLifecycleMetadata(snapLifecycles, snapMeta.getOperationMode());
MetaData currentMeta = currentState.metaData();
return ClusterState.builder(currentState)
.metaData(MetaData.builder(currentMeta)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState)
.metaData(MetaData.builder(metaData)
.putCustom(SnapshotLifecycleMetadata.TYPE,
new SnapshotLifecycleMetadata(newConfigs)))
new SnapshotLifecycleMetadata(newConfigs, snapMeta.getOperationMode())))
.build();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.OperationMode;
import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecycleMetadata;
import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicyMetadata;
import org.elasticsearch.xpack.core.snapshotlifecycle.action.PutSnapshotLifecycleAction;
Expand All @@ -34,6 +36,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class TransportPutSnapshotLifecycleAction extends
Expand Down Expand Up @@ -90,7 +93,11 @@ public ClusterState execute(ClusterState currentState) {
.setHeaders(filteredHeaders)
.setModifiedDate(Instant.now().toEpochMilli())
.build();
lifecycleMetadata = new SnapshotLifecycleMetadata(Collections.singletonMap(id, meta));
IndexLifecycleMetadata ilmMeta = currentState.metaData().custom(IndexLifecycleMetadata.TYPE);
OperationMode mode = Optional.ofNullable(ilmMeta)
.map(IndexLifecycleMetadata::getOperationMode)
.orElse(OperationMode.RUNNING);
lifecycleMetadata = new SnapshotLifecycleMetadata(Collections.singletonMap(id, meta), mode);
logger.info("adding new snapshot lifecycle [{}]", id);
} else {
Map<String, SnapshotLifecyclePolicyMetadata> snapLifecycles = new HashMap<>(snapMeta.getSnapshotConfigurations());
Expand All @@ -102,7 +109,7 @@ public ClusterState execute(ClusterState currentState) {
.setModifiedDate(Instant.now().toEpochMilli())
.build();
snapLifecycles.put(id, newLifecycle);
lifecycleMetadata = new SnapshotLifecycleMetadata(snapLifecycles);
lifecycleMetadata = new SnapshotLifecycleMetadata(snapLifecycles, snapMeta.getOperationMode());
if (oldLifecycle == null) {
logger.info("adding new snapshot lifecycle [{}]", id);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.xpack.core.indexlifecycle.OperationMode;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.indexlifecycle.OperationMode;
import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecycleMetadata;

import java.util.Collections;

Expand Down Expand Up @@ -57,11 +58,15 @@ private void assertNoMove(OperationMode currentMode, OperationMode requestedMode
private OperationMode executeUpdate(boolean metadataInstalled, OperationMode currentMode, OperationMode requestMode,
boolean assertSameClusterState) {
IndexLifecycleMetadata indexLifecycleMetadata = new IndexLifecycleMetadata(Collections.emptyMap(), currentMode);
SnapshotLifecycleMetadata snapshotLifecycleMetadata = new SnapshotLifecycleMetadata(Collections.emptyMap(), currentMode);
ImmutableOpenMap.Builder<String, MetaData.Custom> customsMapBuilder = ImmutableOpenMap.builder();
MetaData.Builder metaData = MetaData.builder()
.persistentSettings(settings(Version.CURRENT).build());
if (metadataInstalled) {
metaData.customs(customsMapBuilder.fPut(IndexLifecycleMetadata.TYPE, indexLifecycleMetadata).build());
metaData.customs(customsMapBuilder
.fPut(IndexLifecycleMetadata.TYPE, indexLifecycleMetadata)
.fPut(SnapshotLifecycleMetadata.TYPE, snapshotLifecycleMetadata)
.build());
}
ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build();
OperationModeUpdateTask task = new OperationModeUpdateTask(requestMode);
Expand Down
Loading

0 comments on commit 68b4387

Please sign in to comment.