diff --git a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java index fb4b2a942e768..6d77d85b1ca7c 100644 --- a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java +++ b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java @@ -12,6 +12,7 @@ import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.ObjectPath; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.rest.RestStatus; @@ -19,16 +20,13 @@ import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.upgrades.AbstractFullClusterRestartTestCase; import org.elasticsearch.xpack.core.watcher.client.WatchSourceBuilder; -import org.elasticsearch.common.xcontent.ObjectPath; import org.elasticsearch.xpack.security.support.SecurityIndexManager; -import org.elasticsearch.xpack.test.rest.XPackRestTestHelper; import org.elasticsearch.xpack.watcher.actions.logging.LoggingAction; import org.elasticsearch.xpack.watcher.common.text.TextTemplate; import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition; import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger; import org.hamcrest.Matcher; -import org.junit.Before; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -57,11 +55,6 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase { - @Before - public void waitForMlTemplates() throws Exception { - XPackRestTestHelper.waitForTemplates(client(), 
XPackRestTestHelper.ML_PRE_V660_TEMPLATES); - } - @Override protected Settings restClientSettings() { String token = "Basic " + Base64.getEncoder().encodeToString("test_user:x-pack-test-password".getBytes(StandardCharsets.UTF_8)); diff --git a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java new file mode 100644 index 0000000000000..6c89927fd9508 --- /dev/null +++ b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java @@ -0,0 +1,254 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.restart; + +import org.elasticsearch.Version; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.upgrades.AbstractFullClusterRestartTestCase; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.test.rest.XPackRestTestHelper; +import org.junit.Before; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Base64; +import 
java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.isEmptyOrNullString; + +public class MlMigrationFullClusterRestartIT extends AbstractFullClusterRestartTestCase { + + private static final String OLD_CLUSTER_OPEN_JOB_ID = "migration-old-cluster-open-job"; + private static final String OLD_CLUSTER_STARTED_DATAFEED_ID = "migration-old-cluster-started-datafeed"; + private static final String OLD_CLUSTER_CLOSED_JOB_ID = "migration-old-cluster-closed-job"; + private static final String OLD_CLUSTER_STOPPED_DATAFEED_ID = "migration-old-cluster-stopped-datafeed"; + + @Override + protected Settings restClientSettings() { + String token = "Basic " + Base64.getEncoder().encodeToString("test_user:x-pack-test-password".getBytes(StandardCharsets.UTF_8)); + return Settings.builder() + .put(ThreadContext.PREFIX + ".Authorization", token) + .build(); + } + + @Before + public void waitForMlTemplates() throws Exception { + List<String> templatesToWaitFor = XPackRestTestHelper.ML_POST_V660_TEMPLATES; + + // If upgrading from a version prior to v6.6.0 the set of templates + // to wait for is different + if (isRunningAgainstOldCluster() && getOldClusterVersion().before(Version.V_6_6_0) ) { + templatesToWaitFor = XPackRestTestHelper.ML_PRE_V660_TEMPLATES; + } + + XPackRestTestHelper.waitForTemplates(client(), templatesToWaitFor); + } + + private void createTestIndex() throws IOException { + Request createTestIndex = new Request("PUT", "/airline-data"); + createTestIndex.setJsonEntity("{\"mappings\": { \"doc\": {\"properties\": {" + + "\"time\": {\"type\": \"date\"}," + + "\"airline\": {\"type\": \"keyword\"}," + + "\"responsetime\": {\"type\": \"float\"}" + + "}}}}"); + client().performRequest(createTestIndex); + } + + public void testMigration() throws Exception { + if (isRunningAgainstOldCluster()) { + createTestIndex(); + oldClusterTests(); + } else { + 
upgradedClusterTests(); + } + } + + private void oldClusterTests() throws IOException { + // create jobs and datafeeds + Detector.Builder d = new Detector.Builder("metric", "responsetime"); + d.setByFieldName("airline"); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); + analysisConfig.setBucketSpan(TimeValue.timeValueMinutes(10)); + Job.Builder openJob = new Job.Builder(OLD_CLUSTER_OPEN_JOB_ID); + openJob.setAnalysisConfig(analysisConfig); + openJob.setDataDescription(new DataDescription.Builder()); + + Request putOpenJob = new Request("PUT", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID); + putOpenJob.setJsonEntity(Strings.toString(openJob)); + client().performRequest(putOpenJob); + + Request openOpenJob = new Request("POST", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID + "/_open"); + client().performRequest(openOpenJob); + + DatafeedConfig.Builder dfBuilder = new DatafeedConfig.Builder(OLD_CLUSTER_STARTED_DATAFEED_ID, OLD_CLUSTER_OPEN_JOB_ID); + if (getOldClusterVersion().before(Version.V_6_6_0)) { + dfBuilder.setDelayedDataCheckConfig(null); + } + dfBuilder.setIndices(Collections.singletonList("airline-data")); + dfBuilder.setTypes(Collections.singletonList("doc")); + + Request putDatafeed = new Request("PUT", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID); + putDatafeed.setJsonEntity(Strings.toString(dfBuilder.build())); + client().performRequest(putDatafeed); + + Request startDatafeed = new Request("POST", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID + "/_start"); + client().performRequest(startDatafeed); + + Job.Builder closedJob = new Job.Builder(OLD_CLUSTER_CLOSED_JOB_ID); + closedJob.setAnalysisConfig(analysisConfig); + closedJob.setDataDescription(new DataDescription.Builder()); + + Request putClosedJob = new Request("PUT", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_CLOSED_JOB_ID); + putClosedJob.setJsonEntity(Strings.toString(closedJob)); + 
client().performRequest(putClosedJob); + + DatafeedConfig.Builder stoppedDfBuilder = new DatafeedConfig.Builder(OLD_CLUSTER_STOPPED_DATAFEED_ID, OLD_CLUSTER_CLOSED_JOB_ID); + if (getOldClusterVersion().before(Version.V_6_6_0)) { + stoppedDfBuilder.setDelayedDataCheckConfig(null); + } + stoppedDfBuilder.setIndices(Collections.singletonList("airline-data")); + + Request putStoppedDatafeed = new Request("PUT", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STOPPED_DATAFEED_ID); + putStoppedDatafeed.setJsonEntity(Strings.toString(stoppedDfBuilder.build())); + client().performRequest(putStoppedDatafeed); + } + + private void upgradedClusterTests() throws Exception { + // wait for the closed job and datafeed to be migrated + waitForMigration(Collections.singletonList(OLD_CLUSTER_CLOSED_JOB_ID), + Collections.singletonList(OLD_CLUSTER_STOPPED_DATAFEED_ID), + Collections.singletonList(OLD_CLUSTER_OPEN_JOB_ID), + Collections.singletonList(OLD_CLUSTER_STARTED_DATAFEED_ID)); + + // the job and datafeed left open during upgrade should + // be assigned to a node + waitForJobToBeAssigned(OLD_CLUSTER_OPEN_JOB_ID); + waitForDatafeedToBeAssigned(OLD_CLUSTER_STARTED_DATAFEED_ID); + + // open the migrated job and datafeed + Request openJob = new Request("POST", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_CLOSED_JOB_ID + "/_open"); + client().performRequest(openJob); + Request startDatafeed = new Request("POST", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STOPPED_DATAFEED_ID + "/_start"); + client().performRequest(startDatafeed); + + // close the job left open during upgrade + Request stopDatafeed = new Request("POST", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID + "/_stop"); + client().performRequest(stopDatafeed); + + Request closeJob = new Request("POST", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID + "/_close"); + client().performRequest(closeJob); + + // now all jobs should be migrated + waitForMigration(Arrays.asList(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_OPEN_JOB_ID), + 
Arrays.asList(OLD_CLUSTER_STOPPED_DATAFEED_ID, OLD_CLUSTER_STARTED_DATAFEED_ID), + Collections.emptyList(), + Collections.emptyList()); + } + + @SuppressWarnings("unchecked") + private void waitForJobToBeAssigned(String jobId) throws Exception { + assertBusy(() -> { + Request getJobStats = new Request("GET", "_xpack/ml/anomaly_detectors/" + jobId + "/_stats"); + Response response = client().performRequest(getJobStats); + + Map<String, Object> stats = entityAsMap(response); + List<Map<String, Object>> jobStats = + (List<Map<String, Object>>) XContentMapValues.extractValue("jobs", stats); + + assertEquals(jobId, XContentMapValues.extractValue("job_id", jobStats.get(0))); + assertEquals("opened", XContentMapValues.extractValue("state", jobStats.get(0))); + assertThat((String) XContentMapValues.extractValue("assignment_explanation", jobStats.get(0)), isEmptyOrNullString()); + assertNotNull(XContentMapValues.extractValue("node", jobStats.get(0))); + }, 30, TimeUnit.SECONDS); + } + + @SuppressWarnings("unchecked") + private void waitForDatafeedToBeAssigned(String datafeedId) throws Exception { + assertBusy(() -> { + Request getDatafeedStats = new Request("GET", "_xpack/ml/datafeeds/" + datafeedId + "/_stats"); + Response response = client().performRequest(getDatafeedStats); + Map<String, Object> stats = entityAsMap(response); + List<Map<String, Object>> datafeedStats = + (List<Map<String, Object>>) XContentMapValues.extractValue("datafeeds", stats); + + assertEquals(datafeedId, XContentMapValues.extractValue("datafeed_id", datafeedStats.get(0))); + assertEquals("started", XContentMapValues.extractValue("state", datafeedStats.get(0))); + assertThat((String) XContentMapValues.extractValue("assignment_explanation", datafeedStats.get(0)), isEmptyOrNullString()); + assertNotNull(XContentMapValues.extractValue("node", datafeedStats.get(0))); + }, 30, TimeUnit.SECONDS); + } + + @SuppressWarnings("unchecked") + private void waitForMigration(List<String> expectedMigratedJobs, List<String> expectedMigratedDatafeeds, + List<String> unMigratedJobs, List<String> unMigratedDatafeeds) throws Exception { + assertBusy(() -> { + 
// wait for the eligible configs to be moved from the clusterstate + Request getClusterState = new Request("GET", "/_cluster/state/metadata"); + Response response = client().performRequest(getClusterState); + Map<String, Object> responseMap = entityAsMap(response); + + List<Map<String, Object>> jobs = + (List<Map<String, Object>>) XContentMapValues.extractValue("metadata.ml.jobs", responseMap); + assertNotNull(jobs); + + for (String jobId : expectedMigratedJobs) { + assertJob(jobId, jobs, false); + } + + for (String jobId : unMigratedJobs) { + assertJob(jobId, jobs, true); + } + + List<Map<String, Object>> datafeeds = + (List<Map<String, Object>>) XContentMapValues.extractValue("metadata.ml.datafeeds", responseMap); + assertNotNull(datafeeds); + + for (String datafeedId : expectedMigratedDatafeeds) { + assertDatafeed(datafeedId, datafeeds, false); + } + + for (String datafeedId : unMigratedDatafeeds) { + assertDatafeed(datafeedId, datafeeds, true); + } + + }, 30, TimeUnit.SECONDS); + } + + private void assertDatafeed(String datafeedId, List<Map<String, Object>> datafeeds, boolean expectedToBePresent) { + Optional<Object> config = datafeeds.stream().map(map -> map.get("datafeed_id")) + .filter(id -> id.equals(datafeedId)).findFirst(); + if (expectedToBePresent) { + assertTrue(config.isPresent()); + } else { + assertFalse(config.isPresent()); + } + } + + private void assertJob(String jobId, List<Map<String, Object>> jobs, boolean expectedToBePresent) { + Optional<Object> config = jobs.stream().map(map -> map.get("job_id")) + .filter(id -> id.equals(jobId)).findFirst(); + if (expectedToBePresent) { + assertTrue(config.isPresent()); + } else { + assertFalse(config.isPresent()); + } + } +} diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java index e3498b19455b4..960a2a9549325 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java @@ -460,7 +460,7 @@ 
private void tryUpdates() throws IOException { + "] is temporarily pending migration", true); if (datafeedDeleted && randomBoolean()) { - // delete job if the datafeed that refers to it was deleted + // delete job if the datafeed that refers to it was deleted // otherwise the request is invalid Request deleteJob = new Request("DELETE", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_CLOSED_JOB_EXTRA_ID); updateJobExpectingSuccessOr503(OLD_CLUSTER_CLOSED_JOB_EXTRA_ID, deleteJob, "cannot update job as the configuration ["