diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 3cef3cc8f0e62..9aa531a70d136 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -203,6 +203,7 @@ import org.elasticsearch.xpack.core.textstructure.action.FindStructureAction; import org.elasticsearch.xpack.core.transform.TransformFeatureSetUsage; import org.elasticsearch.xpack.core.transform.TransformField; +import org.elasticsearch.xpack.core.transform.TransformMetadata; import org.elasticsearch.xpack.core.transform.action.DeleteTransformAction; import org.elasticsearch.xpack.core.transform.action.GetTransformAction; import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction; @@ -500,6 +501,8 @@ public List getNamedWriteables() { new NamedWriteableRegistry.Entry(LifecycleAction.class, SearchableSnapshotAction.NAME, SearchableSnapshotAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, MigrateAction.NAME, MigrateAction::new), // Transforms + new NamedWriteableRegistry.Entry(Metadata.Custom.class, TransformMetadata.TYPE, TransformMetadata::new), + new NamedWriteableRegistry.Entry(NamedDiff.class, TransformMetadata.TYPE, TransformMetadata.TransformMetadataDiff::new), new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.TRANSFORM, TransformFeatureSetUsage::new), new NamedWriteableRegistry.Entry(PersistentTaskParams.class, TransformField.TASK_NAME, TransformTaskParams::new), new NamedWriteableRegistry.Entry(Task.Status.class, TransformField.TASK_NAME, TransformState::new), @@ -580,7 +583,9 @@ public List getNamedXContent() { new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(TransformField.TASK_NAME), TransformState::fromXContent), new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(TransformField.TASK_NAME), - TransformState::fromXContent) + TransformState::fromXContent), + new NamedXContentRegistry.Entry(Metadata.Custom.class, new ParseField(TransformMetadata.TYPE), + parser -> TransformMetadata.LENIENT_PARSER.parse(parser, null).build()) ); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java index d4b8848dddcc3..ee1132c524bef 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java @@ -82,6 +82,7 @@ import org.elasticsearch.xpack.core.ssl.SSLConfiguration; import org.elasticsearch.xpack.core.ssl.SSLConfigurationReloader; import org.elasticsearch.xpack.core.ssl.SSLService; +import org.elasticsearch.xpack.core.transform.TransformMetadata; import org.elasticsearch.xpack.core.watcher.WatcherMetadata; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; @@ -240,7 +241,8 @@ private static boolean alreadyContainsXPackCustomMetadata(ClusterState clusterSt return metadata.custom(LicensesMetadata.TYPE) != null || metadata.custom(MlMetadata.TYPE) != null || metadata.custom(WatcherMetadata.TYPE) != null || - clusterState.custom(TokenMetadata.TYPE) != null; + clusterState.custom(TokenMetadata.TYPE) != null || + metadata.custom(TransformMetadata.TYPE) != null; } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/AbstractTransportSetResetModeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/AbstractTransportSetResetModeAction.java new file mode 100644 index 0000000000000..55bf8d8fd3736 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/AbstractTransportSetResetModeAction.java @@ -0,0 +1,131 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +public abstract class AbstractTransportSetResetModeAction extends AcknowledgedTransportMasterNodeAction { + + private static final Logger logger = LogManager.getLogger(AbstractTransportSetResetModeAction.class); + private final ClusterService clusterService; + + @Inject + public AbstractTransportSetResetModeAction( + String actionName, + TransportService transportService, + ThreadPool threadPool, + ClusterService clusterService, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver) { + super( + actionName, + transportService, + clusterService, + threadPool, + actionFilters, + SetResetModeActionRequest::new, + indexNameExpressionResolver, + ThreadPool.Names.SAME + ); + this.clusterService = clusterService; + } + + protected abstract boolean isResetMode(ClusterState clusterState); + + protected abstract String featureName(); + + protected abstract ClusterState setState(ClusterState oldState, SetResetModeActionRequest request); + + @Override + protected void masterOperation(Task task, + SetResetModeActionRequest request, + ClusterState state, + ActionListener listener) throws Exception { + + final boolean isResetModeEnabled = isResetMode(state); + // Noop, nothing for us to do, simply return fast to the caller + if (request.isEnabled() == isResetModeEnabled) { + logger.debug(() -> new ParameterizedMessage("Reset mode noop for [{}]", featureName())); + listener.onResponse(AcknowledgedResponse.TRUE); + return; + } + + logger.debug( + () -> new ParameterizedMessage( + "Starting to set [reset_mode] for [{}] to [{}] from [{}]", + featureName(), + request.isEnabled(), + isResetModeEnabled + ) + ); + + ActionListener wrappedListener = ActionListener.wrap( + r -> { + logger.debug(() -> new ParameterizedMessage("Completed reset mode request for [{}]", featureName())); + listener.onResponse(r); + }, + e -> { + logger.debug( + () -> new ParameterizedMessage("Completed reset mode for [{}] request but with failure", featureName()), + e + ); + listener.onFailure(e); + } + ); + + ActionListener clusterStateUpdateListener = ActionListener.wrap( + acknowledgedResponse -> { + if (acknowledgedResponse.isAcknowledged() == false) { + wrappedListener.onFailure(new ElasticsearchTimeoutException("Unknown error occurred while updating cluster state")); + return; + } + wrappedListener.onResponse(acknowledgedResponse); + }, + wrappedListener::onFailure + ); + + clusterService.submitStateUpdateTask(featureName() + "-set-reset-mode", + new AckedClusterStateUpdateTask(request, clusterStateUpdateListener) { + + @Override + protected AcknowledgedResponse newResponse(boolean acknowledged) { + logger.trace(() -> new ParameterizedMessage("Cluster update response built for [{}]: {}", featureName(), acknowledged)); + return AcknowledgedResponse.of(acknowledged); + } + + @Override + public ClusterState execute(ClusterState currentState) { + logger.trace(() -> new ParameterizedMessage("Executing cluster state update for [{}]", featureName())); + return setState(currentState, request); + } + }); + } + + @Override + protected ClusterBlockException checkBlock(SetResetModeActionRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequest.java new file mode 100644 index 0000000000000..2f575a8296e92 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequest.java @@ -0,0 +1,90 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +public class SetResetModeActionRequest extends AcknowledgedRequest implements ToXContentObject { + public static SetResetModeActionRequest enabled() { + return new SetResetModeActionRequest(true); + } + + public static SetResetModeActionRequest disabled() { + return new SetResetModeActionRequest(false); + } + + private final boolean enabled; + + private static final ParseField ENABLED = new ParseField("enabled"); + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>("set_reset_mode_action_request", a -> new SetResetModeActionRequest((Boolean)a[0])); + + static { + PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED); + } + + SetResetModeActionRequest(boolean enabled) { + this.enabled = enabled; + } + + public SetResetModeActionRequest(StreamInput in) throws IOException { + super(in); + this.enabled = in.readBoolean(); + } + + public boolean isEnabled() { + return enabled; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(enabled); + } + + @Override + public int hashCode() { + return Objects.hash(enabled); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || obj.getClass() != getClass()) { + return false; + } + SetResetModeActionRequest other = (SetResetModeActionRequest) obj; + return Objects.equals(enabled, other.enabled); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + builder.field(ENABLED.getPreferredName(), enabled); + builder.endObject(); + return builder; + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetResetModeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetResetModeAction.java index ade9e2337d4a6..0bf17c1fb71c1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetResetModeAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetResetModeAction.java @@ -6,19 +6,9 @@ */ package org.elasticsearch.xpack.core.ml.action; -import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.ToXContentObject; -import org.elasticsearch.common.xcontent.XContentBuilder; -import java.io.IOException; -import java.util.Objects; public class SetResetModeAction extends ActionType { @@ -29,73 +19,4 @@ private SetResetModeAction() { super(NAME, AcknowledgedResponse::readFrom); } - public static class Request extends AcknowledgedRequest implements ToXContentObject { - - public static Request enabled() { - return new Request(true); - } - - public static Request disabled() { - return new Request(false); - } - - private final boolean enabled; - - private static final ParseField ENABLED = new ParseField("enabled"); - public static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>(NAME, a -> new Request((Boolean)a[0])); - - static { - PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED); - } - - Request(boolean enabled) { - this.enabled = enabled; - } - - public Request(StreamInput in) throws IOException { - super(in); - this.enabled = in.readBoolean(); - } - - public boolean isEnabled() { - return enabled; - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(enabled); - } - - @Override - public int hashCode() { - return Objects.hash(enabled); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || obj.getClass() != getClass()) { - return false; - } - Request other = (Request) obj; - return Objects.equals(enabled, other.enabled); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field(ENABLED.getPreferredName(), enabled); - builder.endObject(); - return builder; - } - } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java index 21457002c03c4..d81d96444f0ad 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java @@ -29,6 +29,9 @@ public class TransformMessages { public static final String TRANSFORM_FAILED_TO_PERSIST_STATS = "Failed to persist transform statistics for transform [{0}]"; public static final String UNKNOWN_TRANSFORM_STATS = "Statistics for transform [{0}] could not be found"; + public static final String FAILED_TO_UNSET_RESET_MODE = + "Failed to set [reset_mode] to [false] after {0}. To allow transforms to run, please call the feature reset API again"; + public static final String REST_DEPRECATED_ENDPOINT = "[_data_frame/transforms/] is deprecated, use [_transform/] in the future."; public static final String REST_WARN_NO_TRANSFORM_NODES = "Transform requires the transform node role for at least 1 node, found no transform nodes"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMetadata.java new file mode 100644 index 0000000000000..78ac0d7589f09 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMetadata.java @@ -0,0 +1,176 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.transform; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.Diff; +import org.elasticsearch.cluster.NamedDiff; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Objects; + + +public class TransformMetadata implements Metadata.Custom { + public static final String TYPE = "transform"; + public static final ParseField RESET_MODE = new ParseField("reset_mode"); + + public static final TransformMetadata EMPTY_METADATA = new TransformMetadata(false); + // This parser follows the pattern that metadata is parsed leniently (to allow for enhancements) + public static final ObjectParser LENIENT_PARSER = new ObjectParser<>("" + + "transform_metadata", + true, + TransformMetadata.Builder::new); + + static { + LENIENT_PARSER.declareBoolean(TransformMetadata.Builder::isResetMode, RESET_MODE); + } + + private final boolean resetMode; + + private TransformMetadata(boolean resetMode) { + this.resetMode = resetMode; + } + + public boolean isResetMode() { + return resetMode; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumIndexCompatibilityVersion(); + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public EnumSet context() { + return Metadata.ALL_CONTEXTS; + } + + @Override + public Diff diff(Metadata.Custom previousState) { + return new TransformMetadata.TransformMetadataDiff((TransformMetadata) previousState, this); + } + + public TransformMetadata(StreamInput in) throws IOException { + this.resetMode = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(resetMode); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.field(RESET_MODE.getPreferredName(), resetMode); + return builder; + } + + public static class TransformMetadataDiff implements NamedDiff { + + final boolean resetMode; + + TransformMetadataDiff(TransformMetadata before, TransformMetadata after) { + this.resetMode = after.resetMode; + } + + public TransformMetadataDiff(StreamInput in) throws IOException { + resetMode = in.readBoolean(); + } + + /** + * Merge the diff with the transform metadata. + * @param part The current transform metadata. + * @return The new transform metadata. + */ + @Override + public Metadata.Custom apply(Metadata.Custom part) { + return new TransformMetadata(resetMode); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(resetMode); + } + + @Override + public String getWriteableName() { + return TYPE; + } + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + TransformMetadata that = (TransformMetadata) o; + return resetMode == that.resetMode; + } + + @Override + public final String toString() { + return Strings.toString(this); + } + + @Override + public int hashCode() { + return Objects.hash(resetMode); + } + + public static class Builder { + + private boolean resetMode; + + public static TransformMetadata.Builder from(@Nullable TransformMetadata previous) { + return new TransformMetadata.Builder(previous); + } + + public Builder() { + } + + public Builder(@Nullable TransformMetadata previous) { + if (previous != null) { + resetMode = previous.resetMode; + } + } + + public TransformMetadata.Builder isResetMode(boolean resetMode) { + this.resetMode = resetMode; + return this; + } + + public TransformMetadata build() { + return new TransformMetadata(resetMode); + } + } + + public static TransformMetadata getTransformMetadata(ClusterState state) { + TransformMetadata TransformMetadata = (state == null) ? null : state.getMetadata().custom(TYPE); + if (TransformMetadata == null) { + return EMPTY_METADATA; + } + return TransformMetadata; + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/SetResetModeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/SetResetModeAction.java new file mode 100644 index 0000000000000..5d688180319a2 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/SetResetModeAction.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.core.transform.action; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.AcknowledgedResponse; + + +public class SetResetModeAction extends ActionType { + + public static final SetResetModeAction INSTANCE = new SetResetModeAction(); + public static final String NAME = "cluster:internal/xpack/transform/reset_mode"; + + private SetResetModeAction() { + super(NAME, AcknowledgedResponse::readFrom); + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java index 1f8472d159e56..293ac9055c3c0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java @@ -27,25 +27,27 @@ public final class TransformInternalIndexConstants { */ // internal index + public static final String TRANSFORM_PREFIX = ".transform-"; + public static final String TRANSFORM_PREFIX_DEPRECATED = ".data-frame-"; // version is not a rollover pattern, however padded because sort is string based public static final Version INDEX_VERSION_LAST_CHANGED = Version.V_7_12_0; public static final String INDEX_VERSION = "006"; - public static final String INDEX_PATTERN = ".transform-internal-"; + public static final String INDEX_PATTERN = TRANSFORM_PREFIX + "internal-"; public static final String LATEST_INDEX_VERSIONED_NAME = INDEX_PATTERN + INDEX_VERSION; public static final String LATEST_INDEX_NAME = LATEST_INDEX_VERSIONED_NAME; public static final String INDEX_NAME_PATTERN = INDEX_PATTERN + "*"; - public static final String INDEX_NAME_PATTERN_DEPRECATED = ".data-frame-internal-*"; + public static final String INDEX_NAME_PATTERN_DEPRECATED = TRANSFORM_PREFIX_DEPRECATED + "internal-*"; // audit index // gh #49730: upped version of audit index to 000002 public static final String AUDIT_TEMPLATE_VERSION = "000002"; - public static final String AUDIT_INDEX_PREFIX = ".transform-notifications-"; + public static final String AUDIT_INDEX_PREFIX = TRANSFORM_PREFIX + "notifications-"; public static final String AUDIT_INDEX_PATTERN = AUDIT_INDEX_PREFIX + "*"; - public static final String AUDIT_INDEX_DEPRECATED = ".data-frame-notifications-1"; - public static final String AUDIT_INDEX_PATTERN_DEPRECATED = ".data-frame-notifications-*"; + public static final String AUDIT_INDEX_DEPRECATED = TRANSFORM_PREFIX_DEPRECATED + "notifications-1"; + public static final String AUDIT_INDEX_PATTERN_DEPRECATED = TRANSFORM_PREFIX_DEPRECATED + "notifications-*"; - public static final String AUDIT_INDEX_READ_ALIAS = ".transform-notifications-read"; + public static final String AUDIT_INDEX_READ_ALIAS = TRANSFORM_PREFIX + "notifications-read"; public static final String AUDIT_INDEX = AUDIT_INDEX_PREFIX + AUDIT_TEMPLATE_VERSION; private TransformInternalIndexConstants() {} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/SetResetModeActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequestTests.java similarity index 56% rename from x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/SetResetModeActionRequestTests.java rename to x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequestTests.java index f1dbf1e970ef9..e3cf1c5114d1b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/SetResetModeActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/SetResetModeActionRequestTests.java @@ -4,18 +4,17 @@ * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. */ -package org.elasticsearch.xpack.core.ml.action; +package org.elasticsearch.xpack.core.action; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; -import org.elasticsearch.xpack.core.ml.action.SetResetModeAction.Request; -public class SetResetModeActionRequestTests extends AbstractSerializingTestCase { +public class SetResetModeActionRequestTests extends AbstractSerializingTestCase { @Override - protected Request createTestInstance() { - return new Request(randomBoolean()); + protected SetResetModeActionRequest createTestInstance() { + return new SetResetModeActionRequest(randomBoolean()); } @Override @@ -24,12 +23,12 @@ protected boolean supportsUnknownFields() { } @Override - protected Writeable.Reader instanceReader() { - return Request::new; + protected Writeable.Reader instanceReader() { + return SetResetModeActionRequest::new; } @Override - protected Request doParseInstance(XContentParser parser) { - return Request.PARSER.apply(parser, null); + protected SetResetModeActionRequest doParseInstance(XContentParser parser) { + return SetResetModeActionRequest.PARSER.apply(parser, null); } } diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java index 08aa48751e7ab..220370702772a 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java @@ -20,6 +20,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.action.SetResetModeActionRequest; import org.elasticsearch.xpack.core.ml.action.SetResetModeAction; import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction; import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; @@ -98,7 +99,7 @@ public void testNotCreatedWhenAfterOtherMlIndexAndUpgradeInProgress() throws Exc public void testNotCreatedWhenAfterOtherMlIndexAndResetInProgress() throws Exception { - client().execute(SetResetModeAction.INSTANCE, SetResetModeAction.Request.enabled()).actionGet(); + client().execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.enabled()).actionGet(); try { @@ -117,7 +118,7 @@ public void testNotCreatedWhenAfterOtherMlIndexAndResetInProgress() throws Excep assertEquals(0, numberOfAnnotationsAliases()); }); } finally { - client().execute(SetResetModeAction.INSTANCE, SetResetModeAction.Request.disabled()).actionGet(); + client().execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled()).actionGet(); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index eb399915d8874..b0cf95c200ce3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -77,6 +77,7 @@ import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderService; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.action.SetResetModeActionRequest; import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction; import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; import org.elasticsearch.xpack.core.ml.MachineLearningField; @@ -1253,7 +1254,7 @@ public void cleanUpFeature( logger.info("Starting machine learning feature reset"); ActionListener unsetResetModeListener = ActionListener.wrap( - success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeAction.Request.disabled(), ActionListener.wrap( + success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(), ActionListener.wrap( resetSuccess -> finalListener.onResponse(success), resetFailure -> { logger.error("failed to disable reset mode after state otherwise successful machine learning reset", resetFailure); @@ -1265,7 +1266,7 @@ public void cleanUpFeature( ); }) ), - failure -> client.execute(SetResetModeAction.INSTANCE, SetResetModeAction.Request.disabled(), ActionListener.wrap( + failure -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(), ActionListener.wrap( resetSuccess -> finalListener.onFailure(failure), resetFailure -> { logger.error("failed to disable reset mode after state clean up failure", resetFailure); @@ -1385,7 +1386,7 @@ public void cleanUpFeature( ); // Indicate that a reset is now in progress - client.execute(SetResetModeAction.INSTANCE, SetResetModeAction.Request.enabled(), afterResetModeSet); + client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.enabled(), afterResetModeSet); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java index e001b31caf9c3..1d97a9c997481 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetResetModeAction.java @@ -6,108 +6,45 @@ */ package org.elasticsearch.xpack.ml.action; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.ElasticsearchTimeoutException; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.action.AbstractTransportSetResetModeAction; +import org.elasticsearch.xpack.core.action.SetResetModeActionRequest; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.SetResetModeAction; -public class TransportSetResetModeAction extends AcknowledgedTransportMasterNodeAction { - - private static final Logger logger = LogManager.getLogger(TransportSetResetModeAction.class); - private final ClusterService clusterService; +public class TransportSetResetModeAction extends AbstractTransportSetResetModeAction { @Inject public TransportSetResetModeAction(TransportService transportService, ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { - super(SetResetModeAction.NAME, transportService, clusterService, threadPool, actionFilters, SetResetModeAction.Request::new, - indexNameExpressionResolver, ThreadPool.Names.SAME); - this.clusterService = clusterService; + super(SetResetModeAction.NAME, transportService, threadPool, clusterService, actionFilters, indexNameExpressionResolver); } @Override - protected void masterOperation(Task task, - SetResetModeAction.Request request, - ClusterState state, - ActionListener listener) throws Exception { - - // Noop, nothing for us to do, simply return fast to the caller - if (request.isEnabled() == MlMetadata.getMlMetadata(state).isResetMode()) { - logger.debug("Reset mode noop"); - listener.onResponse(AcknowledgedResponse.TRUE); - return; - } - - logger.debug( - () -> new ParameterizedMessage( - "Starting to set [reset_mode] to [{}] from [{}]", request.isEnabled(), MlMetadata.getMlMetadata(state).isResetMode() - ) - ); - - ActionListener wrappedListener = ActionListener.wrap( - r -> { - logger.debug("Completed reset mode request"); - listener.onResponse(r); - }, - e -> { - logger.debug("Completed reset mode request but with failure", e); - listener.onFailure(e); - } - ); - - ActionListener clusterStateUpdateListener = ActionListener.wrap( - acknowledgedResponse -> { - if (acknowledgedResponse.isAcknowledged() == false) { - logger.info("Cluster state update is NOT acknowledged"); - wrappedListener.onFailure(new ElasticsearchTimeoutException("Unknown error occurred while updating cluster state")); - return; - } - wrappedListener.onResponse(acknowledgedResponse); - }, - wrappedListener::onFailure - ); - - clusterService.submitStateUpdateTask("ml-set-reset-mode", - new AckedClusterStateUpdateTask(request, clusterStateUpdateListener) { - - @Override - protected AcknowledgedResponse newResponse(boolean acknowledged) { - logger.trace(() -> new ParameterizedMessage("Cluster update response built: {}", acknowledged)); - return AcknowledgedResponse.of(acknowledged); - } + protected boolean isResetMode(ClusterState clusterState) { + return MlMetadata.getMlMetadata(clusterState).isResetMode(); + } - @Override - public ClusterState execute(ClusterState currentState) { - logger.trace("Executing cluster state update"); - MlMetadata.Builder builder = MlMetadata.Builder - .from(currentState.metadata().custom(MlMetadata.TYPE)) - .isResetMode(request.isEnabled()); - ClusterState.Builder newState = ClusterState.builder(currentState); - newState.metadata(Metadata.builder(currentState.getMetadata()).putCustom(MlMetadata.TYPE, builder.build()).build()); - return newState.build(); - } - }); + @Override + protected String featureName() { + return "ml"; } @Override - protected ClusterBlockException checkBlock(SetResetModeAction.Request request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + protected ClusterState setState(ClusterState oldState, SetResetModeActionRequest request) { + MlMetadata.Builder builder = MlMetadata.Builder + .from(oldState.metadata().custom(MlMetadata.TYPE)) + .isResetMode(request.isEnabled()); + ClusterState.Builder newState = ClusterState.builder(oldState); + newState.metadata(Metadata.builder(oldState.getMetadata()).putCustom(MlMetadata.TYPE, builder.build()).build()); + return newState.build(); } } diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index b676591c87a35..0123523affbf5 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -217,6 +217,7 @@ public class Constants { "cluster:internal/xpack/ml/job/kill/process", "cluster:internal/xpack/ml/job/update/process", "cluster:internal/xpack/ml/reset_mode", + "cluster:internal/xpack/transform/reset_mode", "cluster:monitor/allocation/explain", "cluster:monitor/async_search/status", "cluster:monitor/ccr/follow_info", diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TestFeatureResetIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TestFeatureResetIT.java new file mode 100644 index 0000000000000..3614790a2b2c3 --- /dev/null +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TestFeatureResetIT.java @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.transform.integration; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.feature.ResetFeaturesRequest; +import org.elasticsearch.client.transform.transforms.TimeSyncConfig; +import org.elasticsearch.client.transform.transforms.TransformConfig; +import org.elasticsearch.client.transform.transforms.pivot.SingleGroupSource; +import org.elasticsearch.client.transform.transforms.pivot.TermsGroupSource; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.junit.After; + +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.anEmptyMap; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; + +public class TestFeatureResetIT extends TransformIntegTestCase { + + @After + public void cleanup() throws Exception { + cleanUp(); + } + + @SuppressWarnings("unchecked") + public void testTransformFeatureReset() throws Exception { + String indexName = "basic-crud-reviews"; + String transformId = "batch-transform-feature-reset"; + createReviewsIndex(indexName, 100, 100, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow); + + Map groups = new HashMap<>(); + groups.put("by-day", createDateHistogramGroupSourceWithCalendarInterval("timestamp", DateHistogramInterval.DAY, null)); + groups.put("by-user", TermsGroupSource.builder().setField("user_id").build()); + groups.put("by-business", TermsGroupSource.builder().setField("business_id").build()); + + AggregatorFactories.Builder aggs = AggregatorFactories.builder() + .addAggregator(AggregationBuilders.avg("review_score").field("stars")) + .addAggregator(AggregationBuilders.max("timestamp").field("timestamp")); + + TransformConfig config = + createTransformConfigBuilder(transformId, "reviews-by-user-business-day", QueryBuilders.matchAllQuery(), indexName) + .setPivotConfig(createPivotConfig(groups, aggs)) + .build(); + + assertTrue(putTransform(config, RequestOptions.DEFAULT).isAcknowledged()); + assertTrue(startTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged()); + + transformId = "continuous-transform-feature-reset"; + config = + createTransformConfigBuilder(transformId, "reviews-by-user-business-day-cont", QueryBuilders.matchAllQuery(), indexName) + .setPivotConfig(createPivotConfig(groups, aggs)) + .setSyncConfig(TimeSyncConfig.builder().setField("timestamp").setDelay(TimeValue.timeValueSeconds(1)).build()) + .build(); + + assertTrue(putTransform(config, RequestOptions.DEFAULT).isAcknowledged()); + assertTrue(startTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged()); + TestRestHighLevelClient highLevelClient = new TestRestHighLevelClient(); + highLevelClient.features().resetFeatures(new ResetFeaturesRequest(), RequestOptions.DEFAULT); + + Response response = adminClient().performRequest(new Request("GET", "/_cluster/state?metric=metadata")); + Map metadata = (Map)ESRestTestCase.entityAsMap(response).get("metadata"); + assertThat(metadata, is(not(nullValue()))); + + Map transformMetadata = (Map)metadata.get("transform"); + assertThat(transformMetadata, is(not(nullValue()))); + assertThat(transformMetadata.get("reset_mode"), is(false)); + + + // assert transforms are gone + assertThat(getTransform("_all").getCount(), equalTo(0L)); + + // assert transform indices are gone + assertThat( + ESRestTestCase.entityAsMap(adminClient().performRequest(new Request("GET", ".transform-*"))), + is(anEmptyMap()) + ); + } + +} diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java index 3cdc2ea2771c4..702055ff6358e 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java @@ -54,11 +54,11 @@ public class TransformIT extends TransformIntegTestCase { private static final int NUM_USERS = 28; - private static final Integer getUserIdForRow(int row) { + static Integer getUserIdForRow(int row) { return row % NUM_USERS; } - private static final String getDateStringForRow(int row) { + static String getDateStringForRow(int row) { int day = (11 + (row / 100)) % 28; int hour = 10 + (row % 13); int min = 10 + (row % 49); diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIntegTestCase.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIntegTestCase.java index 1f917b1e2e2e7..1579b1d733343 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIntegTestCase.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIntegTestCase.java @@ -120,8 +120,16 @@ private void logAudits() throws IOException { protected void cleanUpTransforms() throws IOException { for (TransformConfig config : transformConfigs.values()) { - stopTransform(config.getId()); - deleteTransform(config.getId()); + try { + stopTransform(config.getId()); + deleteTransform(config.getId()); + } catch (ElasticsearchStatusException ex) { + if (ex.status().equals(RestStatus.NOT_FOUND)) { + logger.info("tried to cleanup already deleted transform [{}]", config.getId()); + } else { + throw ex; + } + } } transformConfigs.clear(); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 2f1ce2cf17921..7022b13efa6d8 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -10,10 +10,14 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; @@ -43,6 +47,7 @@ import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder; @@ -50,15 +55,19 @@ import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.core.DataTier; import org.elasticsearch.xpack.core.XPackPlugin; +import org.elasticsearch.xpack.core.action.SetResetModeActionRequest; import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction; import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; +import org.elasticsearch.xpack.core.transform.TransformField; +import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.TransformNamedXContentProvider; import org.elasticsearch.xpack.core.transform.action.DeleteTransformAction; import org.elasticsearch.xpack.core.transform.action.GetTransformAction; import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction; import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction; import org.elasticsearch.xpack.core.transform.action.PutTransformAction; +import org.elasticsearch.xpack.core.transform.action.SetResetModeAction; import org.elasticsearch.xpack.core.transform.action.StartTransformAction; import org.elasticsearch.xpack.core.transform.action.StopTransformAction; import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction; @@ -76,6 +85,7 @@ import org.elasticsearch.xpack.transform.action.TransportGetTransformStatsAction; import org.elasticsearch.xpack.transform.action.TransportPreviewTransformAction; import org.elasticsearch.xpack.transform.action.TransportPutTransformAction; +import org.elasticsearch.xpack.transform.action.TransportSetTransformResetModeAction; import org.elasticsearch.xpack.transform.action.TransportStartTransformAction; import org.elasticsearch.xpack.transform.action.TransportStopTransformAction; import org.elasticsearch.xpack.transform.action.TransportUpdateTransformAction; @@ -123,7 +133,10 @@ import java.util.function.Supplier; import java.util.function.UnaryOperator; +import static org.elasticsearch.xpack.core.transform.TransformMessages.FAILED_TO_UNSET_RESET_MODE; import static org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants.AUDIT_INDEX_PATTERN; +import static org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants.TRANSFORM_PREFIX; +import static org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants.TRANSFORM_PREFIX_DEPRECATED; public class Transform extends Plugin implements SystemIndexPlugin, PersistentTaskPlugin { @@ -213,6 +226,7 @@ public List getRestHandlers( new ActionHandler<>(GetTransformStatsAction.INSTANCE, TransportGetTransformStatsAction.class), new ActionHandler<>(PreviewTransformAction.INSTANCE, TransportPreviewTransformAction.class), new ActionHandler<>(UpdateTransformAction.INSTANCE, TransportUpdateTransformAction.class), + new ActionHandler<>(SetResetModeAction.INSTANCE, TransportSetTransformResetModeAction.class), // deprecated actions, to be removed for 8.0.0 new ActionHandler<>(PutTransformActionDeprecated.INSTANCE, TransportPutTransformActionDeprecated.class), @@ -342,14 +356,71 @@ public Collection getSystemIndexDescriptors(Settings sett public void cleanUpFeature( ClusterService clusterService, Client client, - ActionListener listener + ActionListener finalListener ) { + + ActionListener unsetResetModeListener = ActionListener.wrap( + success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(), ActionListener.wrap( + resetSuccess -> finalListener.onResponse(success), + resetFailure -> { + logger.error("failed to disable reset mode after otherwise successful transform reset", resetFailure); + finalListener.onFailure( + new ElasticsearchStatusException( + TransformMessages.getMessage(FAILED_TO_UNSET_RESET_MODE, "a successful feature reset"), + RestStatus.INTERNAL_SERVER_ERROR, + resetFailure + ) + ); + }) + ), + failure -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(), ActionListener.wrap( + resetSuccess -> finalListener.onFailure(failure), + resetFailure -> { + logger.error( + TransformMessages.getMessage(FAILED_TO_UNSET_RESET_MODE, "a failed feature reset"), + resetFailure + ); + Exception ex = new ElasticsearchException( + TransformMessages.getMessage(FAILED_TO_UNSET_RESET_MODE, "a failed feature reset") + ); + ex.addSuppressed(resetFailure); + failure.addSuppressed(ex); + finalListener.onFailure(failure); + }) + ) + ); + + ActionListener afterWaitingForTasks = ActionListener.wrap( + listTasksResponse -> { + listTasksResponse.rethrowFailures("Waiting for transform indexing tasks"); + SystemIndexPlugin.super.cleanUpFeature(clusterService, client, unsetResetModeListener); + }, + unsetResetModeListener::onFailure + ); + ActionListener afterStoppingTransforms = ActionListener.wrap(stopTransformsResponse -> { if (stopTransformsResponse.isAcknowledged() && stopTransformsResponse.getTaskFailures().isEmpty() && stopTransformsResponse.getNodeFailures().isEmpty()) { - - SystemIndexPlugin.super.cleanUpFeature(clusterService, client, listener); + client.admin() + .cluster() + .prepareListTasks() + .setActions(TransformField.TASK_NAME) + .setWaitForCompletion(true) + .execute(ActionListener.wrap( + listTransformTasks -> { + listTransformTasks.rethrowFailures("Waiting for transform tasks"); + client.admin() + .cluster() + .prepareListTasks() + .setActions("indices:data/write/bulk") + .setDetailed(true) + .setWaitForCompletion(true) + .setDescriptions("*" + TRANSFORM_PREFIX + "*", "*" + TRANSFORM_PREFIX_DEPRECATED + "*") + .execute(afterWaitingForTasks); + }, + unsetResetModeListener::onFailure + )); } else { String errMsg = "Failed to reset Transform: " + (stopTransformsResponse.isAcknowledged() ? "" : "not acknowledged ") @@ -359,12 +430,26 @@ public void cleanUpFeature( + (stopTransformsResponse.getTaskFailures().isEmpty() ? "" : "task failures: " + stopTransformsResponse.getTaskFailures()); - listener.onResponse(new ResetFeatureStateResponse.ResetFeatureStateStatus(this.getFeatureName(), errMsg)); + unsetResetModeListener.onResponse(new ResetFeatureStateResponse.ResetFeatureStateStatus(this.getFeatureName(), errMsg)); } - }, listener::onFailure); + }, unsetResetModeListener::onFailure); + + ActionListener afterResetModeSet = ActionListener.wrap( + response -> { + StopTransformAction.Request stopTransformsRequest = new StopTransformAction.Request( + Metadata.ALL, + true, + true, + null, + true, + false + ); + client.execute(StopTransformAction.INSTANCE, stopTransformsRequest, afterStoppingTransforms); + }, + finalListener::onFailure + ); - StopTransformAction.Request stopTransformsRequest = new StopTransformAction.Request(Metadata.ALL, true, true, null, true, false); - client.execute(StopTransformAction.INSTANCE, stopTransformsRequest, afterStoppingTransforms); + client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.enabled(), afterResetModeSet); } @Override diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformResetModeAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformResetModeAction.java new file mode 100644 index 0000000000000..d48cfa217530f --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformResetModeAction.java @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.transform.action; + +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.action.AbstractTransportSetResetModeAction; +import org.elasticsearch.xpack.core.action.SetResetModeActionRequest; +import org.elasticsearch.xpack.core.transform.TransformMetadata; +import org.elasticsearch.xpack.core.transform.action.SetResetModeAction; + + +public class TransportSetTransformResetModeAction extends AbstractTransportSetResetModeAction { + + @Inject + public TransportSetTransformResetModeAction(TransportService transportService, ThreadPool threadPool, ClusterService clusterService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(SetResetModeAction.NAME, transportService, threadPool, clusterService, actionFilters, indexNameExpressionResolver); + } + + @Override + protected boolean isResetMode(ClusterState clusterState) { + return TransformMetadata.getTransformMetadata(clusterState).isResetMode(); + } + + @Override + protected String featureName() { + return "transform"; + } + + @Override + protected ClusterState setState(ClusterState oldState, SetResetModeActionRequest request) { + TransformMetadata.Builder builder = TransformMetadata.Builder + .from(oldState.metadata().custom(TransformMetadata.TYPE)) + .isResetMode(request.isEnabled()); + ClusterState.Builder newState = ClusterState.builder(oldState); + newState.metadata(Metadata.builder(oldState.getMetadata()) + .putCustom(TransformMetadata.TYPE, builder.build()).build()); + return newState.build(); + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/notifications/TransformAuditor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/notifications/TransformAuditor.java index 56668ec553b65..8688f8573194a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/notifications/TransformAuditor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/notifications/TransformAuditor.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor; +import org.elasticsearch.xpack.core.transform.TransformMetadata; import org.elasticsearch.xpack.core.transform.notifications.TransformAuditMessage; import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; import org.elasticsearch.xpack.transform.persistence.TransformInternalIndex; @@ -29,6 +30,8 @@ */ public class TransformAuditor extends AbstractAuditor { + private volatile boolean isResetMode = false; + public TransformAuditor(Client client, String nodeName, ClusterService clusterService) { super(new OriginSettingClient(client, TRANSFORM_ORIGIN), TransformInternalIndexConstants.AUDIT_INDEX, TransformInternalIndexConstants.AUDIT_INDEX, @@ -62,5 +65,19 @@ public TransformAuditor(Client client, String nodeName, ClusterService clusterSe } }, nodeName, TransformAuditMessage::new, clusterService); + clusterService.addListener(event -> { + if (event.metadataChanged()) { + isResetMode = TransformMetadata.getTransformMetadata(event.state()).isResetMode(); + } + }); + } + + @Override + protected void writeBacklog() { + if (isResetMode) { + clearBacklog(); + } else { + super.writeBacklog(); + } } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformMetadataTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformMetadataTests.java new file mode 100644 index 0000000000000..417b7e58a45bf --- /dev/null +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformMetadataTests.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xpack.core.transform.TransformMetadata; + +public class TransformMetadataTests extends AbstractSerializingTestCase { + + @Override + protected TransformMetadata createTestInstance() { + return new TransformMetadata.Builder().isResetMode(randomBoolean()).build(); + } + + @Override + protected Writeable.Reader instanceReader() { + return TransformMetadata::new; + } + + @Override + protected TransformMetadata doParseInstance(XContentParser parser) { + return TransformMetadata.LENIENT_PARSER.apply(parser, null).build(); + } + + @Override + protected TransformMetadata mutateInstance(TransformMetadata instance) { + return new TransformMetadata.Builder().isResetMode(instance.isResetMode() == false).build(); + } +}