From af6c1b91e52c84c4da8c03b4d26474072db993c2 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 13 Aug 2018 14:48:45 +0100 Subject: [PATCH] [ML] Job config document CRUD operations (#32738) --- .../xpack/core/ml/job/config/Job.java | 23 + .../ml/job/persistence/JobConfigProvider.java | 541 ++++++++++++++++++ .../xpack/ml/MlSingleNodeTestCase.java | 27 + .../AutodetectResultProcessorIT.java | 25 +- .../ml/integration/JobConfigProviderIT.java | 351 ++++++++++++ .../ml/integration/JobResultsProviderIT.java | 30 - .../persistence/JobConfigProviderTests.java | 96 ++++ 7 files changed, 1039 insertions(+), 54 deletions(-) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProviderTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java index 10bfaf7e53d14..292d4dd85014e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java @@ -244,6 +244,25 @@ public Job(StreamInput in) throws IOException { deleted = in.readBoolean(); } + /** + * Get the persisted job document name from the Job Id. + * Throws if {@code jobId} is not a valid job Id. + * + * @param jobId The job id + * @return The id of document the job is persisted in + */ + public static String documentId(String jobId) { + if (!MlStrings.isValidId(jobId)) { + throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_ID, ID.getPreferredName(), jobId)); + } + if (!MlStrings.hasValidLengthForId(jobId)) { + throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_CONFIG_ID_TOO_LONG, MlStrings.ID_LENGTH_LIMIT)); + } + + return "job-" + jobId; + } + + /** * Return the Job Id. * @@ -759,6 +778,10 @@ public void setGroups(List groups) { this.groups = groups == null ? Collections.emptyList() : groups; } + public List getGroups() { + return groups; + } + public Builder setCustomSettings(Map customSettings) { this.customSettings = customSettings; return this; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java new file mode 100644 index 0000000000000..3166ca33c5bb9 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -0,0 +1,541 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.persistence; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.delete.DeleteAction; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.get.GetAction; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexAction; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.index.query.TermsQueryBuilder; +import org.elasticsearch.index.query.WildcardQueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; + +/** + * This class implements CRUD operation for the + * anomaly detector job configuration document + */ +public class JobConfigProvider extends AbstractComponent { + + public static String ALL = "_all"; + + private final Client client; + + public JobConfigProvider(Client client, Settings settings) { + super(settings); + this.client = client; + } + + /** + * Persist the anomaly detector job configuration to the configuration index. + * It is an error if an job with the same Id already exists - the config will + * not be overwritten. + * + * @param job The anomaly detector job configuration + * @param listener Index response listener + */ + public void putJob(Job job, ActionListener listener) { + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + XContentBuilder source = job.toXContent(builder, ToXContent.EMPTY_PARAMS); + IndexRequest indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(), + ElasticsearchMappings.DOC_TYPE, Job.documentId(job.getId())) + .setSource(source) + .setOpType(DocWriteRequest.OpType.CREATE) + .request(); + + executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, listener); + + } catch (IOException e) { + listener.onFailure(new ElasticsearchParseException("Failed to serialise job with id [" + job.getId() + "]", e)); + } + } + + /** + * Get the anomaly detector job specified by {@code jobId}. + * If the job is missing a {@code ResourceNotFoundException} is returned + * via the listener. + * + * @param jobId The job ID + * @param jobListener Job listener + */ + public void getJob(String jobId, ActionListener jobListener) { + GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(), + ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); + + executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, new ActionListener() { + @Override + public void onResponse(GetResponse getResponse) { + if (getResponse.isExists() == false) { + jobListener.onFailure(ExceptionsHelper.missingJobException(jobId)); + return; + } + + BytesReference source = getResponse.getSourceAsBytesRef(); + parseJobLenientlyFromSource(source, jobListener); + } + + @Override + public void onFailure(Exception e) { + jobListener.onFailure(e); + } + }); + } + + /** + * Delete the anomaly detector job config document + * + * @param jobId The job id + * @param actionListener Deleted job listener + */ + public void deleteJob(String jobId, ActionListener actionListener) { + DeleteRequest request = new DeleteRequest(AnomalyDetectorsIndex.configIndexName(), + ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); + + executeAsyncWithOrigin(client, ML_ORIGIN, DeleteAction.INSTANCE, request, new ActionListener() { + @Override + public void onResponse(DeleteResponse deleteResponse) { + if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { + actionListener.onFailure(ExceptionsHelper.missingJobException(jobId)); + return; + } + + assert deleteResponse.getResult() == DocWriteResponse.Result.DELETED; + actionListener.onResponse(deleteResponse); + } + + @Override + public void onFailure(Exception e) { + actionListener.onFailure(e); + } + }); + } + + /** + * Get the job and update it by applying {@code jobUpdater} then index the changed job + * setting the version in the request. Applying the update may cause a validation error + * which is returned via {@code updatedJobListener} + * + * @param jobId The Id of the job to update + * @param update The job update + * @param maxModelMemoryLimit The maximum model memory allowed + * @param updatedJobListener Updated job listener + */ + public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, ActionListener updatedJobListener) { + GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(), + ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); + + executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, new ActionListener() { + @Override + public void onResponse(GetResponse getResponse) { + if (getResponse.isExists() == false) { + updatedJobListener.onFailure(ExceptionsHelper.missingJobException(jobId)); + return; + } + + long version = getResponse.getVersion(); + BytesReference source = getResponse.getSourceAsBytesRef(); + Job.Builder jobBuilder; + try { + jobBuilder = parseJobLenientlyFromSource(source); + } catch (IOException e) { + updatedJobListener.onFailure(new ElasticsearchParseException("failed to parse " + getResponse.getType(), e)); + return; + } + + Job updatedJob; + try { + // Applying the update may result in a validation error + updatedJob = update.mergeWithJob(jobBuilder.build(), maxModelMemoryLimit); + } catch (Exception e) { + updatedJobListener.onFailure(e); + return; + } + + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + XContentBuilder updatedSource = updatedJob.toXContent(builder, ToXContent.EMPTY_PARAMS); + IndexRequest indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(), + ElasticsearchMappings.DOC_TYPE, Job.documentId(updatedJob.getId())) + .setSource(updatedSource) + .setVersion(version) + .request(); + + executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap( + indexResponse -> { + assert indexResponse.getResult() == DocWriteResponse.Result.UPDATED; + updatedJobListener.onResponse(updatedJob); + }, + updatedJobListener::onFailure + )); + + } catch (IOException e) { + updatedJobListener.onFailure( + new ElasticsearchParseException("Failed to serialise job with id [" + jobId + "]", e)); + } + + + } + + @Override + public void onFailure(Exception e) { + updatedJobListener.onFailure(e); + } + }); + } + + /** + * Expands an expression into the set of matching names. {@code expresssion} + * may be a wildcard, a job group, a job ID or a list of those. + * If {@code expression} == 'ALL', '*' or the empty string then all + * job IDs are returned. + * Job groups are expanded to all the jobs IDs in that group. + * + * For example, given a set of names ["foo-1", "foo-2", "bar-1", bar-2"], + * expressions resolve follows: + *
    + *
  • "foo-1" : ["foo-1"]
  • + *
  • "bar-1" : ["bar-1"]
  • + *
  • "foo-1,foo-2" : ["foo-1", "foo-2"]
  • + *
  • "foo-*" : ["foo-1", "foo-2"]
  • + *
  • "*-1" : ["bar-1", "foo-1"]
  • + *
  • "*" : ["bar-1", "bar-2", "foo-1", "foo-2"]
  • + *
  • "_all" : ["bar-1", "bar-2", "foo-1", "foo-2"]
  • + *
+ * + * @param expression the expression to resolve + * @param allowNoJobs if {@code false}, an error is thrown when no name matches the {@code expression}. + * This only applies to wild card expressions, if {@code expression} is not a + * wildcard then setting this true will not suppress the exception + * @param listener The expanded job IDs listener + */ + public void expandJobsIds(String expression, boolean allowNoJobs, ActionListener> listener) { + String [] tokens = tokenizeExpression(expression); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens)); + sourceBuilder.sort(Job.ID.getPreferredName()); + String [] includes = new String[] {Job.ID.getPreferredName(), Job.GROUPS.getPreferredName()}; + sourceBuilder.fetchSource(includes, null); + + SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setSource(sourceBuilder).request(); + + LinkedList requiredMatches = requiredMatches(tokens, allowNoJobs); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, + ActionListener.wrap( + response -> { + Set jobIds = new HashSet<>(); + Set groupsIds = new HashSet<>(); + SearchHit[] hits = response.getHits().getHits(); + for (SearchHit hit : hits) { + jobIds.add((String)hit.getSourceAsMap().get(Job.ID.getPreferredName())); + List groups = (List)hit.getSourceAsMap().get(Job.GROUPS.getPreferredName()); + if (groups != null) { + groupsIds.addAll(groups); + } + } + + groupsIds.addAll(jobIds); + filterMatchedIds(requiredMatches, groupsIds); + if (requiredMatches.isEmpty() == false) { + // some required jobs were not found + String missing = requiredMatches.stream().map(IdMatcher::getId).collect(Collectors.joining(",")); + listener.onFailure(ExceptionsHelper.missingJobException(missing)); + return; + } + + listener.onResponse(jobIds); + }, + listener::onFailure) + , client::search); + + } + + /** + * The same logic as {@link #expandJobsIds(String, boolean, ActionListener)} but + * the full anomaly detector job configuration is returned. + * + * See {@link #expandJobsIds(String, boolean, ActionListener)} + * + * @param expression the expression to resolve + * @param allowNoJobs if {@code false}, an error is thrown when no name matches the {@code expression}. + * This only applies to wild card expressions, if {@code expression} is not a + * wildcard then setting this true will not suppress the exception + * @param listener The expanded jobs listener + */ + // NORELEASE jobs should be paged or have a mechanism to return all jobs if there are many of them + public void expandJobs(String expression, boolean allowNoJobs, ActionListener> listener) { + String [] tokens = tokenizeExpression(expression); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens)); + sourceBuilder.sort(Job.ID.getPreferredName()); + + SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setSource(sourceBuilder).request(); + + LinkedList requiredMatches = requiredMatches(tokens, allowNoJobs); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, + ActionListener.wrap( + response -> { + List jobs = new ArrayList<>(); + Set jobAndGroupIds = new HashSet<>(); + + SearchHit[] hits = response.getHits().getHits(); + for (SearchHit hit : hits) { + try { + BytesReference source = hit.getSourceRef(); + Job.Builder job = parseJobLenientlyFromSource(source); + jobs.add(job); + jobAndGroupIds.add(job.getId()); + jobAndGroupIds.addAll(job.getGroups()); + } catch (IOException e) { + // TODO A better way to handle this rather than just ignoring the error? + logger.error("Error parsing anomaly detector job configuration [" + hit.getId() + "]", e); + } + } + + filterMatchedIds(requiredMatches, jobAndGroupIds); + if (requiredMatches.isEmpty() == false) { + // some required jobs were not found + String missing = requiredMatches.stream().map(IdMatcher::getId).collect(Collectors.joining(",")); + listener.onFailure(ExceptionsHelper.missingJobException(missing)); + return; + } + + listener.onResponse(jobs); + }, + listener::onFailure) + , client::search); + + } + + private void parseJobLenientlyFromSource(BytesReference source, ActionListener jobListener) { + try (InputStream stream = source.streamInput(); + XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { + jobListener.onResponse(Job.LENIENT_PARSER.apply(parser, null)); + } catch (Exception e) { + jobListener.onFailure(e); + } + } + + private Job.Builder parseJobLenientlyFromSource(BytesReference source) throws IOException { + try (InputStream stream = source.streamInput(); + XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { + return Job.LENIENT_PARSER.apply(parser, null); + } + } + + private QueryBuilder buildQuery(String [] tokens) { + QueryBuilder jobQuery = new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE); + if (isWildcardAll(tokens)) { + // match all + return jobQuery; + } + + BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); + boolQueryBuilder.filter(jobQuery); + BoolQueryBuilder shouldQueries = new BoolQueryBuilder(); + + List terms = new ArrayList<>(); + for (String token : tokens) { + if (Regex.isSimpleMatchPattern(token)) { + shouldQueries.should(new WildcardQueryBuilder(Job.ID.getPreferredName(), token)); + shouldQueries.should(new WildcardQueryBuilder(Job.GROUPS.getPreferredName(), token)); + } else { + terms.add(token); + } + } + + if (terms.isEmpty() == false) { + shouldQueries.should(new TermsQueryBuilder(Job.ID.getPreferredName(), terms)); + shouldQueries.should(new TermsQueryBuilder(Job.GROUPS.getPreferredName(), terms)); + } + + if (shouldQueries.should().isEmpty() == false) { + boolQueryBuilder.filter(shouldQueries); + } + + return boolQueryBuilder; + } + + /** + * Does the {@code tokens} array resolves to a wildcard all expression. + * True if {@code tokens} is empty or if it contains a single element + * equal to {@link #ALL}, '*' or an empty string + * + * @param tokens Expression tokens + * @return True if tokens resolves to a wildcard all expression + */ + static boolean isWildcardAll(String [] tokens) { + if (tokens.length == 0) { + return true; + } + return tokens.length == 1 && (ALL.equals(tokens[0]) || Regex.isMatchAllPattern(tokens[0]) || tokens[0].isEmpty()); + } + + static String [] tokenizeExpression(String expression) { + return Strings.tokenizeToStringArray(expression, ","); + } + + /** + * Generate the list of required matches from the expressions in {@code tokens} + * + * @param tokens List of expressions that may be wildcards or full Ids + * @param allowNoJobForWildcards If true then it is not required for wildcard + * expressions to match an Id meaning they are + * not returned in the list of required matches + * @return A list of required Id matchers + */ + static LinkedList requiredMatches(String [] tokens, boolean allowNoJobForWildcards) { + LinkedList matchers = new LinkedList<>(); + + if (isWildcardAll(tokens)) { + // if allowNoJobForWildcards == true then any number + // of jobs with any id is ok. Therefore no matches + // are required + + if (allowNoJobForWildcards == false) { + // require something, anything to match + matchers.add(new WildcardMatcher("*")); + } + return matchers; + } + + if (allowNoJobForWildcards) { + // matches are not required for wildcards but + // specific job Ids are + for (String token : tokens) { + if (Regex.isSimpleMatchPattern(token) == false) { + matchers.add(new EqualsIdMatcher(token)); + } + } + } else { + // Matches are required for wildcards + for (String token : tokens) { + if (Regex.isSimpleMatchPattern(token)) { + matchers.add(new WildcardMatcher(token)); + } else { + matchers.add(new EqualsIdMatcher(token)); + } + } + } + + return matchers; + } + + /** + * For each given {@code requiredMatchers} check there is an element + * present in {@code ids} that matches. Once a match is made the + * matcher is popped from {@code requiredMatchers}. + * + * If all matchers are satisfied the list {@code requiredMatchers} will + * be empty after the call otherwise only the unmatched remain. + * + * @param requiredMatchers This is modified by the function: all matched matchers + * are removed from the list. At the end of the call only + * the unmatched ones are in this list + * @param ids Ids required to be matched + */ + static void filterMatchedIds(LinkedList requiredMatchers, Collection ids) { + for (String id: ids) { + Iterator itr = requiredMatchers.iterator(); + if (itr.hasNext() == false) { + break; + } + while (itr.hasNext()) { + if (itr.next().matches(id)) { + itr.remove(); + } + } + } + } + + abstract static class IdMatcher { + protected final String id; + + IdMatcher(String id) { + this.id = id; + } + + public String getId() { + return id; + } + + public abstract boolean matches(String jobId); + } + + static class EqualsIdMatcher extends IdMatcher { + EqualsIdMatcher(String id) { + super(id); + } + + @Override + public boolean matches(String id) { + return this.id.equals(id); + } + } + + static class WildcardMatcher extends IdMatcher { + WildcardMatcher(String id) { + super(id); + } + + @Override + public boolean matches(String id) { + return Regex.simpleMatch(this.id, id); + } + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlSingleNodeTestCase.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlSingleNodeTestCase.java index 7171f15218635..83c3d75a612ca 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlSingleNodeTestCase.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlSingleNodeTestCase.java @@ -5,11 +5,17 @@ */ package org.elasticsearch.xpack.ml; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.license.LicenseService; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ml.MachineLearningField; +import java.util.Collection; + /** * An extention to {@link ESSingleNodeTestCase} that adds node settings specifically needed for ML test cases. */ @@ -18,10 +24,31 @@ public abstract class MlSingleNodeTestCase extends ESSingleNodeTestCase { @Override protected Settings nodeSettings() { Settings.Builder newSettings = Settings.builder(); + newSettings.put(super.nodeSettings()); + // Disable native ML autodetect_process as the c++ controller won't be available newSettings.put(MachineLearningField.AUTODETECT_PROCESS.getKey(), false); + newSettings.put(MachineLearningField.MAX_MODEL_MEMORY_LIMIT.getKey(), new ByteSizeValue(1024)); newSettings.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial"); + // Disable security otherwise delete-by-query action fails to get authorized + newSettings.put(XPackSettings.SECURITY_ENABLED.getKey(), false); + newSettings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); + newSettings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); return newSettings.build(); } + @Override + protected Collection> getPlugins() { + return pluginList(LocalStateMachineLearning.class); + } + + protected void waitForMlTemplates() throws Exception { + // block until the templates are installed + assertBusy(() -> { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + assertTrue("Timed out waiting for the ML templates to be installed", + MachineLearning.allTemplatesInstalled(state)); + }); + } + } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index be2e7146b52be..072f3941660fe 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; @@ -14,7 +13,6 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.index.reindex.ReindexPlugin; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; @@ -33,7 +31,6 @@ import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; import org.elasticsearch.xpack.ml.LocalStateMachineLearning; -import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder; @@ -78,17 +75,6 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase { private AutoDetectResultProcessor resultProcessor; private Renormalizer renormalizer; - @Override - protected Settings nodeSettings() { - Settings.Builder newSettings = Settings.builder(); - newSettings.put(super.nodeSettings()); - // Disable security otherwise delete-by-query action fails to get authorized - newSettings.put(XPackSettings.SECURITY_ENABLED.getKey(), false); - newSettings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); - newSettings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); - return newSettings.build(); - } - @Override protected Collection> getPlugins() { return pluginList(LocalStateMachineLearning.class, ReindexPlugin.class); @@ -109,7 +95,7 @@ protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) { capturedUpdateModelSnapshotOnJobRequests.add(modelSnapshot); } }; - putIndexTemplates(); + waitForMlTemplates(); putJob(); } @@ -288,15 +274,6 @@ public void testEndOfStreamTriggersPersisting() throws Exception { assertResultsAreSame(allRecords, persistedRecords); } - private void putIndexTemplates() throws Exception { - // block until the templates are installed - assertBusy(() -> { - ClusterState state = client().admin().cluster().prepareState().get().getState(); - assertTrue("Timed out waiting for the ML templates to be installed", - MachineLearning.allTemplatesInstalled(state)); - }); - } - private void putJob() { Detector detector = new Detector.Builder("dc", "by_instance").build(); Job.Builder jobBuilder = new Job.Builder(JOB_ID); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java new file mode 100644 index 0000000000000..fb82b1c74d0eb --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java @@ -0,0 +1,351 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.core.ml.job.config.RuleScope; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.core.IsInstanceOf.instanceOf; + +public class JobConfigProviderIT extends MlSingleNodeTestCase { + + private JobConfigProvider jobConfigProvider; + + @Before + public void createComponents() throws Exception { + jobConfigProvider = new JobConfigProvider(client(), Settings.EMPTY); + waitForMlTemplates(); + } + + public void testGetMissingJob() throws InterruptedException { + AtomicReference jobHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + blockingCall(actionListener -> jobConfigProvider.getJob("missing", actionListener), jobHolder, exceptionHolder); + + assertNull(jobHolder.get()); + assertNotNull(exceptionHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(ResourceNotFoundException.class)); + } + + public void testOverwriteNotAllowed() throws InterruptedException { + final String jobId = "same-id"; + + AtomicReference indexResponseHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + // Create job + Job initialJob = createJob(jobId, null).build(new Date()); + blockingCall(actionListener -> jobConfigProvider.putJob(initialJob, actionListener), indexResponseHolder, exceptionHolder); + assertNull(exceptionHolder.get()); + assertNotNull(indexResponseHolder.get()); + + indexResponseHolder.set(null); + Job jobWithSameId = createJob(jobId, null).build(new Date()); + blockingCall(actionListener -> jobConfigProvider.putJob(jobWithSameId, actionListener), indexResponseHolder, exceptionHolder); + assertNull(indexResponseHolder.get()); + assertNotNull(exceptionHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(VersionConflictEngineException.class)); + } + + public void testCrud() throws InterruptedException { + final String jobId = "crud-job"; + + AtomicReference indexResponseHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + // Create job + Job newJob = createJob(jobId, null).build(new Date()); + blockingCall(actionListener -> jobConfigProvider.putJob(newJob, actionListener), indexResponseHolder, exceptionHolder); + assertNull(exceptionHolder.get()); + assertNotNull(indexResponseHolder.get()); + + // Read Job + AtomicReference getJobResponseHolder = new AtomicReference<>(); + blockingCall(actionListener -> jobConfigProvider.getJob(jobId, actionListener), getJobResponseHolder, exceptionHolder); + assertNull(exceptionHolder.get()); + + assertEquals(newJob, getJobResponseHolder.get().build()); + + // Update Job + indexResponseHolder.set(null); + JobUpdate jobUpdate = new JobUpdate.Builder(jobId).setDescription("This job has been updated").build(); + + AtomicReference updateJobResponseHolder = new AtomicReference<>(); + blockingCall(actionListener -> jobConfigProvider.updateJob(jobId, jobUpdate, new ByteSizeValue(32), actionListener), + updateJobResponseHolder, exceptionHolder); + assertNull(exceptionHolder.get()); + assertEquals("This job has been updated", updateJobResponseHolder.get().getDescription()); + + getJobResponseHolder.set(null); + blockingCall(actionListener -> jobConfigProvider.getJob(jobId, actionListener), getJobResponseHolder, exceptionHolder); + assertNull(exceptionHolder.get()); + assertEquals("This job has been updated", getJobResponseHolder.get().build().getDescription()); + + // Delete Job + AtomicReference deleteJobResponseHolder = new AtomicReference<>(); + blockingCall(actionListener -> jobConfigProvider.deleteJob(jobId, actionListener), + deleteJobResponseHolder, exceptionHolder); + assertNull(exceptionHolder.get()); + assertThat(deleteJobResponseHolder.get().getResult(), equalTo(DocWriteResponse.Result.DELETED)); + + // Read deleted job + getJobResponseHolder.set(null); + blockingCall(actionListener -> jobConfigProvider.getJob(jobId, actionListener), getJobResponseHolder, exceptionHolder); + assertNull(getJobResponseHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(ResourceNotFoundException.class)); + + // Delete deleted job + deleteJobResponseHolder.set(null); + exceptionHolder.set(null); + blockingCall(actionListener -> jobConfigProvider.deleteJob(jobId, actionListener), + deleteJobResponseHolder, exceptionHolder); + assertNull(deleteJobResponseHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(ResourceNotFoundException.class)); + } + + public void testUpdateWithAValidationError() throws Exception { + final String jobId = "bad-update-job"; + + AtomicReference indexResponseHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + // Create job + Job newJob = createJob(jobId, null).build(new Date()); + blockingCall(actionListener -> jobConfigProvider.putJob(newJob, actionListener), indexResponseHolder, exceptionHolder); + assertNull(exceptionHolder.get()); + assertNotNull(indexResponseHolder.get()); + + DetectionRule rule = new DetectionRule.Builder(RuleScope.builder().exclude("not a used field", "filerfoo")).build(); + JobUpdate.DetectorUpdate detectorUpdate = new JobUpdate.DetectorUpdate(0, null, Collections.singletonList(rule)); + JobUpdate invalidUpdate = new JobUpdate.Builder(jobId) + .setDetectorUpdates(Collections.singletonList(detectorUpdate)) + .build(); + + AtomicReference updateJobResponseHolder = new AtomicReference<>(); + blockingCall(actionListener -> jobConfigProvider.updateJob(jobId, invalidUpdate, new ByteSizeValue(32), actionListener), + updateJobResponseHolder, exceptionHolder); + assertNull(updateJobResponseHolder.get()); + assertNotNull(exceptionHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(ElasticsearchStatusException.class)); + assertThat(exceptionHolder.get().getMessage(), containsString("Invalid detector rule:")); + } + + public void testAllowNoJobs() throws InterruptedException { + AtomicReference> jobIdsHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + blockingCall(actionListener -> jobConfigProvider.expandJobsIds("_all", false, actionListener), + jobIdsHolder, exceptionHolder); + + assertNull(jobIdsHolder.get()); + assertNotNull(exceptionHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(ResourceNotFoundException.class)); + assertThat(exceptionHolder.get().getMessage(), containsString("No known job with id")); + + exceptionHolder.set(null); + blockingCall(actionListener -> jobConfigProvider.expandJobsIds("_all", true, actionListener), + jobIdsHolder, exceptionHolder); + assertNotNull(jobIdsHolder.get()); + assertNull(exceptionHolder.get()); + + AtomicReference> jobsHolder = new AtomicReference<>(); + blockingCall(actionListener -> jobConfigProvider.expandJobs("*", false, actionListener), + jobsHolder, exceptionHolder); + + assertNull(jobsHolder.get()); + assertNotNull(exceptionHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(ResourceNotFoundException.class)); + assertThat(exceptionHolder.get().getMessage(), containsString("No known job with id")); + + exceptionHolder.set(null); + blockingCall(actionListener -> jobConfigProvider.expandJobs("*", true, actionListener), + jobsHolder, exceptionHolder); + assertNotNull(jobsHolder.get()); + assertNull(exceptionHolder.get()); + } + + public void testExpandJobs_GroupsAndJobIds() throws Exception { + Job tom = putJob(createJob("tom", null)); + Job dick = putJob(createJob("dick", null)); + Job harry = putJob(createJob("harry", Collections.singletonList("harry-group"))); + Job harryJnr = putJob(createJob("harry-jnr", Collections.singletonList("harry-group"))); + + client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get(); + + // Job Ids + Set expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("_all", true, actionListener)); + assertEquals(new TreeSet<>(Arrays.asList("tom", "dick", "harry", "harry-jnr")), expandedIds); + + expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, actionListener)); + assertEquals(new TreeSet<>(Arrays.asList("tom", "dick", "harry", "harry-jnr")), expandedIds); + + expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("tom,harry", true, actionListener)); + assertEquals(new TreeSet<>(Arrays.asList("tom", "harry")), expandedIds); + + expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("harry-group,tom", true, actionListener)); + assertEquals(new TreeSet<>(Arrays.asList("harry", "harry-jnr", "tom")), expandedIds); + + AtomicReference exceptionHolder = new AtomicReference<>(); + AtomicReference> jobIdsHolder = new AtomicReference<>(); + blockingCall(actionListener -> jobConfigProvider.expandJobsIds("tom,missing1,missing2", true, actionListener), + jobIdsHolder, exceptionHolder); + assertNull(jobIdsHolder.get()); + assertNotNull(exceptionHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(ResourceNotFoundException.class)); + assertThat(exceptionHolder.get().getMessage(), equalTo("No known job with id 'missing1,missing2'")); + + // Job builders + List expandedJobsBuilders = blockingCall(actionListener -> + jobConfigProvider.expandJobs("harry-group,tom", false, actionListener)); + List expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); + assertThat(expandedJobs, containsInAnyOrder(harry, harryJnr, tom)); + + expandedJobsBuilders = blockingCall(actionListener -> + jobConfigProvider.expandJobs("_all", false, actionListener)); + expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); + assertThat(expandedJobs, containsInAnyOrder(tom, dick, harry, harryJnr)); + + expandedJobsBuilders = blockingCall(actionListener -> + jobConfigProvider.expandJobs("tom,harry", false, actionListener)); + expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); + assertThat(expandedJobs, containsInAnyOrder(tom, harry)); + + expandedJobsBuilders = blockingCall(actionListener -> + jobConfigProvider.expandJobs("", false, actionListener)); + expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); + assertThat(expandedJobs, containsInAnyOrder(tom, dick, harry, harryJnr)); + + AtomicReference> jobsHolder = new AtomicReference<>(); + blockingCall(actionListener -> jobConfigProvider.expandJobs("tom,missing1,missing2", false, actionListener), + jobsHolder, exceptionHolder); + assertNull(jobsHolder.get()); + assertNotNull(exceptionHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(ResourceNotFoundException.class)); + assertThat(exceptionHolder.get().getMessage(), equalTo("No known job with id 'missing1,missing2'")); + } + + public void testExpandJobs_WildCardExpansion() throws Exception { + Job foo1 = putJob(createJob("foo-1", null)); + Job foo2 = putJob(createJob("foo-2", null)); + Job bar1 = putJob(createJob("bar-1", Collections.singletonList("bar"))); + Job bar2 = putJob(createJob("bar-2", Collections.singletonList("bar"))); + Job nbar = putJob(createJob("nbar", Collections.singletonList("bar"))); + + client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get(); + + // Test job IDs only + Set expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo*", true, actionListener)); + assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds); + + expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*-1", true, actionListener)); + assertEquals(new TreeSet<>(Arrays.asList("bar-1", "foo-1")), expandedIds); + + expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("bar*", true, actionListener)); + assertEquals(new TreeSet<>(Arrays.asList("bar-1", "bar-2", "nbar")), expandedIds); + + expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("b*r-1", true, actionListener)); + assertEquals(new TreeSet<>(Collections.singletonList("bar-1")), expandedIds); + + // Test full job config + List expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("foo*", true, actionListener)); + List expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); + assertThat(expandedJobs, containsInAnyOrder(foo1, foo2)); + + expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("*-1", true, actionListener)); + expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); + assertThat(expandedJobs, containsInAnyOrder(foo1, bar1)); + + expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("bar*", true, actionListener)); + expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); + assertThat(expandedJobs, containsInAnyOrder(bar1, bar2, nbar)); + + expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("b*r-1", true, actionListener)); + expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); + assertThat(expandedJobs, containsInAnyOrder(bar1)); + } + + private Job.Builder createJob(String jobId, List groups) { + Detector.Builder d1 = new Detector.Builder("info_content", "domain"); + d1.setOverFieldName("client"); + AnalysisConfig.Builder ac = new AnalysisConfig.Builder(Collections.singletonList(d1.build())); + + Job.Builder builder = new Job.Builder(); + builder.setId(jobId); + builder.setAnalysisConfig(ac); + builder.setDataDescription(new DataDescription.Builder()); + if (groups != null && groups.isEmpty() == false) { + builder.setGroups(groups); + } + return builder; + } + + private void blockingCall(Consumer> function, AtomicReference response, + AtomicReference error) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = ActionListener.wrap( + r -> { + response.set(r); + latch.countDown(); + }, + e -> { + error.set(e); + latch.countDown(); + } + ); + + function.accept(listener); + latch.await(); + } + + private T blockingCall(Consumer> function) throws Exception { + AtomicReference exceptionHolder = new AtomicReference<>(); + AtomicReference responseHolder = new AtomicReference<>(); + blockingCall(function, responseHolder, exceptionHolder); + if (exceptionHolder.get() != null) { + assertNotNull(exceptionHolder.get().getMessage(), exceptionHolder.get()); + } + return responseHolder.get(); + } + + private Job putJob(Job.Builder job) throws Exception { + Job builtJob = job.build(new Date()); + this.blockingCall(actionListener -> jobConfigProvider.putJob(builtJob, actionListener)); + return builtJob; + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java index e36c313b626c9..09651f554d848 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java @@ -10,7 +10,6 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; @@ -18,8 +17,6 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ml.MlMetaIndex; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.PutJobAction; @@ -40,8 +37,6 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; -import org.elasticsearch.xpack.ml.LocalStateMachineLearning; -import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; import org.elasticsearch.xpack.ml.job.persistence.CalendarQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; @@ -55,7 +50,6 @@ import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashSet; @@ -76,21 +70,6 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase { private JobResultsProvider jobProvider; - @Override - protected Settings nodeSettings() { - Settings.Builder newSettings = Settings.builder(); - newSettings.put(super.nodeSettings()); - newSettings.put(XPackSettings.SECURITY_ENABLED.getKey(), false); - newSettings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); - newSettings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); - return newSettings.build(); - } - - @Override - protected Collection> getPlugins() { - return pluginList(LocalStateMachineLearning.class); - } - @Before public void createComponents() throws Exception { Settings.Builder builder = Settings.builder() @@ -99,15 +78,6 @@ public void createComponents() throws Exception { waitForMlTemplates(); } - private void waitForMlTemplates() throws Exception { - // block until the templates are installed - assertBusy(() -> { - ClusterState state = client().admin().cluster().prepareState().get().getState(); - assertTrue("Timed out waiting for the ML templates to be installed", - MachineLearning.allTemplatesInstalled(state)); - }); - } - public void testGetCalandarByJobId() throws Exception { List calendars = new ArrayList<>(); calendars.add(new Calendar("empty calendar", Collections.emptyList(), null)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProviderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProviderTests.java new file mode 100644 index 0000000000000..04bcd57e64fc0 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProviderTests.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.persistence; + +import org.elasticsearch.test.ESTestCase; + +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.isOneOf; + +public class JobConfigProviderTests extends ESTestCase { + + public void testMatchingJobIds() { + LinkedList requiredMatches = JobConfigProvider.requiredMatches(new String[] {"*"}, false); + assertThat(requiredMatches, hasSize(1)); + JobConfigProvider.filterMatchedIds(requiredMatches, Collections.singletonList("foo")); + assertThat(requiredMatches, empty()); + + requiredMatches = JobConfigProvider.requiredMatches(JobConfigProvider.tokenizeExpression(""), false); + assertThat(requiredMatches, hasSize(1)); + JobConfigProvider.filterMatchedIds(requiredMatches, Collections.singletonList("foo")); + assertThat(requiredMatches, empty()); + + requiredMatches = JobConfigProvider.requiredMatches(JobConfigProvider.tokenizeExpression(null), false); + assertThat(requiredMatches, hasSize(1)); + JobConfigProvider.filterMatchedIds(requiredMatches, Collections.singletonList("foo")); + assertThat(requiredMatches, empty()); + + requiredMatches = JobConfigProvider.requiredMatches(JobConfigProvider.tokenizeExpression(null), false); + assertThat(requiredMatches, hasSize(1)); + JobConfigProvider.filterMatchedIds(requiredMatches, Collections.emptyList()); + assertThat(requiredMatches, hasSize(1)); + assertThat(requiredMatches.get(0).getId(), equalTo("*")); + + requiredMatches = JobConfigProvider.requiredMatches(JobConfigProvider.tokenizeExpression("_all"), false); + assertThat(requiredMatches, hasSize(1)); + JobConfigProvider.filterMatchedIds(requiredMatches, Collections.singletonList("foo")); + assertThat(requiredMatches, empty()); + + requiredMatches = JobConfigProvider.requiredMatches(new String[] {"foo*"}, false); + assertThat(requiredMatches, hasSize(1)); + JobConfigProvider.filterMatchedIds(requiredMatches, Arrays.asList("foo1","foo2")); + assertThat(requiredMatches, empty()); + + requiredMatches = JobConfigProvider.requiredMatches(new String[] {"foo*","bar"}, false); + assertThat(requiredMatches, hasSize(2)); + JobConfigProvider.filterMatchedIds(requiredMatches, Arrays.asList("foo1","foo2")); + assertThat(requiredMatches, hasSize(1)); + assertEquals("bar", requiredMatches.get(0).getId()); + + requiredMatches = JobConfigProvider.requiredMatches(new String[] {"foo*","bar"}, false); + assertThat(requiredMatches, hasSize(2)); + JobConfigProvider.filterMatchedIds(requiredMatches, Arrays.asList("foo1","bar")); + assertThat(requiredMatches, empty()); + + requiredMatches = JobConfigProvider.requiredMatches(new String[] {"foo*","bar"}, false); + assertThat(requiredMatches, hasSize(2)); + JobConfigProvider.filterMatchedIds(requiredMatches, Collections.singletonList("bar")); + assertThat(requiredMatches, hasSize(1)); + assertEquals("foo*", requiredMatches.get(0).getId()); + + requiredMatches = JobConfigProvider.requiredMatches(JobConfigProvider.tokenizeExpression("foo,bar,baz,wild*"), false); + assertThat(requiredMatches, hasSize(4)); + JobConfigProvider.filterMatchedIds(requiredMatches, Arrays.asList("foo","baz")); + assertThat(requiredMatches, hasSize(2)); + assertThat(requiredMatches.get(0).getId(), isOneOf("bar", "wild*")); + assertThat(requiredMatches.get(1).getId(), isOneOf("bar", "wild*")); + } + + public void testMatchingJobIds_allowNoJobs() { + // wildcard all with allow no jobs + LinkedList requiredMatches = JobConfigProvider.requiredMatches(new String[] {"*"}, true); + assertThat(requiredMatches, empty()); + JobConfigProvider.filterMatchedIds(requiredMatches, Collections.emptyList()); + assertThat(requiredMatches, empty()); + + requiredMatches = JobConfigProvider.requiredMatches(new String[] {"foo*","bar"}, true); + assertThat(requiredMatches, hasSize(1)); + JobConfigProvider.filterMatchedIds(requiredMatches, Collections.singletonList("bar")); + assertThat(requiredMatches, empty()); + + requiredMatches = JobConfigProvider.requiredMatches(new String[] {"foo*","bar"}, true); + assertThat(requiredMatches, hasSize(1)); + JobConfigProvider.filterMatchedIds(requiredMatches, Collections.emptyList()); + assertThat(requiredMatches, hasSize(1)); + assertEquals("bar", requiredMatches.get(0).getId()); + } +}