From d43cbdab9793587daf30256fd33ba4f2a1d6b7d2 Mon Sep 17 00:00:00 2001
From: David Kyle
Date: Wed, 19 Dec 2018 13:43:43 +0000
Subject: [PATCH] [ML] ensure the ml-config index (#36792) (#36832)

---
 .../persistence/AnomalyDetectorsIndex.java     |   2 +
 .../xpack/ml/MachineLearning.java              |   4 +-
 .../ml/MlConfigMigrationEligibilityCheck.java  |  18 +++
 .../xpack/ml/MlConfigMigrator.java             |  78 +++++++++----
 .../persistence/DatafeedConfigProvider.java    |  23 ++--
 .../ml/job/persistence/JobConfigProvider.java  |  36 ++++--
 .../autodetect/AutodetectProcessManager.java   |   5 +-
 ...lConfigMigrationEligibilityCheckTests.java  | 106 +++++++++++++++---
 .../ml/integration/MlConfigMigratorIT.java     |  69 +++++++++++-
 .../xpack/ml/job/JobManagerTests.java          |  34 +++++-
 10 files changed, 305 insertions(+), 70 deletions(-)

diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java
index b7b104e35cdec..673e796ef7e1f 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java
@@ -10,6 +10,8 @@
  */
 public final class AnomalyDetectorsIndex {
+    public static final int CONFIG_INDEX_MAX_RESULTS_WINDOW = 10_000;
+
     private AnomalyDetectorsIndex() {
     }
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
index 65147abf20b46..5310a92fc2063 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
@@ -670,7 +670,9 @@ public UnaryOperator> getIndexTemplateMetaDat
                 // least possible burden on Elasticsearch
                 .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
                 .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
-                .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting))
+                .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting)
+                .put(IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(),
+                    AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW))
             .version(Version.CURRENT.id)
             .putMapping(ElasticsearchMappings.DOC_TYPE, Strings.toString(configMapping))
             .build();
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java
index 0f127919ac3d0..72cb52424c3b1 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java
@@ -7,6 +7,7 @@
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
@@ -14,6 +15,7 @@
 import org.elasticsearch.xpack.core.ml.MlMetadata;
 import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;

 /**
  * Checks whether migration can
start and whether ML resources (e.g. jobs, datafeeds) @@ -37,10 +39,12 @@ private void setConfigMigrationEnabled(boolean configMigrationEnabled) { this.isConfigMigrationEnabled = configMigrationEnabled; } + /** * Can migration start? Returns: * False if config migration is disabled via the setting {@link #ENABLE_CONFIG_MIGRATION} * False if the min node version of the cluster is before {@link #MIN_NODE_VERSION} + * False if the .ml-config index shards are not active * True otherwise * @param clusterState The cluster state * @return A boolean that dictates if config migration can start @@ -54,12 +58,26 @@ public boolean canStartMigration(ClusterState clusterState) { if (minNodeVersion.before(MIN_NODE_VERSION)) { return false; } + + return mlConfigIndexIsAllocated(clusterState); + } + + static boolean mlConfigIndexIsAllocated(ClusterState clusterState) { + if (clusterState.metaData().hasIndex(AnomalyDetectorsIndex.configIndexName()) == false) { + return false; + } + + IndexRoutingTable routingTable = clusterState.getRoutingTable().index(AnomalyDetectorsIndex.configIndexName()); + if (routingTable == null || routingTable.allPrimaryShardsActive() == false) { + return false; + } return true; } /** * Is the job a eligible for migration? Returns: * False if {@link #canStartMigration(ClusterState)} returns {@code false} + * False if the job is not in the cluster state * False if the {@link Job#isDeleting()} * False if the job has a persistent task * True otherwise i.e. the job is present, not deleting diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index c3b9626ffd042..f2fe80f377649 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -11,6 +11,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; @@ -21,6 +23,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; @@ -29,6 +32,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; @@ -126,19 +130,11 @@ public MlConfigMigrator(Settings settings, Client client, ClusterService cluster * @param listener The success listener */ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener listener) { - - if (migrationEligibilityCheck.canStartMigration(clusterState) == false) { - listener.onResponse(false); - return; - } - if 
(migrationInProgress.compareAndSet(false, true) == false) { listener.onResponse(Boolean.FALSE); return; } - logger.debug("migrating ml configurations"); - ActionListener unMarkMigrationInProgress = ActionListener.wrap( response -> { migrationInProgress.set(false); @@ -150,19 +146,34 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener } ); + List batches = splitInBatches(clusterState); + if (batches.isEmpty()) { + unMarkMigrationInProgress.onResponse(Boolean.FALSE); + return; + } + + if (clusterState.metaData().hasIndex(AnomalyDetectorsIndex.configIndexName()) == false) { + createConfigIndex(ActionListener.wrap( + response -> { + unMarkMigrationInProgress.onResponse(Boolean.FALSE); + }, + unMarkMigrationInProgress::onFailure + )); + return; + } + + if (migrationEligibilityCheck.canStartMigration(clusterState) == false) { + unMarkMigrationInProgress.onResponse(Boolean.FALSE); + return; + } + snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap( - response -> { - // We have successfully snapshotted the ML configs so we don't need to try again - tookConfigSnapshot.set(true); - - List batches = splitInBatches(clusterState); - if (batches.isEmpty()) { - unMarkMigrationInProgress.onResponse(Boolean.FALSE); - return; - } - migrateBatches(batches, unMarkMigrationInProgress); - }, - unMarkMigrationInProgress::onFailure + response -> { + // We have successfully snapshotted the ML configs so we don't need to try again + tookConfigSnapshot.set(true); + migrateBatches(batches, unMarkMigrationInProgress); + }, + unMarkMigrationInProgress::onFailure )); } @@ -296,6 +307,7 @@ static RemovalResult removeJobsAndDatafeeds(List jobsToRemove, List jobs, BulkRequestBuilder bulkRequestBuilder) { ToXContent.Params params = new ToXContent.MapParams(JobConfigProvider.TO_XCONTENT_PARAMS); for (Job job : jobs) { + logger.debug("adding job to migrate: " + job.getId()); bulkRequestBuilder.add(indexRequest(job, Job.documentId(job.getId()), params)); } } @@ -303,6 +315,7 @@ private void addJobIndexRequests(Collection jobs, BulkRequestBuilder bulkRe private void addDatafeedIndexRequests(Collection datafeedConfigs, BulkRequestBuilder bulkRequestBuilder) { ToXContent.Params params = new ToXContent.MapParams(DatafeedConfigProvider.TO_XCONTENT_PARAMS); for (DatafeedConfig datafeedConfig : datafeedConfigs) { + logger.debug("adding datafeed to migrate: " + datafeedConfig.getId()); bulkRequestBuilder.add(indexRequest(datafeedConfig, DatafeedConfig.documentId(datafeedConfig.getId()), params)); } } @@ -318,7 +331,6 @@ private IndexRequest indexRequest(ToXContentObject source, String documentId, To return indexRequest; } - // public for testing public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener listener) { @@ -361,6 +373,30 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener listen ); } + private void createConfigIndex(ActionListener listener) { + logger.info("creating the .ml-config index"); + CreateIndexRequest createIndexRequest = new CreateIndexRequest(AnomalyDetectorsIndex.configIndexName()); + try + { + createIndexRequest.settings( + Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1") + .put(IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(), AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW) + ); + createIndexRequest.mapping(ElasticsearchMappings.DOC_TYPE, ElasticsearchMappings.configMapping()); + } catch (Exception e) { + logger.error("error writing the .ml-config 
mappings", e); + listener.onFailure(e); + return; + } + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, createIndexRequest, + ActionListener.wrap( + r -> listener.onResponse(r.isAcknowledged()), + listener::onFailure + ), client.admin().indices()::create); + } public static Job updateJobForMigration(Job job) { Job.Builder builder = new Job.Builder(job); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java index 5367278be9ac2..d9ea6cb7c32e4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java @@ -73,6 +73,15 @@ import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; +/** + * This class implements CRUD operation for the + * datafeed configuration document + * + * The number of datafeeds returned in a search it limited to + * {@link AnomalyDetectorsIndex#CONFIG_INDEX_MAX_RESULTS_WINDOW}. + * In most cases we expect 10s or 100s of datafeeds to be defined and + * a search for all datafeeds should return all. + */ public class DatafeedConfigProvider { private static final Logger logger = LogManager.getLogger(DatafeedConfigProvider.class); @@ -87,13 +96,6 @@ public class DatafeedConfigProvider { TO_XCONTENT_PARAMS = Collections.unmodifiableMap(modifiable); } - /** - * In most cases we expect 10s or 100s of datafeeds to be defined and - * a search for all datafeeds should return all. - * TODO this is a temporary fix - */ - public int searchSize = 1000; - public DatafeedConfigProvider(Client client, NamedXContentRegistry xContentRegistry) { this.client = client; this.xContentRegistry = xContentRegistry; @@ -368,7 +370,9 @@ public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, Actio SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) - .setSource(sourceBuilder).request(); + .setSource(sourceBuilder) + .setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW) + .request(); ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoDatafeeds); @@ -407,7 +411,6 @@ public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, Actio * wildcard then setting this true will not suppress the exception * @param listener The expanded datafeed config listener */ - // NORELEASE datafeed configs should be paged or have a mechanism to return all jobs if there are many of them public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, ActionListener> listener) { String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedIdQuery(tokens)); @@ -416,7 +419,7 @@ public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, A SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(searchSize) + .setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoDatafeeds); 
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java
index 49f82bba0335d..73b1fe155fc3c 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java
@@ -89,6 +89,11 @@
 /**
  * This class implements CRUD operation for the
  * anomaly detector job configuration document
+ *
+ * The number of jobs returned in a search is limited to
+ * {@link AnomalyDetectorsIndex#CONFIG_INDEX_MAX_RESULTS_WINDOW}.
+ * In most cases we expect 10s or 100s of jobs to be defined and
+ * a search for all jobs should return all.
  */
 public class JobConfigProvider {
@@ -101,13 +106,6 @@ public class JobConfigProvider {
         TO_XCONTENT_PARAMS = Collections.unmodifiableMap(modifiable);
     }

-    /**
-     * In most cases we expect 10s or 100s of jobs to be defined and
-     * a search for all jobs should return all.
-     * TODO this is a temporary fix
-     */
-    private int searchSize = 1000;
-
     private final Client client;

     public JobConfigProvider(Client client) {
@@ -565,7 +563,7 @@ public void expandJobsIds(String expression, boolean allowNoJobs, boolean exclud
         SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
                 .setIndicesOptions(IndicesOptions.lenientExpandOpen())
                 .setSource(sourceBuilder)
-                .setSize(searchSize)
+                .setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
                 .request();

         ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoJobs);
@@ -599,6 +597,21 @@ public void expandJobsIds(String expression, boolean allowNoJobs, boolean exclud
     }

+    private SearchRequest makeExpandIdsSearchRequest(String expression, boolean excludeDeleting) {
+        String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
+        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting));
+        sourceBuilder.sort(Job.ID.getPreferredName());
+        sourceBuilder.fetchSource(false);
+        sourceBuilder.docValueField(Job.ID.getPreferredName(), DocValueFieldsContext.USE_DEFAULT_FORMAT);
+        sourceBuilder.docValueField(Job.GROUPS.getPreferredName(), DocValueFieldsContext.USE_DEFAULT_FORMAT);
+
+        return client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
+                .setIndicesOptions(IndicesOptions.lenientExpandOpen())
+                .setSource(sourceBuilder)
+                .setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
+                .request();
+    }
+
     /**
      * The same logic as {@link #expandJobsIds(String, boolean, boolean, ActionListener)} but
      * the full anomaly detector job configuration is returned.
@@ -612,7 +625,6 @@ public void expandJobsIds(String expression, boolean allowNoJobs, boolean exclud * @param excludeDeleting If true exclude jobs marked as deleting * @param listener The expanded jobs listener */ - // NORELEASE jobs should be paged or have a mechanism to return all jobs if there are many of them public void expandJobs(String expression, boolean allowNoJobs, boolean excludeDeleting, ActionListener> listener) { String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting)); @@ -621,7 +633,7 @@ public void expandJobs(String expression, boolean allowNoJobs, boolean excludeDe SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(searchSize) + .setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoJobs); @@ -679,7 +691,7 @@ public void expandGroupIds(List groupIds, ActionListener> listener) { SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(searchSize) + .setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index ff946f09cac41..a1c8cadce60ee 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -45,13 +45,12 @@ import org.elasticsearch.xpack.ml.action.TransportOpenJobAction.JobTask; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; -import org.elasticsearch.xpack.ml.process.NativeStorageProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; @@ -62,6 +61,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.ScoresUpdater; import org.elasticsearch.xpack.ml.job.process.normalizer.ShortCircuitingRenormalizer; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.elasticsearch.xpack.ml.process.NativeStorageProvider; import java.io.IOException; import java.io.InputStream; @@ -408,7 +408,6 @@ public void openJob(JobTask jobTask, Consumer closeHandler) { logger.info("Opening job [{}]", jobId); 
jobManager.getJob(jobId, ActionListener.wrap( - // NORELEASE JIndex. Should not be doing this work on the network thread job -> { if (job.getJobVersion() == null) { closeHandler.accept(ExceptionsHelper.badRequestException("Cannot open job [" + jobId diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java index fec071c464104..4785f9f75a5c3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java @@ -8,13 +8,22 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.MlMetadata; @@ -24,6 +33,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.junit.Before; import java.net.InetAddress; @@ -53,12 +63,18 @@ public void testCanStartMigration_givenMigrationIsDisabled() { } public void testCanStartMigration_givenNodesNotUpToVersion() { + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); + // mixed 6.5 and 6.6 nodes ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_5_0)) - .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) - .build(); + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_5_0)) + .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) + .routingTable(routingTable.build()) + .metaData(metaData) + .build(); Settings settings = newSettings(true); givenClusterSettings(settings); @@ -69,12 +85,18 @@ public void testCanStartMigration_givenNodesNotUpToVersion() { } public void testCanStartMigration_givenNodesNotUpToVersionAndMigrationIsEnabled() { + MetaData.Builder metaData = 
MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); + // mixed 6.5 and 6.6 nodes ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_6_0)) - .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) - .build(); + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_6_0)) + .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) + .routingTable(routingTable.build()) + .metaData(metaData) + .build(); Settings settings = newSettings(true); givenClusterSettings(settings); @@ -84,6 +106,52 @@ public void testCanStartMigration_givenNodesNotUpToVersionAndMigrationIsEnabled( assertTrue(check.canStartMigration(clusterState)); } + public void testCanStartMigration_givenMissingIndex() { + Settings settings = newSettings(true); + givenClusterSettings(settings); + + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .build(); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + assertFalse(check.canStartMigration(clusterState)); + } + + public void testCanStartMigration_givenInactiveShards() { + Settings settings = newSettings(true); + givenClusterSettings(settings); + + // index is present but no routing + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(metaData) + .build(); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + assertFalse(check.canStartMigration(clusterState)); + } + + private void addMlConfigIndex(MetaData.Builder metaData, RoutingTable.Builder routingTable) { + IndexMetaData.Builder indexMetaData = IndexMetaData.builder(AnomalyDetectorsIndex.configIndexName()); + indexMetaData.settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + ); + metaData.put(indexMetaData); + Index index = new Index(AnomalyDetectorsIndex.configIndexName(), "_uuid"); + ShardId shardId = new ShardId(index, 0); + ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")); + shardRouting = shardRouting.initialize("node_id", null, 0L); + shardRouting = shardRouting.moveToStarted(); + routingTable.add(IndexRoutingTable.builder(index) + .addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build())); + } + + public void testJobIsEligibleForMigration_givenNodesNotUpToVersion() { // mixed 6.5 and 6.6 nodes ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) @@ -185,11 +253,14 @@ public void testJobIsEligibleForMigration_givenClosedJob() { Job closedJob = JobTests.buildJobBuilder("closed-job").build(); MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(closedJob, false); + MetaData.Builder metaData = 
MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - ) - .build(); + .metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build())) + .routingTable(routingTable.build()) + .build(); Settings settings = newSettings(true); givenClusterSettings(settings); @@ -283,11 +354,14 @@ public void testDatafeedIsEligibleForMigration_givenStoppedDatafeed() { mlMetadata.putDatafeed(createCompatibleDatafeed(job.getId()), Collections.emptyMap()); String datafeedId = "df-" + job.getId(); + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - ) - .build(); + .metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build())) + .routingTable(routingTable.build()) + .build(); Settings settings = newSettings(true); givenClusterSettings(settings); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java index d98abea55535c..87c0e4ac824ce 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java @@ -5,12 +5,20 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.Version; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -19,6 +27,8 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -117,9 +127,12 @@ public void testMigrateConfigs() throws InterruptedException, IOException { builder.setIndices(Collections.singletonList("beats*")); mlMetadata.putDatafeed(builder.build(), Collections.emptyMap()); + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); ClusterState clusterState = ClusterState.builder(new 
ClusterName("_name")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build())) + .routingTable(routingTable.build()) .build(); doAnswer(invocation -> { @@ -180,10 +193,13 @@ public void testMigrateConfigs_GivenLargeNumberOfJobsAndDatafeeds() throws Inter mlMetadata.putDatafeed(builder.build(), Collections.emptyMap()); } + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build())) - .build(); + .metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build())) + .routingTable(routingTable.build()) + .build(); doAnswer(invocation -> { ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1]; @@ -304,6 +320,49 @@ public void assertSnapshot(MlMetadata expectedMlMetadata) throws IOException { assertEquals(expectedMlMetadata, recoveredMeta); } } + + private void addMlConfigIndex(MetaData.Builder metaData, RoutingTable.Builder routingTable) { + IndexMetaData.Builder indexMetaData = IndexMetaData.builder(AnomalyDetectorsIndex.configIndexName()); + indexMetaData.settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + ); + metaData.put(indexMetaData); + Index index = new Index(AnomalyDetectorsIndex.configIndexName(), "_uuid"); + ShardId shardId = new ShardId(index, 0); + ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")); + shardRouting = shardRouting.initialize("node_id", null, 0L); + shardRouting = shardRouting.moveToStarted(); + routingTable.add(IndexRoutingTable.builder(index) + .addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build())); + } + + public void testConfigIndexIsCreated() throws Exception { + // and jobs and datafeeds clusterstate + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(buildJobBuilder("job-foo").build(), false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + + AtomicReference exceptionHolder = new AtomicReference<>(); + AtomicReference responseHolder = new AtomicReference<>(); + MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService); + + // if the cluster state has a job config and the index does not + // exist it should be created + blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), + responseHolder, exceptionHolder); + + assertBusy(() -> assertTrue(configIndexExists())); + } + + private boolean configIndexExists() { + return client().admin().indices().prepareExists(AnomalyDetectorsIndex.configIndexName()).get().isExists(); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index a24950ed0918e..c52a5a592d817 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ 
b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -8,13 +8,21 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.document.DocumentField; @@ -25,7 +33,9 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.index.Index; import org.elasticsearch.index.analysis.AnalysisRegistry; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -448,9 +458,29 @@ public void testNotifyFilterChangedGivenOnlyRemovedItems() throws IOException { public void testUpdateJob_notAllowedPreMigration() { MlMetadata.Builder mlmetadata = new MlMetadata.Builder().putJob(buildJobBuilder("closed-job-not-migrated").build(), false); + + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + + IndexMetaData.Builder indexMetaData = IndexMetaData.builder(AnomalyDetectorsIndex.configIndexName()); + indexMetaData.settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + ); + metaData.put(indexMetaData); + Index index = new Index(AnomalyDetectorsIndex.configIndexName(), "_uuid"); + ShardId shardId = new ShardId(index, 0); + ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")); + shardRouting = shardRouting.initialize("node_id", null, 0L); + shardRouting = shardRouting.moveToStarted(); + routingTable.add(IndexRoutingTable.builder(index) + .addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build())); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlmetadata.build())) + .metaData(metaData.putCustom(MlMetadata.TYPE, mlmetadata.build())) + .routingTable(routingTable.build()) .build(); when(clusterService.state()).thenReturn(clusterState);