From 2c6df68ea797689afb408accdd01db99417f0612 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Witek?= Date: Fri, 10 Apr 2020 13:39:42 +0200 Subject: [PATCH] Do not execute ML CRUD actions when upgrade mode is enabled (#54437) --- .../ml/integration/MlNativeIntegTestCase.java | 17 ++ .../ml/integration/SetUpgradeModeIT.java | 121 ++++++++------ .../xpack/ml/MachineLearning.java | 16 +- .../xpack/ml/MlUpgradeModeActionFilter.java | 148 ++++++++++++++++++ .../ml/MlUpgradeModeActionFilterTests.java | 115 ++++++++++++++ .../test/ml/set_upgrade_mode.yml | 12 +- 6 files changed, 378 insertions(+), 51 deletions(-) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilter.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilterTests.java diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java index 2dd9e7611b9fc..a908a5a222f41 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterState; @@ -35,6 +36,7 @@ import org.elasticsearch.xpack.core.ml.action.GetFiltersAction; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.PutFilterAction; +import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction; import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; @@ -63,6 +65,7 @@ import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.elasticsearch.xpack.security.test.SecurityTestUtils.writeFile; +import static org.hamcrest.Matchers.is; /** * Base class of ML integration tests that use a native autodetect process @@ -136,6 +139,7 @@ protected Settings externalClusterClientSettings() { } protected void cleanUp() { + setUpgradeModeTo(false); cleanUpResources(); waitForPendingTasks(); } @@ -154,6 +158,19 @@ private void waitForPendingTasks() { } } + protected void setUpgradeModeTo(boolean enabled) { + AcknowledgedResponse response = + client().execute(SetUpgradeModeAction.INSTANCE, new SetUpgradeModeAction.Request(enabled)).actionGet(); + assertThat(response.isAcknowledged(), is(true)); + assertThat(upgradeMode(), is(enabled)); + } + + protected boolean upgradeMode() { + ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState(); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(masterClusterState); + return mlMetadata.isUpgradeMode(); + } + protected DeleteExpiredDataAction.Response deleteExpiredData() throws Exception { DeleteExpiredDataAction.Response response = client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get(); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/SetUpgradeModeIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/SetUpgradeModeIT.java index d6134feae68bd..c4a2e5db76e93 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/SetUpgradeModeIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/SetUpgradeModeIT.java @@ -6,19 +6,18 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; -import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; +import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.junit.After; @@ -31,8 +30,10 @@ import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDataCounts; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDatafeedStats; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.indexDocs; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; @@ -49,35 +50,31 @@ public void testEnableUpgradeMode() throws Exception { String datafeedId = jobId + "-datafeed"; startRealtime(jobId); + assertThat(upgradeMode(), is(false)); + // Assert appropriate task state and assignment numbers assertThat(client().admin() .cluster() .prepareListTasks() .setActions(MlTasks.JOB_TASK_NAME + "[c]", MlTasks.DATAFEED_TASK_NAME + "[c]") .get() - .getTasks() - .size(), equalTo(2)); + .getTasks(), hasSize(2)); ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState(); PersistentTasksCustomMetadata persistentTasks = masterClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); - assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true).size(), equalTo(1)); - assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true).size(), equalTo(1)); - assertThat(MlMetadata.getMlMetadata(masterClusterState).isUpgradeMode(), equalTo(false)); + assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true), hasSize(1)); + assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true), hasSize(1)); // Set the upgrade mode setting - AcknowledgedResponse response = client().execute(SetUpgradeModeAction.INSTANCE, new SetUpgradeModeAction.Request(true)) - .actionGet(); - - assertThat(response.isAcknowledged(), equalTo(true)); + setUpgradeModeTo(true); masterClusterState = client().admin().cluster().prepareState().all().get().getState(); // Assert state for tasks still exists and that the upgrade setting is set persistentTasks = masterClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); - assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true).size(), equalTo(1)); - assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true).size(), equalTo(1)); - assertThat(MlMetadata.getMlMetadata(masterClusterState).isUpgradeMode(), equalTo(true)); + assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true), hasSize(1)); + assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true), hasSize(1)); assertThat(client().admin() .cluster() @@ -87,50 +84,81 @@ public void testEnableUpgradeMode() throws Exception { .getTasks(), is(empty())); GetJobsStatsAction.Response.JobStats jobStats = getJobStats(jobId).get(0); - assertThat(jobStats.getState(), equalTo(JobState.OPENED)); - assertThat(jobStats.getAssignmentExplanation(), equalTo(AWAITING_UPGRADE.getExplanation())); + assertThat(jobStats.getState(), is(equalTo(JobState.OPENED))); + assertThat(jobStats.getAssignmentExplanation(), is(equalTo(AWAITING_UPGRADE.getExplanation()))); assertThat(jobStats.getNode(), is(nullValue())); GetDatafeedsStatsAction.Response.DatafeedStats datafeedStats = getDatafeedStats(datafeedId); - assertThat(datafeedStats.getDatafeedState(), equalTo(DatafeedState.STARTED)); - assertThat(datafeedStats.getAssignmentExplanation(), equalTo(AWAITING_UPGRADE.getExplanation())); + assertThat(datafeedStats.getDatafeedState(), is(equalTo(DatafeedState.STARTED))); + assertThat(datafeedStats.getAssignmentExplanation(), is(equalTo(AWAITING_UPGRADE.getExplanation()))); assertThat(datafeedStats.getNode(), is(nullValue())); - Job.Builder job = createScheduledJob("job-should-not-open"); - registerJob(job); - putJob(job); - ElasticsearchStatusException statusException = expectThrows(ElasticsearchStatusException.class, () -> openJob(job.getId())); - assertThat(statusException.status(), equalTo(RestStatus.TOO_MANY_REQUESTS)); - assertThat(statusException.getMessage(), equalTo("Cannot open jobs when upgrade mode is enabled")); - - //Disable the setting - response = client().execute(SetUpgradeModeAction.INSTANCE, new SetUpgradeModeAction.Request(false)) - .actionGet(); - - assertThat(response.isAcknowledged(), equalTo(true)); + // Disable the setting + setUpgradeModeTo(false); masterClusterState = client().admin().cluster().prepareState().all().get().getState(); persistentTasks = masterClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); - assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true).size(), equalTo(1)); - assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true).size(), equalTo(1)); - assertThat(MlMetadata.getMlMetadata(masterClusterState).isUpgradeMode(), equalTo(false)); + assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true), hasSize(1)); + assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true), hasSize(1)); assertBusy(() -> assertThat(client().admin() .cluster() .prepareListTasks() .setActions(MlTasks.JOB_TASK_NAME + "[c]", MlTasks.DATAFEED_TASK_NAME + "[c]") .get() - .getTasks() - .size(), equalTo(2))); + .getTasks(), hasSize(2))); jobStats = getJobStats(jobId).get(0); - assertThat(jobStats.getState(), equalTo(JobState.OPENED)); - assertThat(jobStats.getAssignmentExplanation(), not(equalTo(AWAITING_UPGRADE.getExplanation()))); + assertThat(jobStats.getState(), is(equalTo(JobState.OPENED))); + assertThat(jobStats.getAssignmentExplanation(), is(not(equalTo(AWAITING_UPGRADE.getExplanation())))); datafeedStats = getDatafeedStats(datafeedId); - assertThat(datafeedStats.getDatafeedState(), equalTo(DatafeedState.STARTED)); - assertThat(datafeedStats.getAssignmentExplanation(), not(equalTo(AWAITING_UPGRADE.getExplanation()))); + assertThat(datafeedStats.getDatafeedState(), is(equalTo(DatafeedState.STARTED))); + assertThat(datafeedStats.getAssignmentExplanation(), is(not(equalTo(AWAITING_UPGRADE.getExplanation())))); + } + + public void testJobOpenActionInUpgradeMode() { + String jobId = "job-should-not-open"; + Job.Builder job = createScheduledJob(jobId); + registerJob(job); + putJob(job); + + setUpgradeModeTo(true); + + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> openJob(jobId)); + assertThat(e.getMessage(), is(equalTo("Cannot perform cluster:admin/xpack/ml/job/open action while upgrade mode is enabled"))); + assertThat(e.status(), is(equalTo(RestStatus.TOO_MANY_REQUESTS))); + } + + public void testAnomalyDetectionActionsInUpgradeMode() { + setUpgradeModeTo(true); + + String jobId = "job_id"; + expectThrowsUpgradeModeException(() -> putJob(createScheduledJob(jobId))); + expectThrowsUpgradeModeException(() -> updateJob(jobId, null)); + expectThrowsUpgradeModeException(() -> deleteJob(jobId)); + expectThrowsUpgradeModeException(() -> openJob(jobId)); + expectThrowsUpgradeModeException(() -> flushJob(jobId, false)); + expectThrowsUpgradeModeException(() -> closeJob(jobId)); + expectThrowsUpgradeModeException(() -> persistJob(jobId)); + expectThrowsUpgradeModeException(() -> forecast(jobId, null, null)); + + String snapshotId = "snapshot_id"; + expectThrowsUpgradeModeException(() -> revertModelSnapshot(jobId, snapshotId)); + + String datafeedId = "datafeed_id"; + expectThrowsUpgradeModeException(() -> putDatafeed(createDatafeed(datafeedId, jobId, Collections.singletonList("index")))); + expectThrowsUpgradeModeException(() -> updateDatafeed(new DatafeedUpdate.Builder(datafeedId).build())); + expectThrowsUpgradeModeException(() -> deleteDatafeed(datafeedId)); + expectThrowsUpgradeModeException(() -> startDatafeed(datafeedId, 0, null)); + expectThrowsUpgradeModeException(() -> stopDatafeed(datafeedId)); + + String filterId = "filter_id"; + expectThrowsUpgradeModeException(() -> putMlFilter(MlFilter.builder(filterId).build())); + + String calendarId = "calendar_id"; + expectThrowsUpgradeModeException(() -> putCalendar(calendarId, Collections.singletonList(jobId), "")); } private void startRealtime(String jobId) throws Exception { @@ -154,8 +182,8 @@ private void startRealtime(String jobId) throws Exception { startDatafeed(datafeedConfig.getId(), 0L, null); assertBusy(() -> { DataCounts dataCounts = getDataCounts(job.getId()); - assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1)); - assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L)); + assertThat(dataCounts.getProcessedRecordCount(), is(equalTo(numDocs1))); + assertThat(dataCounts.getOutOfOrderTimeStampCount(), is(equalTo(0L))); }); long numDocs2 = randomIntBetween(2, 64); @@ -163,9 +191,14 @@ private void startRealtime(String jobId) throws Exception { indexDocs(logger, "data", numDocs2, now + 5000, now + 6000); assertBusy(() -> { DataCounts dataCounts = getDataCounts(job.getId()); - assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1 + numDocs2)); - assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L)); + assertThat(dataCounts.getProcessedRecordCount(), is(equalTo(numDocs1 + numDocs2))); + assertThat(dataCounts.getOutOfOrderTimeStampCount(), is(equalTo(0L))); }, 30, TimeUnit.SECONDS); } + private static void expectThrowsUpgradeModeException(ThrowingRunnable actionInvocation) { + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, actionInvocation); + assertThat(e.getMessage(), containsString("upgrade mode is enabled")); + assertThat(e.status(), is(equalTo(RestStatus.TOO_MANY_REQUESTS))); + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index e5b80ec84ecda..47d818eed201f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -11,6 +11,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.client.node.NodeClient; @@ -332,6 +333,7 @@ import java.util.function.UnaryOperator; import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; public class MachineLearning extends Plugin implements SystemIndexPlugin, AnalysisPlugin, IngestPlugin, PersistentTaskPlugin { public static final String NAME = "ml"; @@ -428,6 +430,7 @@ public Set getRoles() { private final SetOnce dataFrameAnalyticsManager = new SetOnce<>(); private final SetOnce dataFrameAnalyticsAuditor = new SetOnce<>(); private final SetOnce memoryTracker = new SetOnce<>(); + private final SetOnce mlUpgradeModeActionFilter = new SetOnce<>(); public MachineLearning(Settings settings, Path configPath) { this.settings = settings; @@ -530,9 +533,11 @@ public Collection createComponents(Client client, ClusterService cluster IndexNameExpressionResolver indexNameExpressionResolver) { if (enabled == false) { // special holder for @link(MachineLearningFeatureSetUsage) which needs access to job manager, empty if ML is disabled - return Collections.singletonList(new JobManagerHolder()); + return singletonList(new JobManagerHolder()); } + this.mlUpgradeModeActionFilter.set(new MlUpgradeModeActionFilter(clusterService)); + new MlIndexTemplateRegistry(settings, clusterService, threadPool, client, xContentRegistry); AnomalyDetectionAuditor anomalyDetectionAuditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName()); @@ -881,6 +886,15 @@ public List getRestHandlers(Settings settings, RestController restC infoAction); } + @Override + public List getActionFilters() { + if (enabled == false) { + return emptyList(); + } + + return singletonList(this.mlUpgradeModeActionFilter.get()); + } + @Override public List> getExecutorBuilders(Settings settings) { if (false == enabled) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilter.java new file mode 100644 index 0000000000000..9466ebc17729d --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilter.java @@ -0,0 +1,148 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.support.ActionFilter; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.action.CloseJobAction; +import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction; +import org.elasticsearch.xpack.core.ml.action.DeleteCalendarEventAction; +import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction; +import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; +import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction; +import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction; +import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; +import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; +import org.elasticsearch.xpack.core.ml.action.DeleteTrainedModelAction; +import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; +import org.elasticsearch.xpack.core.ml.action.FlushJobAction; +import org.elasticsearch.xpack.core.ml.action.ForecastJobAction; +import org.elasticsearch.xpack.core.ml.action.KillProcessAction; +import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.action.PersistJobAction; +import org.elasticsearch.xpack.core.ml.action.PostCalendarEventsAction; +import org.elasticsearch.xpack.core.ml.action.PostDataAction; +import org.elasticsearch.xpack.core.ml.action.PutCalendarAction; +import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; +import org.elasticsearch.xpack.core.ml.action.PutFilterAction; +import org.elasticsearch.xpack.core.ml.action.PutJobAction; +import org.elasticsearch.xpack.core.ml.action.PutTrainedModelAction; +import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; +import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; +import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; +import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction; +import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction; +import org.elasticsearch.xpack.core.ml.action.UpdateFilterAction; +import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; +import org.elasticsearch.xpack.core.ml.action.UpdateModelSnapshotAction; +import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * {@link MlUpgradeModeActionFilter} disallows certain actions if the cluster is currently in upgrade mode. + * + * Disallowed actions are the ones which can access/alter the state of ML internal indices. + */ +class MlUpgradeModeActionFilter extends ActionFilter.Simple { + + private static final Set ACTIONS_DISALLOWED_IN_UPGRADE_MODE = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + PutJobAction.NAME, + UpdateJobAction.NAME, + DeleteJobAction.NAME, + OpenJobAction.NAME, + FlushJobAction.NAME, + CloseJobAction.NAME, + PersistJobAction.NAME, + + FinalizeJobExecutionAction.NAME, + PostDataAction.NAME, + + RevertModelSnapshotAction.NAME, + UpdateModelSnapshotAction.NAME, + DeleteModelSnapshotAction.NAME, + + PutDatafeedAction.NAME, + UpdateDatafeedAction.NAME, + DeleteDatafeedAction.NAME, + StartDatafeedAction.NAME, + StopDatafeedAction.NAME, + + PutFilterAction.NAME, + UpdateFilterAction.NAME, + DeleteFilterAction.NAME, + + PutCalendarAction.NAME, + UpdateCalendarJobAction.NAME, + PostCalendarEventsAction.NAME, + DeleteCalendarAction.NAME, + DeleteCalendarEventAction.NAME, + + UpdateProcessAction.NAME, + KillProcessAction.NAME, + + DeleteExpiredDataAction.NAME, + + ForecastJobAction.NAME, + DeleteForecastAction.NAME, + + PutDataFrameAnalyticsAction.NAME, + DeleteDataFrameAnalyticsAction.NAME, + StartDataFrameAnalyticsAction.NAME, + StopDataFrameAnalyticsAction.NAME, + + PutTrainedModelAction.NAME, + DeleteTrainedModelAction.NAME + ))); + + private final AtomicBoolean isUpgradeMode = new AtomicBoolean(); + + MlUpgradeModeActionFilter(ClusterService clusterService) { + Objects.requireNonNull(clusterService); + clusterService.addListener(this::setIsUpgradeMode); + } + + @Override + protected boolean apply(String action, ActionRequest request, ActionListener listener) { + if (isUpgradeMode.get() && ACTIONS_DISALLOWED_IN_UPGRADE_MODE.contains(action)) { + throw new ElasticsearchStatusException( + "Cannot perform {} action while upgrade mode is enabled", RestStatus.TOO_MANY_REQUESTS, action); + } + return true; + } + + /** + * To prevent leaking information to unauthorized users, it is extremely important that this filter is executed *after* the + * {@code SecurityActionFilter}. + * To achieve that, the number returned by this method must be greater than the number returned by the + * {@code SecurityActionFilter::order} method. + */ + @Override + public int order() { + return Integer.MAX_VALUE; + } + + // Visible for testing + void setIsUpgradeMode(ClusterChangedEvent event) { + isUpgradeMode.set(MlMetadata.getMlMetadata(event.state()).isUpgradeMode()); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilterTests.java new file mode 100644 index 0000000000000..6058906de284e --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilterTests.java @@ -0,0 +1,115 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilter; +import org.elasticsearch.action.support.ActionFilterChain; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.action.PutJobAction; +import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction; +import org.elasticsearch.xpack.security.action.filter.SecurityActionFilter; +import org.junit.After; +import org.junit.Before; + +import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +public class MlUpgradeModeActionFilterTests extends ESTestCase { + + private static final String DISALLOWED_ACTION = PutJobAction.NAME; + private static final String ALLOWED_ACTION = SetUpgradeModeAction.NAME; + + private ClusterService clusterService; + private Task task; + private ActionRequest request; + private ActionListener listener; + private ActionFilterChain chain; + + @Before + @SuppressWarnings("unchecked") + public void setUpMocks() { + clusterService = mock(ClusterService.class); + task = mock(Task.class); + request = mock(ActionRequest.class); + listener = mock(ActionListener.class); + chain = mock(ActionFilterChain.class); + } + + @After + public void assertNoMoreInteractions() { + verifyNoMoreInteractions(task, request, listener, chain); + } + + public void testApply_ActionDisallowedInUpgradeMode() { + MlUpgradeModeActionFilter filter = new MlUpgradeModeActionFilter(clusterService); + filter.apply(task, DISALLOWED_ACTION, request, listener, chain); + + filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(true))); + ElasticsearchStatusException e = + expectThrows( + ElasticsearchStatusException.class, + () -> filter.apply(task, DISALLOWED_ACTION, request, listener, chain)); + + filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(false))); + filter.apply(task, DISALLOWED_ACTION, request, listener, chain); + + assertThat(e.getMessage(), is(equalTo("Cannot perform " + DISALLOWED_ACTION + " action while upgrade mode is enabled"))); + assertThat(e.status(), is(equalTo(RestStatus.TOO_MANY_REQUESTS))); + + verify(chain, times(2)).proceed(task, DISALLOWED_ACTION, request, listener); + } + + public void testApply_ActionAllowedInUpgradeMode() { + MlUpgradeModeActionFilter filter = new MlUpgradeModeActionFilter(clusterService); + filter.apply(task, ALLOWED_ACTION, request, listener, chain); + + filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(true))); + filter.apply(task, ALLOWED_ACTION, request, listener, chain); + + filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(false))); + filter.apply(task, ALLOWED_ACTION, request, listener, chain); + + verify(chain, times(3)).proceed(task, ALLOWED_ACTION, request, listener); + } + + public void testOrder_UpgradeFilterIsExecutedAfterSecurityFilter() { + MlUpgradeModeActionFilter upgradeModeFilter = new MlUpgradeModeActionFilter(clusterService); + SecurityActionFilter securityFilter = new SecurityActionFilter(null, null, null, mock(ThreadPool.class), null, null); + + ActionFilter[] actionFiltersInOrderOfExecution = new ActionFilters(Sets.newHashSet(upgradeModeFilter, securityFilter)).filters(); + assertThat(actionFiltersInOrderOfExecution, is(arrayContaining(securityFilter, upgradeModeFilter))); + } + + private static ClusterChangedEvent createClusterChangedEvent(ClusterState clusterState) { + return new ClusterChangedEvent("created-from-test", clusterState, clusterState); + } + + private static ClusterState createClusterState(boolean isUpgradeMode) { + return ClusterState.builder(new ClusterName("MlUpgradeModeActionFilterTests")) + .metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().isUpgradeMode(isUpgradeMode).build())) + .build(); + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml index 4a93e46c6b491..f893c6e987241 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml @@ -220,11 +220,6 @@ teardown: --- "Attempt to open job when upgrade_mode is enabled": - - do: - ml.set_upgrade_mode: - enabled: true - - match: { acknowledged: true } - - do: ml.put_job: job_id: failing-set-upgrade-mode-job @@ -243,6 +238,11 @@ teardown: } - do: - catch: /Cannot open jobs when upgrade mode is enabled/ + ml.set_upgrade_mode: + enabled: true + - match: { acknowledged: true } + + - do: + catch: /Cannot perform cluster:admin/xpack/ml/job/open action while upgrade mode is enabled/ ml.open_job: job_id: failing-set-upgrade-mode-job