From 0cd088b0707c367e2426ea2c0e5cf32979b8c4bb Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 18 Dec 2018 20:18:51 +0000 Subject: [PATCH] [ML] ensure the ml-config index (#36792) --- .../persistence/AnomalyDetectorsIndex.java | 2 + .../xpack/ml/MachineLearning.java | 4 +- .../xpack/ml/MlAssignmentNotifier.java | 25 ++--- .../ml/MlConfigMigrationEligibilityCheck.java | 18 +++ .../xpack/ml/MlConfigMigrator.java | 78 +++++++++---- .../persistence/DatafeedConfigProvider.java | 22 ++-- .../ml/job/persistence/JobConfigProvider.java | 22 ++-- .../xpack/ml/MlAssignmentNotifierTests.java | 6 +- ...lConfigMigrationEligibilityCheckTests.java | 106 +++++++++++++++--- .../ml/integration/MlConfigMigratorIT.java | 69 +++++++++++- .../xpack/ml/job/JobManagerTests.java | 34 +++++- 11 files changed, 303 insertions(+), 83 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java index b7b104e35cdec..673e796ef7e1f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java @@ -10,6 +10,8 @@ */ public final class AnomalyDetectorsIndex { + public static final int CONFIG_INDEX_MAX_RESULTS_WINDOW = 10_000; + private AnomalyDetectorsIndex() { } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 4e635fe82ad48..41f50024ac344 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -675,7 +675,9 @@ public UnaryOperator> getIndexTemplateMetaDat // least possible burden on Elasticsearch .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1") - .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting)) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting) + .put(IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(), + AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)) .version(Version.CURRENT.id) .putMapping(ElasticsearchMappings.DOC_TYPE, Strings.toString(configMapping)) .build(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java index ba1000135191e..9db17ed448433 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java @@ -10,7 +10,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; @@ -58,23 +57,23 @@ public void clusterChanged(ClusterChangedEvent event) { return; } - if (event.metaDataChanged() == false) { - return; - } - PersistentTasksCustomMetaData previous = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - PersistentTasksCustomMetaData current = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - mlConfigMigrator.migrateConfigsWithoutTasks(event.state(), ActionListener.wrap( - response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())), + response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(event)), e -> { logger.error("error migrating ml configurations", e); - threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())); + threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(event)); } )); } - private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, PersistentTasksCustomMetaData previous, - ClusterState state) { + private void auditChangesToMlTasks(ClusterChangedEvent event) { + + if (event.metaDataChanged() == false) { + return; + } + + PersistentTasksCustomMetaData previous = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + PersistentTasksCustomMetaData current = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); if (Objects.equals(previous, current)) { return; @@ -92,7 +91,7 @@ private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, Persis if (currentAssignment.getExecutorNode() == null) { auditor.warning(jobId, "No node found to open job. Reasons [" + currentAssignment.getExplanation() + "]"); } else { - DiscoveryNode node = state.nodes().get(currentAssignment.getExecutorNode()); + DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode()); auditor.info(jobId, "Opening job on node [" + node.toString() + "]"); } } else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) { @@ -106,7 +105,7 @@ private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, Persis auditor.warning(jobId, msg); } } else { - DiscoveryNode node = state.nodes().get(currentAssignment.getExecutorNode()); + DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode()); if (jobId != null) { auditor.info(jobId, "Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + node + "]"); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java index 0f127919ac3d0..72cb52424c3b1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java @@ -7,6 +7,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -14,6 +15,7 @@ import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; /** * Checks whether migration can start and whether ML resources (e.g. jobs, datafeeds) @@ -37,10 +39,12 @@ private void setConfigMigrationEnabled(boolean configMigrationEnabled) { this.isConfigMigrationEnabled = configMigrationEnabled; } + /** * Can migration start? Returns: * False if config migration is disabled via the setting {@link #ENABLE_CONFIG_MIGRATION} * False if the min node version of the cluster is before {@link #MIN_NODE_VERSION} + * False if the .ml-config index shards are not active * True otherwise * @param clusterState The cluster state * @return A boolean that dictates if config migration can start @@ -54,12 +58,26 @@ public boolean canStartMigration(ClusterState clusterState) { if (minNodeVersion.before(MIN_NODE_VERSION)) { return false; } + + return mlConfigIndexIsAllocated(clusterState); + } + + static boolean mlConfigIndexIsAllocated(ClusterState clusterState) { + if (clusterState.metaData().hasIndex(AnomalyDetectorsIndex.configIndexName()) == false) { + return false; + } + + IndexRoutingTable routingTable = clusterState.getRoutingTable().index(AnomalyDetectorsIndex.configIndexName()); + if (routingTable == null || routingTable.allPrimaryShardsActive() == false) { + return false; + } return true; } /** * Is the job a eligible for migration? Returns: * False if {@link #canStartMigration(ClusterState)} returns {@code false} + * False if the job is not in the cluster state * False if the {@link Job#isDeleting()} * False if the job has a persistent task * True otherwise i.e. the job is present, not deleting diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index c3b9626ffd042..f2fe80f377649 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -11,6 +11,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; @@ -21,6 +23,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; @@ -29,6 +32,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; @@ -126,19 +130,11 @@ public MlConfigMigrator(Settings settings, Client client, ClusterService cluster * @param listener The success listener */ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener listener) { - - if (migrationEligibilityCheck.canStartMigration(clusterState) == false) { - listener.onResponse(false); - return; - } - if (migrationInProgress.compareAndSet(false, true) == false) { listener.onResponse(Boolean.FALSE); return; } - logger.debug("migrating ml configurations"); - ActionListener unMarkMigrationInProgress = ActionListener.wrap( response -> { migrationInProgress.set(false); @@ -150,19 +146,34 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener } ); + List batches = splitInBatches(clusterState); + if (batches.isEmpty()) { + unMarkMigrationInProgress.onResponse(Boolean.FALSE); + return; + } + + if (clusterState.metaData().hasIndex(AnomalyDetectorsIndex.configIndexName()) == false) { + createConfigIndex(ActionListener.wrap( + response -> { + unMarkMigrationInProgress.onResponse(Boolean.FALSE); + }, + unMarkMigrationInProgress::onFailure + )); + return; + } + + if (migrationEligibilityCheck.canStartMigration(clusterState) == false) { + unMarkMigrationInProgress.onResponse(Boolean.FALSE); + return; + } + snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap( - response -> { - // We have successfully snapshotted the ML configs so we don't need to try again - tookConfigSnapshot.set(true); - - List batches = splitInBatches(clusterState); - if (batches.isEmpty()) { - unMarkMigrationInProgress.onResponse(Boolean.FALSE); - return; - } - migrateBatches(batches, unMarkMigrationInProgress); - }, - unMarkMigrationInProgress::onFailure + response -> { + // We have successfully snapshotted the ML configs so we don't need to try again + tookConfigSnapshot.set(true); + migrateBatches(batches, unMarkMigrationInProgress); + }, + unMarkMigrationInProgress::onFailure )); } @@ -296,6 +307,7 @@ static RemovalResult removeJobsAndDatafeeds(List jobsToRemove, List jobs, BulkRequestBuilder bulkRequestBuilder) { ToXContent.Params params = new ToXContent.MapParams(JobConfigProvider.TO_XCONTENT_PARAMS); for (Job job : jobs) { + logger.debug("adding job to migrate: " + job.getId()); bulkRequestBuilder.add(indexRequest(job, Job.documentId(job.getId()), params)); } } @@ -303,6 +315,7 @@ private void addJobIndexRequests(Collection jobs, BulkRequestBuilder bulkRe private void addDatafeedIndexRequests(Collection datafeedConfigs, BulkRequestBuilder bulkRequestBuilder) { ToXContent.Params params = new ToXContent.MapParams(DatafeedConfigProvider.TO_XCONTENT_PARAMS); for (DatafeedConfig datafeedConfig : datafeedConfigs) { + logger.debug("adding datafeed to migrate: " + datafeedConfig.getId()); bulkRequestBuilder.add(indexRequest(datafeedConfig, DatafeedConfig.documentId(datafeedConfig.getId()), params)); } } @@ -318,7 +331,6 @@ private IndexRequest indexRequest(ToXContentObject source, String documentId, To return indexRequest; } - // public for testing public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener listener) { @@ -361,6 +373,30 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener listen ); } + private void createConfigIndex(ActionListener listener) { + logger.info("creating the .ml-config index"); + CreateIndexRequest createIndexRequest = new CreateIndexRequest(AnomalyDetectorsIndex.configIndexName()); + try + { + createIndexRequest.settings( + Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1") + .put(IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(), AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW) + ); + createIndexRequest.mapping(ElasticsearchMappings.DOC_TYPE, ElasticsearchMappings.configMapping()); + } catch (Exception e) { + logger.error("error writing the .ml-config mappings", e); + listener.onFailure(e); + return; + } + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, createIndexRequest, + ActionListener.wrap( + r -> listener.onResponse(r.isAcknowledged()), + listener::onFailure + ), client.admin().indices()::create); + } public static Job updateJobForMigration(Job job) { Job.Builder builder = new Job.Builder(job); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java index 15432f8a0ee3f..2e620204e228a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java @@ -73,6 +73,15 @@ import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; +/** + * This class implements CRUD operation for the + * datafeed configuration document + * + * The number of datafeeds returned in a search it limited to + * {@link AnomalyDetectorsIndex#CONFIG_INDEX_MAX_RESULTS_WINDOW}. + * In most cases we expect 10s or 100s of datafeeds to be defined and + * a search for all datafeeds should return all. + */ public class DatafeedConfigProvider { private static final Logger logger = LogManager.getLogger(DatafeedConfigProvider.class); @@ -88,13 +97,6 @@ public class DatafeedConfigProvider { TO_XCONTENT_PARAMS = Collections.unmodifiableMap(modifiable); } - /** - * In most cases we expect 10s or 100s of datafeeds to be defined and - * a search for all datafeeds should return all. - * TODO this is a temporary fix - */ - public int searchSize = 1000; - public DatafeedConfigProvider(Client client, NamedXContentRegistry xContentRegistry) { this.client = client; this.xContentRegistry = xContentRegistry; @@ -433,7 +435,7 @@ private SearchRequest buildExpandDatafeedIdsSearch(String expression) { return client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(searchSize) + .setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); } @@ -458,7 +460,7 @@ public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, A SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(searchSize) + .setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoDatafeeds); @@ -514,7 +516,7 @@ public void expandDatafeedConfigsWithoutMissingCheck(String expression, ActionLi SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(searchSize) + .setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 9fae5da178f37..5fbc58d730528 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -89,6 +89,11 @@ /** * This class implements CRUD operation for the * anomaly detector job configuration document + * + * The number of jobs returned in a search it limited to + * {@link AnomalyDetectorsIndex#CONFIG_INDEX_MAX_RESULTS_WINDOW}. + * In most cases we expect 10s or 100s of jobs to be defined and + * a search for all jobs should return all. */ public class JobConfigProvider { @@ -101,13 +106,6 @@ public class JobConfigProvider { TO_XCONTENT_PARAMS = Collections.unmodifiableMap(modifiable); } - /** - * In most cases we expect 10s or 100s of jobs to be defined and - * a search for all jobs should return all. - * TODO this is a temporary fix - */ - private int searchSize = 1000; - private final Client client; public JobConfigProvider(Client client) { @@ -669,7 +667,7 @@ private SearchRequest makeExpandIdsSearchRequest(String expression, boolean excl return client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(searchSize) + .setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); } @@ -695,7 +693,7 @@ public void expandJobs(String expression, boolean allowNoJobs, boolean excludeDe SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(searchSize) + .setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoJobs); @@ -754,7 +752,7 @@ public void expandJobsWithoutMissingcheck(String expression, boolean excludeDele SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(searchSize) + .setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, @@ -800,7 +798,7 @@ public void expandGroupIds(List groupIds, ActionListener> listener) { SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(searchSize) + .setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java index e43197eb06302..5c8c253794794 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java @@ -31,7 +31,6 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -154,7 +153,8 @@ public void testClusterChanged_noPersistentTaskChanges() { .build(); notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); - verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any()); + verify(configMigrator, times(1)).migrateConfigsWithoutTasks(any(), any()); + verifyNoMoreInteractions(auditor); // no longer master newState = ClusterState.builder(new ClusterName("_name")) @@ -163,6 +163,6 @@ public void testClusterChanged_noPersistentTaskChanges() { .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))) .build(); notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); - verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any()); + verifyNoMoreInteractions(configMigrator); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java index fec071c464104..4785f9f75a5c3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java @@ -8,13 +8,22 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.MlMetadata; @@ -24,6 +33,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.junit.Before; import java.net.InetAddress; @@ -53,12 +63,18 @@ public void testCanStartMigration_givenMigrationIsDisabled() { } public void testCanStartMigration_givenNodesNotUpToVersion() { + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); + // mixed 6.5 and 6.6 nodes ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_5_0)) - .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) - .build(); + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_5_0)) + .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) + .routingTable(routingTable.build()) + .metaData(metaData) + .build(); Settings settings = newSettings(true); givenClusterSettings(settings); @@ -69,12 +85,18 @@ public void testCanStartMigration_givenNodesNotUpToVersion() { } public void testCanStartMigration_givenNodesNotUpToVersionAndMigrationIsEnabled() { + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); + // mixed 6.5 and 6.6 nodes ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_6_0)) - .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) - .build(); + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_6_0)) + .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) + .routingTable(routingTable.build()) + .metaData(metaData) + .build(); Settings settings = newSettings(true); givenClusterSettings(settings); @@ -84,6 +106,52 @@ public void testCanStartMigration_givenNodesNotUpToVersionAndMigrationIsEnabled( assertTrue(check.canStartMigration(clusterState)); } + public void testCanStartMigration_givenMissingIndex() { + Settings settings = newSettings(true); + givenClusterSettings(settings); + + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .build(); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + assertFalse(check.canStartMigration(clusterState)); + } + + public void testCanStartMigration_givenInactiveShards() { + Settings settings = newSettings(true); + givenClusterSettings(settings); + + // index is present but no routing + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(metaData) + .build(); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + assertFalse(check.canStartMigration(clusterState)); + } + + private void addMlConfigIndex(MetaData.Builder metaData, RoutingTable.Builder routingTable) { + IndexMetaData.Builder indexMetaData = IndexMetaData.builder(AnomalyDetectorsIndex.configIndexName()); + indexMetaData.settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + ); + metaData.put(indexMetaData); + Index index = new Index(AnomalyDetectorsIndex.configIndexName(), "_uuid"); + ShardId shardId = new ShardId(index, 0); + ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")); + shardRouting = shardRouting.initialize("node_id", null, 0L); + shardRouting = shardRouting.moveToStarted(); + routingTable.add(IndexRoutingTable.builder(index) + .addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build())); + } + + public void testJobIsEligibleForMigration_givenNodesNotUpToVersion() { // mixed 6.5 and 6.6 nodes ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) @@ -185,11 +253,14 @@ public void testJobIsEligibleForMigration_givenClosedJob() { Job closedJob = JobTests.buildJobBuilder("closed-job").build(); MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(closedJob, false); + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - ) - .build(); + .metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build())) + .routingTable(routingTable.build()) + .build(); Settings settings = newSettings(true); givenClusterSettings(settings); @@ -283,11 +354,14 @@ public void testDatafeedIsEligibleForMigration_givenStoppedDatafeed() { mlMetadata.putDatafeed(createCompatibleDatafeed(job.getId()), Collections.emptyMap()); String datafeedId = "df-" + job.getId(); + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - ) - .build(); + .metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build())) + .routingTable(routingTable.build()) + .build(); Settings settings = newSettings(true); givenClusterSettings(settings); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java index d98abea55535c..87c0e4ac824ce 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java @@ -5,12 +5,20 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.Version; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -19,6 +27,8 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -117,9 +127,12 @@ public void testMigrateConfigs() throws InterruptedException, IOException { builder.setIndices(Collections.singletonList("beats*")); mlMetadata.putDatafeed(builder.build(), Collections.emptyMap()); + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build())) + .routingTable(routingTable.build()) .build(); doAnswer(invocation -> { @@ -180,10 +193,13 @@ public void testMigrateConfigs_GivenLargeNumberOfJobsAndDatafeeds() throws Inter mlMetadata.putDatafeed(builder.build(), Collections.emptyMap()); } + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build())) - .build(); + .metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build())) + .routingTable(routingTable.build()) + .build(); doAnswer(invocation -> { ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1]; @@ -304,6 +320,49 @@ public void assertSnapshot(MlMetadata expectedMlMetadata) throws IOException { assertEquals(expectedMlMetadata, recoveredMeta); } } + + private void addMlConfigIndex(MetaData.Builder metaData, RoutingTable.Builder routingTable) { + IndexMetaData.Builder indexMetaData = IndexMetaData.builder(AnomalyDetectorsIndex.configIndexName()); + indexMetaData.settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + ); + metaData.put(indexMetaData); + Index index = new Index(AnomalyDetectorsIndex.configIndexName(), "_uuid"); + ShardId shardId = new ShardId(index, 0); + ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")); + shardRouting = shardRouting.initialize("node_id", null, 0L); + shardRouting = shardRouting.moveToStarted(); + routingTable.add(IndexRoutingTable.builder(index) + .addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build())); + } + + public void testConfigIndexIsCreated() throws Exception { + // and jobs and datafeeds clusterstate + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(buildJobBuilder("job-foo").build(), false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + + AtomicReference exceptionHolder = new AtomicReference<>(); + AtomicReference responseHolder = new AtomicReference<>(); + MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService); + + // if the cluster state has a job config and the index does not + // exist it should be created + blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), + responseHolder, exceptionHolder); + + assertBusy(() -> assertTrue(configIndexExists())); + } + + private boolean configIndexExists() { + return client().admin().indices().prepareExists(AnomalyDetectorsIndex.configIndexName()).get().isExists(); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 871affade508d..a2962faf945e9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -8,13 +8,21 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.document.DocumentField; @@ -25,7 +33,9 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.index.Index; import org.elasticsearch.index.analysis.AnalysisRegistry; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -850,9 +860,29 @@ public void testNotifyFilterChangedGivenOnlyRemovedItems() throws IOException { public void testUpdateJob_notAllowedPreMigration() { MlMetadata.Builder mlmetadata = new MlMetadata.Builder().putJob(buildJobBuilder("closed-job-not-migrated").build(), false); + + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + + IndexMetaData.Builder indexMetaData = IndexMetaData.builder(AnomalyDetectorsIndex.configIndexName()); + indexMetaData.settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + ); + metaData.put(indexMetaData); + Index index = new Index(AnomalyDetectorsIndex.configIndexName(), "_uuid"); + ShardId shardId = new ShardId(index, 0); + ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")); + shardRouting = shardRouting.initialize("node_id", null, 0L); + shardRouting = shardRouting.moveToStarted(); + routingTable.add(IndexRoutingTable.builder(index) + .addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build())); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlmetadata.build())) + .metaData(metaData.putCustom(MlMetadata.TYPE, mlmetadata.build())) + .routingTable(routingTable.build()) .build(); when(clusterService.state()).thenReturn(clusterState);