diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/AggProvider.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/AggProvider.java index 7fd5b5cb51a61..e992b82896d65 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/AggProvider.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/AggProvider.java @@ -7,6 +7,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -34,14 +35,16 @@ class AggProvider implements Writeable, ToXContentObject { private Exception parsingException; private AggregatorFactories.Builder parsedAggs; private Map aggs; + private boolean rewroteAggs; static AggProvider fromXContent(XContentParser parser, boolean lenient) throws IOException { Map aggs = parser.mapOrdered(); // NOTE: Always rewrite potentially old date histogram intervals. // This should occur in 8.x+ but not 7.x. // 7.x is BWC with versions that do not support the new date_histogram fields + boolean rewroteAggs = false; if (lenient) { - rewriteDateHistogramInterval(aggs, false); + rewroteAggs = rewriteDateHistogramInterval(aggs, false); } AggregatorFactories.Builder parsedAggs = null; Exception exception = null; @@ -61,7 +64,7 @@ static AggProvider fromXContent(XContentParser parser, boolean lenient) throws I throw ExceptionsHelper.badRequestException(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, ex); } } - return new AggProvider(aggs, parsedAggs, exception); + return new AggProvider(aggs, parsedAggs, exception, rewroteAggs); } @SuppressWarnings("unchecked") @@ -99,23 +102,30 @@ static AggProvider fromParsedAggs(AggregatorFactories.Builder parsedAggs) throws new AggProvider( XContentObjectTransformer.aggregatorTransformer(NamedXContentRegistry.EMPTY).toMap(parsedAggs), parsedAggs, - null); + null, + false); } static AggProvider fromStream(StreamInput in) throws IOException { - return new AggProvider(in.readMap(), in.readOptionalWriteable(AggregatorFactories.Builder::new), in.readException()); + return new AggProvider( + in.readMap(), + in.readOptionalWriteable(AggregatorFactories.Builder::new), + in.readException(), + in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readBoolean() : false); } - AggProvider(Map aggs, AggregatorFactories.Builder parsedAggs, Exception parsingException) { + AggProvider(Map aggs, AggregatorFactories.Builder parsedAggs, Exception parsingException, boolean rewroteAggs) { this.aggs = Collections.unmodifiableMap(new LinkedHashMap<>(Objects.requireNonNull(aggs, "[aggs] must not be null"))); this.parsedAggs = parsedAggs; this.parsingException = parsingException; + this.rewroteAggs = rewroteAggs; } AggProvider(AggProvider other) { this.aggs = new LinkedHashMap<>(other.aggs); this.parsedAggs = other.parsedAggs; this.parsingException = other.parsingException; + this.rewroteAggs = other.rewroteAggs; } @Override @@ -123,6 +133,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeMap(aggs); out.writeOptionalWriteable(parsedAggs); out.writeException(parsingException); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(rewroteAggs); + } } public Exception getParsingException() { @@ -137,6 +150,10 @@ public Map getAggs() { return aggs; } + public boolean isRewroteAggs() { + return rewroteAggs; + } + @Override public boolean equals(Object other) { if (this == other) { @@ -151,12 +168,23 @@ public boolean equals(Object other) { return Objects.equals(this.aggs, that.aggs) && Objects.equals(this.parsedAggs, that.parsedAggs) - && Objects.equals(this.parsingException, that.parsingException); + && equalExceptionMessages(this.parsingException, that.parsingException) + && Objects.equals(this.rewroteAggs, that.rewroteAggs); + } + + private static boolean equalExceptionMessages(Exception lft, Exception rgt) { + if (lft == rgt) { + return true; + } + if (lft == null || rgt == null) { + return false; + } + return Objects.equals(lft.getMessage(), rgt.getMessage()); } @Override public int hashCode() { - return Objects.hash(aggs, parsedAggs, parsingException); + return Objects.hash(aggs, parsedAggs, parsingException == null ? null : parsingException.getMessage(), rewroteAggs); } @Override @@ -164,4 +192,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.map(aggs); return builder; } + + @Override + public String toString() { + return "AggProvider{" + + "parsingException=" + parsingException + + ", parsedAggs=" + parsedAggs + + ", aggs=" + aggs + + ", rewroteAggs=" + rewroteAggs + + '}'; + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index 8140b20221277..2400a3ec82750 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -393,6 +393,14 @@ public boolean hasAggregations() { return aggProvider != null && aggProvider.getAggs() != null && aggProvider.getAggs().size() > 0; } + public boolean aggsRewritten() { + return aggProvider != null && aggProvider.isRewroteAggs(); + } + + public AggProvider getAggProvider() { + return aggProvider; + } + public List getScriptFields() { return scriptFields == null ? Collections.emptyList() : scriptFields; } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/AggProviderTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/AggProviderTests.java index aacb9596a4e1c..5f503f9d18ce4 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/AggProviderTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/AggProviderTests.java @@ -74,7 +74,7 @@ public static AggProvider createRandomValidAggProvider(String name, String field AggregatorFactories.Builder aggs = XContentObjectTransformer.aggregatorTransformer(new NamedXContentRegistry(searchModule.getNamedXContents())) .fromMap(agg); - return new AggProvider(agg, aggs, null); + return new AggProvider(agg, aggs, null, false); } catch (IOException ex) { fail(ex.getMessage()); } @@ -182,6 +182,6 @@ protected AggProvider mutateInstance(AggProvider instance) throws IOException { default: throw new AssertionError("Illegal randomisation branch"); } - return new AggProvider(instance.getAggs(), parsedAggs, parsingException); + return new AggProvider(instance.getAggs(), parsedAggs, parsingException, false); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/AggProviderWireSerializationTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/AggProviderWireSerializationTests.java new file mode 100644 index 0000000000000..303f61e6ca294 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/AggProviderWireSerializationTests.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.datafeed; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase; +import org.elasticsearch.xpack.core.ml.utils.XContentObjectTransformer; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +public class AggProviderWireSerializationTests extends AbstractBWCWireSerializationTestCase { + + @Override + protected NamedWriteableRegistry writableRegistry() { + SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList()); + return new NamedWriteableRegistry(searchModule.getNamedWriteables()); + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return writableRegistry(); + } + + @Override + protected AggProvider createTestInstance() { + return createRandomValidAggProvider(); + } + + @Override + protected Writeable.Reader instanceReader() { + return AggProvider::fromStream; + } + + public static AggProvider createRandomValidAggProvider() { + Map agg = Collections.singletonMap(randomAlphaOfLengthBetween(1, 10), + Collections.singletonMap("avg", Collections.singletonMap("field", randomAlphaOfLengthBetween(1, 10)))); + try { + SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList()); + AggregatorFactories.Builder aggs = + XContentObjectTransformer.aggregatorTransformer(new NamedXContentRegistry(searchModule.getNamedXContents())) + .fromMap(agg); + Exception parsingException = null; + if (randomBoolean()) { + aggs = null; + parsingException = new ElasticsearchException("bad configs"); + } + return new AggProvider(agg, aggs, parsingException, randomBoolean()); + } catch (IOException ex) { + fail(ex.getMessage()); + } + return null; + } + + @Override + protected AggProvider mutateInstanceForVersion(AggProvider instance, Version version) { + if (version.onOrBefore(Version.V_8_0_0)) { + return new AggProvider(instance.getAggs(), instance.getParsedAggs(), instance.getParsingException(), false); + } + return instance; + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java index 9c1e135551bb5..bd23a545456bb 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java @@ -336,7 +336,8 @@ public void testSerializationOfComplexAggsBetweenVersions() throws IOException { datafeedUpdateBuilder.setAggregations(new AggProvider( XContentObjectTransformer.aggregatorTransformer(xContentRegistry()).toMap(aggs), aggs, - null)); + null, + false)); // So equality check between the streamed and current passes // Streamed DatafeedConfigs when they are before 6.6.0 require a parsed object for aggs and queries, consequently all the default // values are added between them diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index ba912dc14469e..840b6ce2e453a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -211,6 +211,7 @@ import org.elasticsearch.xpack.ml.action.TransportUpdateProcessAction; import org.elasticsearch.xpack.ml.action.TransportValidateDetectorAction; import org.elasticsearch.xpack.ml.action.TransportValidateJobConfigAction; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigAutoUpdater; import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; @@ -680,6 +681,9 @@ public Collection createComponents(Client client, ClusterService cluster MlAssignmentNotifier mlAssignmentNotifier = new MlAssignmentNotifier(anomalyDetectionAuditor, dataFrameAnalyticsAuditor, threadPool, new MlConfigMigrator(settings, client, clusterService, indexNameExpressionResolver), clusterService); + MlAutoUpdateService mlAutoUpdateService = new MlAutoUpdateService(threadPool, + List.of(new DatafeedConfigAutoUpdater(datafeedConfigProvider))); + clusterService.addListener(mlAutoUpdateService); // this object registers as a license state listener, and is never removed, so there's no need to retain another reference to it final InvalidLicenseEnforcer enforcer = new InvalidLicenseEnforcer(getLicenseState(), threadPool, datafeedManager, autodetectProcessManager); @@ -705,6 +709,7 @@ public Collection createComponents(Client client, ClusterService cluster dataFrameAnalyticsAuditor, inferenceAuditor, mlAssignmentNotifier, + mlAutoUpdateService, memoryTracker, analyticsProcessManager, memoryEstimationProcessManager, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAutoUpdateService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAutoUpdateService.java new file mode 100644 index 0000000000000..8bb87819d045a --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAutoUpdateService.java @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +public class MlAutoUpdateService implements ClusterStateListener { + private static final Logger logger = LogManager.getLogger(MlAutoUpdateService.class); + + public interface UpdateAction { + boolean isMinNodeVersionSupported(Version minNodeVersion); + String getName(); + void runUpdate(); + } + + private final List updateActions; + private final Set currentlyUpdating; + private final Set completedUpdates; + private final ThreadPool threadPool; + + public MlAutoUpdateService(ThreadPool threadPool, List updateActions) { + this.updateActions = updateActions; + this.completedUpdates = ConcurrentHashMap.newKeySet(); + this.currentlyUpdating = ConcurrentHashMap.newKeySet(); + this.threadPool = threadPool; + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.localNodeMaster() == false) { + return; + } + + Version minNodeVersion = event.state().getNodes().getMinNodeVersion(); + final List toRun = updateActions.stream() + .filter(action -> action.isMinNodeVersionSupported(minNodeVersion)) + .filter(action -> completedUpdates.contains(action.getName()) == false) + .filter(action -> currentlyUpdating.add(action.getName())) + .collect(Collectors.toList()); + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute( + () -> toRun.forEach(this::runUpdate) + ); + } + + private void runUpdate(UpdateAction action) { + try { + logger.debug(() -> new ParameterizedMessage("[{}] starting executing update action", action.getName())); + action.runUpdate(); + this.completedUpdates.add(action.getName()); + logger.debug(() -> new ParameterizedMessage("[{}] succeeded executing update action", action.getName())); + } catch (Exception ex) { + logger.warn(new ParameterizedMessage("[{}] failure executing update action", action.getName()), ex); + } finally { + this.currentlyUpdating.remove(action.getName()); + logger.debug(() -> new ParameterizedMessage("[{}] no longer executing update action", action.getName())); + } + } + +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdater.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdater.java new file mode 100644 index 0000000000000..866cd25e90318 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdater.java @@ -0,0 +1,90 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.datafeed; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate; +import org.elasticsearch.xpack.ml.MlAutoUpdateService; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public class DatafeedConfigAutoUpdater implements MlAutoUpdateService.UpdateAction { + + private static final Logger logger = LogManager.getLogger(DatafeedConfigAutoUpdater.class); + private final DatafeedConfigProvider provider; + + public DatafeedConfigAutoUpdater(DatafeedConfigProvider provider) { + this.provider = provider; + } + + @Override + public boolean isMinNodeVersionSupported(Version minNodeVersion) { + return minNodeVersion.onOrAfter(Version.V_8_0_0); + } + + @Override + public String getName() { + return "datafeed_aggs_updater"; + } + + @Override + public void runUpdate() { + PlainActionFuture> getdatafeeds = PlainActionFuture.newFuture(); + provider.expandDatafeedConfigs("_all", true, getdatafeeds); + List datafeedConfigBuilders = getdatafeeds.actionGet(); + List updates = datafeedConfigBuilders.stream() + .map(DatafeedConfig.Builder::build) + .filter(DatafeedConfig::aggsRewritten) + .map(datafeedConfig -> new DatafeedUpdate.Builder() + .setAggregations(datafeedConfig.getAggProvider()) + .setId(datafeedConfig.getId()) + .build()) + .collect(Collectors.toList()); + if (updates.isEmpty()) { + return; + } + + logger.debug(() -> new ParameterizedMessage("{} datafeeds are currently being updated", + updates.stream().map(DatafeedUpdate::getId).collect(Collectors.toList()))); + + List failures = new ArrayList<>(); + for (DatafeedUpdate update : updates) { + PlainActionFuture updateDatafeeds = PlainActionFuture.newFuture(); + provider.updateDatefeedConfig(update.getId(), + update, + Collections.emptyMap(), + (updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), + updateDatafeeds); + try { + updateDatafeeds.actionGet(); + logger.debug(() -> new ParameterizedMessage("[{}] datafeed successfully updated", update.getId())); + } catch (Exception ex) { + logger.warn(new ParameterizedMessage("[{}] failed being updated", update.getId()), ex); + failures.add(new ElasticsearchException("Failed to update datafeed {}", ex, update.getId())); + } + } + if (failures.isEmpty()) { + logger.debug(() -> new ParameterizedMessage("{} datafeeds are finished being updated", + updates.stream().map(DatafeedUpdate::getId).collect(Collectors.toList()))); + return; + } + + ElasticsearchException exception = new ElasticsearchException("some datafeeds failed being upgraded."); + failures.forEach(exception::addSuppressed); + throw exception; + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdaterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdaterTests.java new file mode 100644 index 0000000000000..3a0fbbf3d1aab --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdaterTests.java @@ -0,0 +1,144 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.datafeed; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class DatafeedConfigAutoUpdaterTests extends ESTestCase { + + private DatafeedConfigProvider provider; + private List datafeeds = new ArrayList<>(); + + @Before + public void setup() { + provider = mock(DatafeedConfigProvider.class); + doAnswer(call -> { + @SuppressWarnings("unchecked") + ActionListener> handler = (ActionListener>) call.getArguments()[2]; + handler.onResponse(datafeeds); + return null; + }).when(provider).expandDatafeedConfigs(any(), anyBoolean(), any()); + doAnswer(call -> { + @SuppressWarnings("unchecked") + ActionListener handler = (ActionListener) call.getArguments()[4]; + handler.onResponse(mock(DatafeedConfig.class)); + return null; + }).when(provider).updateDatefeedConfig(any(), any(), any(), any(), any()); + } + + public void testWithSuccessfulUpdates() { + String datafeedWithRewrite1 = "datafeed-with-rewrite-1"; + String datafeedWithRewrite2 = "datafeed-with-rewrite-2"; + String datafeedWithoutRewrite = "datafeed-without-rewrite"; + withDatafeed(datafeedWithoutRewrite, false); + withDatafeed(datafeedWithRewrite1, true); + withDatafeed(datafeedWithRewrite2, true); + + DatafeedConfigAutoUpdater updater = new DatafeedConfigAutoUpdater(provider); + updater.runUpdate(); + + verify(provider, times(1)).updateDatefeedConfig(eq(datafeedWithRewrite1), + any(DatafeedUpdate.class), + eq(Collections.emptyMap()), + any(), + any()); + verify(provider, times(1)).updateDatefeedConfig(eq(datafeedWithRewrite2), + any(DatafeedUpdate.class), + eq(Collections.emptyMap()), + any(), + any()); + verify(provider, times(0)).updateDatefeedConfig(eq(datafeedWithoutRewrite), + any(DatafeedUpdate.class), + eq(Collections.emptyMap()), + any(), + any()); + } + + public void testWithUpdateFailures() { + String datafeedWithRewrite1 = "datafeed-with-rewrite-1"; + String datafeedWithRewriteFailure = "datafeed-with-rewrite-failure"; + String datafeedWithoutRewrite = "datafeed-without-rewrite"; + withDatafeed(datafeedWithoutRewrite, false); + withDatafeed(datafeedWithRewrite1, true); + withDatafeed(datafeedWithRewriteFailure, true); + + doAnswer(call -> { + @SuppressWarnings("unchecked") + ActionListener handler = (ActionListener) call.getArguments()[4]; + handler.onFailure(new ElasticsearchException("something wrong happened")); + return null; + }).when(provider).updateDatefeedConfig(eq(datafeedWithRewriteFailure), any(), any(), any(), any()); + + DatafeedConfigAutoUpdater updater = new DatafeedConfigAutoUpdater(provider); + ElasticsearchException ex = expectThrows(ElasticsearchException.class, updater::runUpdate); + assertThat(ex.getMessage(), equalTo("some datafeeds failed being upgraded.")); + assertThat(ex.getSuppressed().length, equalTo(1)); + assertThat(ex.getSuppressed()[0].getMessage(), equalTo("Failed to update datafeed " + datafeedWithRewriteFailure)); + + verify(provider, times(1)).updateDatefeedConfig(eq(datafeedWithRewrite1), + any(DatafeedUpdate.class), + eq(Collections.emptyMap()), + any(), + any()); + verify(provider, times(1)).updateDatefeedConfig(eq(datafeedWithRewriteFailure), + any(DatafeedUpdate.class), + eq(Collections.emptyMap()), + any(), + any()); + verify(provider, times(0)).updateDatefeedConfig(eq(datafeedWithoutRewrite), + any(DatafeedUpdate.class), + eq(Collections.emptyMap()), + any(), + any()); + } + + public void testWithNoUpdates() { + String datafeedWithoutRewrite1 = "datafeed-without-rewrite-1"; + String datafeedWithoutRewrite2 = "datafeed-without-rewrite-2"; + withDatafeed(datafeedWithoutRewrite1, false); + withDatafeed(datafeedWithoutRewrite2, false); + + DatafeedConfigAutoUpdater updater = new DatafeedConfigAutoUpdater(provider); + updater.runUpdate(); + + verify(provider, times(0)).updateDatefeedConfig(any(), + any(DatafeedUpdate.class), + eq(Collections.emptyMap()), + any(), + any()); + } + + private void withDatafeed(String datafeedId, boolean aggsRewritten) { + DatafeedConfig.Builder builder = mock(DatafeedConfig.Builder.class); + DatafeedConfig config = mock(DatafeedConfig.class); + when(config.getId()).thenReturn(datafeedId); + when(config.aggsRewritten()).thenReturn(aggsRewritten); + when(builder.build()).thenReturn(config); + datafeeds.add(builder); + } + +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlAutoUpdateServiceIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlAutoUpdateServiceIT.java new file mode 100644 index 0000000000000..e8bc9012b5b49 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlAutoUpdateServiceIT.java @@ -0,0 +1,124 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.Version; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.ml.MlAutoUpdateService; +import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigAutoUpdater; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import org.junit.Before; + +import java.net.InetAddress; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.nullValue; + +public class MlAutoUpdateServiceIT extends MlSingleNodeTestCase { + + private DatafeedConfigProvider datafeedConfigProvider; + + @Before + public void createComponents() throws Exception { + datafeedConfigProvider = new DatafeedConfigProvider(client(), xContentRegistry()); + waitForMlTemplates(); + } + + private static final String AGG_WITH_OLD_DATE_HISTOGRAM_INTERVAL = "{\n" + + " \"datafeed_id\": \"farequote-datafeed-with-old-agg\",\n" + + " \"job_id\": \"farequote\",\n" + + " \"frequency\": \"1h\",\n" + + " \"config_type\": \"datafeed\",\n" + + " \"indices\": [\"farequote1\", \"farequote2\"],\n" + + " \"aggregations\": {\n" + + " \"buckets\": {\n" + + " \"date_histogram\": {\n" + + " \"field\": \"time\",\n" + + " \"interval\": \"360s\",\n" + + " \"time_zone\": \"UTC\"\n" + + " },\n" + + " \"aggregations\": {\n" + + " \"time\": {\n" + + " \"max\": {\"field\": \"time\"}\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + + public void testAutomaticModelUpdate() throws Exception { + ensureGreen("_all"); + client().prepareIndex(AnomalyDetectorsIndex.configIndexName()) + .setId(DatafeedConfig.documentId("farequote-datafeed-with-old-agg")) + .setSource(AGG_WITH_OLD_DATE_HISTOGRAM_INTERVAL, XContentType.JSON) + .get(); + AtomicReference getConfigHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + blockingCall(listener -> datafeedConfigProvider.getDatafeedConfig("farequote-datafeed-with-old-agg", listener), + getConfigHolder, + exceptionHolder); + assertThat(exceptionHolder.get(), is(nullValue())); + client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get(); + + DatafeedConfigAutoUpdater autoUpdater = new DatafeedConfigAutoUpdater(datafeedConfigProvider); + MlAutoUpdateService mlAutoUpdateService = new MlAutoUpdateService(client().threadPool(), + Collections.singletonList(autoUpdater)); + + ClusterChangedEvent event = new ClusterChangedEvent("test", + ClusterState.builder(new ClusterName("test")) + .nodes(DiscoveryNodes.builder().add( + new DiscoveryNode("node_name", + "node_id", + new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + Collections.emptyMap(), + Set.of(DiscoveryNodeRole.MASTER_ROLE), + Version.V_8_0_0)) + .localNodeId("node_id") + .masterNodeId("node_id") + .build()) + .build(), + ClusterState.builder(new ClusterName("test")).build()); + + mlAutoUpdateService.clusterChanged(event); + assertBusy(() -> { + try { + GetResponse getResponse = client().prepareGet( + AnomalyDetectorsIndex.configIndexName(), + DatafeedConfig.documentId("farequote-datafeed-with-old-agg") + ).get(); + assertTrue(getResponse.isExists()); + assertThat(getResponse.getSourceAsString(), containsString("fixed_interval")); + } catch (Exception ex) { + fail(ex.getMessage()); + } + }); + } + + @Override + public NamedXContentRegistry xContentRegistry() { + return new NamedXContentRegistry(new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedXContents()); + } + +}