Skip to content

Commit

Permalink
Do not execute ML CRUD actions when upgrade mode is enabled (#54437)
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek authored Apr 10, 2020
1 parent a7460d6 commit 2c6df68
Show file tree
Hide file tree
Showing 6 changed files with 378 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -35,6 +36,7 @@
import org.elasticsearch.xpack.core.ml.action.GetFiltersAction;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
Expand Down Expand Up @@ -63,6 +65,7 @@
import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.elasticsearch.xpack.security.test.SecurityTestUtils.writeFile;
import static org.hamcrest.Matchers.is;

/**
* Base class of ML integration tests that use a native autodetect process
Expand Down Expand Up @@ -136,6 +139,7 @@ protected Settings externalClusterClientSettings() {
}

protected void cleanUp() {
setUpgradeModeTo(false);
cleanUpResources();
waitForPendingTasks();
}
Expand All @@ -154,6 +158,19 @@ private void waitForPendingTasks() {
}
}

protected void setUpgradeModeTo(boolean enabled) {
AcknowledgedResponse response =
client().execute(SetUpgradeModeAction.INSTANCE, new SetUpgradeModeAction.Request(enabled)).actionGet();
assertThat(response.isAcknowledged(), is(true));
assertThat(upgradeMode(), is(enabled));
}

protected boolean upgradeMode() {
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
MlMetadata mlMetadata = MlMetadata.getMlMetadata(masterClusterState);
return mlMetadata.isUpgradeMode();
}

protected DeleteExpiredDataAction.Response deleteExpiredData() throws Exception {
DeleteExpiredDataAction.Response response = client().execute(DeleteExpiredDataAction.INSTANCE,
new DeleteExpiredDataAction.Request()).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,18 @@
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.junit.After;

Expand All @@ -31,8 +30,10 @@
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDataCounts;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDatafeedStats;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.indexDocs;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
Expand All @@ -49,35 +50,31 @@ public void testEnableUpgradeMode() throws Exception {
String datafeedId = jobId + "-datafeed";
startRealtime(jobId);

assertThat(upgradeMode(), is(false));

// Assert appropriate task state and assignment numbers
assertThat(client().admin()
.cluster()
.prepareListTasks()
.setActions(MlTasks.JOB_TASK_NAME + "[c]", MlTasks.DATAFEED_TASK_NAME + "[c]")
.get()
.getTasks()
.size(), equalTo(2));
.getTasks(), hasSize(2));

ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();

PersistentTasksCustomMetadata persistentTasks = masterClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true).size(), equalTo(1));
assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true).size(), equalTo(1));
assertThat(MlMetadata.getMlMetadata(masterClusterState).isUpgradeMode(), equalTo(false));
assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true), hasSize(1));
assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true), hasSize(1));

// Set the upgrade mode setting
AcknowledgedResponse response = client().execute(SetUpgradeModeAction.INSTANCE, new SetUpgradeModeAction.Request(true))
.actionGet();

assertThat(response.isAcknowledged(), equalTo(true));
setUpgradeModeTo(true);

masterClusterState = client().admin().cluster().prepareState().all().get().getState();

// Assert state for tasks still exists and that the upgrade setting is set
persistentTasks = masterClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true).size(), equalTo(1));
assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true).size(), equalTo(1));
assertThat(MlMetadata.getMlMetadata(masterClusterState).isUpgradeMode(), equalTo(true));
assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true), hasSize(1));
assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true), hasSize(1));

assertThat(client().admin()
.cluster()
Expand All @@ -87,50 +84,81 @@ public void testEnableUpgradeMode() throws Exception {
.getTasks(), is(empty()));

GetJobsStatsAction.Response.JobStats jobStats = getJobStats(jobId).get(0);
assertThat(jobStats.getState(), equalTo(JobState.OPENED));
assertThat(jobStats.getAssignmentExplanation(), equalTo(AWAITING_UPGRADE.getExplanation()));
assertThat(jobStats.getState(), is(equalTo(JobState.OPENED)));
assertThat(jobStats.getAssignmentExplanation(), is(equalTo(AWAITING_UPGRADE.getExplanation())));
assertThat(jobStats.getNode(), is(nullValue()));

GetDatafeedsStatsAction.Response.DatafeedStats datafeedStats = getDatafeedStats(datafeedId);
assertThat(datafeedStats.getDatafeedState(), equalTo(DatafeedState.STARTED));
assertThat(datafeedStats.getAssignmentExplanation(), equalTo(AWAITING_UPGRADE.getExplanation()));
assertThat(datafeedStats.getDatafeedState(), is(equalTo(DatafeedState.STARTED)));
assertThat(datafeedStats.getAssignmentExplanation(), is(equalTo(AWAITING_UPGRADE.getExplanation())));
assertThat(datafeedStats.getNode(), is(nullValue()));

Job.Builder job = createScheduledJob("job-should-not-open");
registerJob(job);
putJob(job);
ElasticsearchStatusException statusException = expectThrows(ElasticsearchStatusException.class, () -> openJob(job.getId()));
assertThat(statusException.status(), equalTo(RestStatus.TOO_MANY_REQUESTS));
assertThat(statusException.getMessage(), equalTo("Cannot open jobs when upgrade mode is enabled"));

//Disable the setting
response = client().execute(SetUpgradeModeAction.INSTANCE, new SetUpgradeModeAction.Request(false))
.actionGet();

assertThat(response.isAcknowledged(), equalTo(true));
// Disable the setting
setUpgradeModeTo(false);

masterClusterState = client().admin().cluster().prepareState().all().get().getState();

persistentTasks = masterClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true).size(), equalTo(1));
assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true).size(), equalTo(1));
assertThat(MlMetadata.getMlMetadata(masterClusterState).isUpgradeMode(), equalTo(false));
assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true), hasSize(1));
assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true), hasSize(1));

assertBusy(() -> assertThat(client().admin()
.cluster()
.prepareListTasks()
.setActions(MlTasks.JOB_TASK_NAME + "[c]", MlTasks.DATAFEED_TASK_NAME + "[c]")
.get()
.getTasks()
.size(), equalTo(2)));
.getTasks(), hasSize(2)));

jobStats = getJobStats(jobId).get(0);
assertThat(jobStats.getState(), equalTo(JobState.OPENED));
assertThat(jobStats.getAssignmentExplanation(), not(equalTo(AWAITING_UPGRADE.getExplanation())));
assertThat(jobStats.getState(), is(equalTo(JobState.OPENED)));
assertThat(jobStats.getAssignmentExplanation(), is(not(equalTo(AWAITING_UPGRADE.getExplanation()))));

datafeedStats = getDatafeedStats(datafeedId);
assertThat(datafeedStats.getDatafeedState(), equalTo(DatafeedState.STARTED));
assertThat(datafeedStats.getAssignmentExplanation(), not(equalTo(AWAITING_UPGRADE.getExplanation())));
assertThat(datafeedStats.getDatafeedState(), is(equalTo(DatafeedState.STARTED)));
assertThat(datafeedStats.getAssignmentExplanation(), is(not(equalTo(AWAITING_UPGRADE.getExplanation()))));
}

public void testJobOpenActionInUpgradeMode() {
String jobId = "job-should-not-open";
Job.Builder job = createScheduledJob(jobId);
registerJob(job);
putJob(job);

setUpgradeModeTo(true);

ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> openJob(jobId));
assertThat(e.getMessage(), is(equalTo("Cannot perform cluster:admin/xpack/ml/job/open action while upgrade mode is enabled")));
assertThat(e.status(), is(equalTo(RestStatus.TOO_MANY_REQUESTS)));
}

public void testAnomalyDetectionActionsInUpgradeMode() {
setUpgradeModeTo(true);

String jobId = "job_id";
expectThrowsUpgradeModeException(() -> putJob(createScheduledJob(jobId)));
expectThrowsUpgradeModeException(() -> updateJob(jobId, null));
expectThrowsUpgradeModeException(() -> deleteJob(jobId));
expectThrowsUpgradeModeException(() -> openJob(jobId));
expectThrowsUpgradeModeException(() -> flushJob(jobId, false));
expectThrowsUpgradeModeException(() -> closeJob(jobId));
expectThrowsUpgradeModeException(() -> persistJob(jobId));
expectThrowsUpgradeModeException(() -> forecast(jobId, null, null));

String snapshotId = "snapshot_id";
expectThrowsUpgradeModeException(() -> revertModelSnapshot(jobId, snapshotId));

String datafeedId = "datafeed_id";
expectThrowsUpgradeModeException(() -> putDatafeed(createDatafeed(datafeedId, jobId, Collections.singletonList("index"))));
expectThrowsUpgradeModeException(() -> updateDatafeed(new DatafeedUpdate.Builder(datafeedId).build()));
expectThrowsUpgradeModeException(() -> deleteDatafeed(datafeedId));
expectThrowsUpgradeModeException(() -> startDatafeed(datafeedId, 0, null));
expectThrowsUpgradeModeException(() -> stopDatafeed(datafeedId));

String filterId = "filter_id";
expectThrowsUpgradeModeException(() -> putMlFilter(MlFilter.builder(filterId).build()));

String calendarId = "calendar_id";
expectThrowsUpgradeModeException(() -> putCalendar(calendarId, Collections.singletonList(jobId), ""));
}

private void startRealtime(String jobId) throws Exception {
Expand All @@ -154,18 +182,23 @@ private void startRealtime(String jobId) throws Exception {
startDatafeed(datafeedConfig.getId(), 0L, null);
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
assertThat(dataCounts.getProcessedRecordCount(), is(equalTo(numDocs1)));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), is(equalTo(0L)));
});

long numDocs2 = randomIntBetween(2, 64);
now = System.currentTimeMillis();
indexDocs(logger, "data", numDocs2, now + 5000, now + 6000);
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1 + numDocs2));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
assertThat(dataCounts.getProcessedRecordCount(), is(equalTo(numDocs1 + numDocs2)));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), is(equalTo(0L)));
}, 30, TimeUnit.SECONDS);
}

private static void expectThrowsUpgradeModeException(ThrowingRunnable actionInvocation) {
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, actionInvocation);
assertThat(e.getMessage(), containsString("upgrade mode is enabled"));
assertThat(e.status(), is(equalTo(RestStatus.TOO_MANY_REQUESTS)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.client.node.NodeClient;
Expand Down Expand Up @@ -332,6 +333,7 @@
import java.util.function.UnaryOperator;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;

public class MachineLearning extends Plugin implements SystemIndexPlugin, AnalysisPlugin, IngestPlugin, PersistentTaskPlugin {
public static final String NAME = "ml";
Expand Down Expand Up @@ -428,6 +430,7 @@ public Set<DiscoveryNodeRole> getRoles() {
private final SetOnce<DataFrameAnalyticsManager> dataFrameAnalyticsManager = new SetOnce<>();
private final SetOnce<DataFrameAnalyticsAuditor> dataFrameAnalyticsAuditor = new SetOnce<>();
private final SetOnce<MlMemoryTracker> memoryTracker = new SetOnce<>();
private final SetOnce<ActionFilter> mlUpgradeModeActionFilter = new SetOnce<>();

public MachineLearning(Settings settings, Path configPath) {
this.settings = settings;
Expand Down Expand Up @@ -530,9 +533,11 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
IndexNameExpressionResolver indexNameExpressionResolver) {
if (enabled == false) {
// special holder for @link(MachineLearningFeatureSetUsage) which needs access to job manager, empty if ML is disabled
return Collections.singletonList(new JobManagerHolder());
return singletonList(new JobManagerHolder());
}

this.mlUpgradeModeActionFilter.set(new MlUpgradeModeActionFilter(clusterService));

new MlIndexTemplateRegistry(settings, clusterService, threadPool, client, xContentRegistry);

AnomalyDetectionAuditor anomalyDetectionAuditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
Expand Down Expand Up @@ -881,6 +886,15 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
infoAction);
}

@Override
public List<ActionFilter> getActionFilters() {
if (enabled == false) {
return emptyList();
}

return singletonList(this.mlUpgradeModeActionFilter.get());
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
if (false == enabled) {
Expand Down
Loading

0 comments on commit 2c6df68

Please sign in to comment.