Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Job config document CRUD operations #32738

Merged
merged 7 commits into from
Aug 13, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,19 @@ public Job(StreamInput in) throws IOException {

/**
* Get the persisted job document name from the Job Id.
* Throws if {@code jobId} is not a valid job Id.
*
* @param jobId The job id
* @return The id of document the job is persisted in
*/
public static String documentId(String jobId) {
if (!MlStrings.isValidId(jobId)) {
throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_ID, ID.getPreferredName(), jobId));
}
if (!MlStrings.hasValidLengthForId(jobId)) {
throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_CONFIG_ID_TOO_LONG, MlStrings.ID_LENGTH_LIMIT));
}

return "job-" + jobId;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we're back at having the jobs indexed, we should double check the length limit on the ID is correct

}

Expand Down Expand Up @@ -770,6 +778,10 @@ public void setGroups(List<String> groups) {
this.groups = groups == null ? Collections.emptyList() : groups;
}

public List<String> getGroups() {
return groups;
}

public Builder setCustomSettings(Map<String, Object> customSettings) {
this.customSettings = customSettings;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,13 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
Expand Down Expand Up @@ -232,7 +236,8 @@ public void onFailure(Exception e) {
/**
* Expands an expression into the set of matching names. {@code expresssion}
* may be a wildcard, a job group, a job ID or a list of those.
* If {@code expression} == ALL or * then all job IDs are returned.
* If {@code expression} == 'ALL', '*' or the empty string then all
* job IDs are returned.
* Job groups are expanded to all the jobs IDs in that group.
*
* For example, given a set of names ["foo-1", "foo-2", "bar-1", bar-2"],
Expand All @@ -252,34 +257,43 @@ public void onFailure(Exception e) {
* This only applies to wild card expressions, if {@code expression} is not a
* wildcard then setting this true will not suppress the exception
* @param listener The expanded job IDs listener
* @return the set of matching names
*/
public void expandJobsIds(String expression, boolean allowNoJobs, ActionListener<Set<String>> listener) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The behaviour of allowNoJobs has changed slightly as it only applies if expression is '*' or '_all' whereas before it applied to each token in a comma delineated list foo-*,bar.

Given that the option isn't documented and the new behaviour matches the description in the API spec I can live with the subtle change

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intention was that allow_no_jobs would work exactly how allow_no_indices works for index wildcards - see https://www.elastic.co/guide/en/elasticsearch/reference/current/multi-index.html

In the case of foo-*,bar, an exception should always be thrown if bar doesn't exist, and should be thrown if foo-* doesn't exist if and only if allow_no_jobs is false.

The intention was that job wildcard expansion would work like index wildcard expansion, and job groups would work like aliases, so that if anybody asked how they work we'd be able to give a simple answer and people who just assumed there would be consistency between different endpoints would be correct.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original behaviour could be replicated, one option is to make a query for each token in expression and check the results.

However, maybe this can still be done with a single query. Check the results and throw if a token is not a wildcard and no job with that Id was returned or if the token is a wildcard check for matching job IDs and throw depending on the value of allow_no_jobs. How this could work with paging or a scroll search I do not know.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How this could work with paging or a scroll search I do not know

I think we'd have to set up a map of token to boolean before starting, record what we saw during the paging or scroll, then throw errors at the very end.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed a change to replicate the old allow_no_jobs functionality, it's a little complicated but hopefully the code is self-explanatory (if not it needs a rewrite). This should work with any future scroll search/paging change

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(expression));
String [] tokens = tokenizeExpression(expression);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens));
sourceBuilder.sort(Job.ID.getPreferredName());
String [] includes = new String[] {Job.ID.getPreferredName()};
String [] includes = new String[] {Job.ID.getPreferredName(), Job.GROUPS.getPreferredName()};
sourceBuilder.fetchSource(includes, null);

SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder).request();

final boolean isWildCardExpression = isWildCard(expression);
LinkedList<IdMatcher> requiredMatches = requiredMatches(tokens, allowNoJobs);

executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
ActionListener.<SearchResponse>wrap(
response -> {
Set<String> jobIds = new HashSet<>();
Set<String> groupsIds = new HashSet<>();
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
jobIds.add((String)hit.getSourceAsMap().get(Job.ID.getPreferredName()));
List<String> groups = (List<String>)hit.getSourceAsMap().get(Job.GROUPS.getPreferredName());
if (groups != null) {
groupsIds.addAll(groups);
}
}

if (hits.length == 0 && isWildCardExpression && allowNoJobs == false) {
listener.onFailure(ExceptionsHelper.missingJobException(expression));
groupsIds.addAll(jobIds);
filterMatchedIds(requiredMatches, groupsIds);
if (requiredMatches.isEmpty() == false) {
// some required jobs were not found
String missing = requiredMatches.stream().map(IdMatcher::getId).collect(Collectors.joining(","));
listener.onFailure(ExceptionsHelper.missingJobException(missing));
return;
}

Set<String> jobIds = new HashSet<>();
for (SearchHit hit : hits) {
jobIds.add((String)hit.getSourceAsMap().get(Job.ID.getPreferredName()));
}
listener.onResponse(jobIds);
},
listener::onFailure)
Expand All @@ -298,54 +312,76 @@ public void expandJobsIds(String expression, boolean allowNoJobs, ActionListener
* This only applies to wild card expressions, if {@code expression} is not a
* wildcard then setting this true will not suppress the exception
* @param listener The expanded jobs listener
* @return The jobs with matching IDs
*/
// NORELEASE jobs should be paged or have a mechanism to return all jobs if there are many of them
public void expandJobs(String expression, boolean allowNoJobs, ActionListener<List<Job.Builder>> listener) {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(expression));
String [] tokens = tokenizeExpression(expression);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens));
sourceBuilder.sort(Job.ID.getPreferredName());

SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder).request();

final boolean isWildCardExpression = isWildCard(expression);
LinkedList<IdMatcher> requiredMatches = requiredMatches(tokens, allowNoJobs);

executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
ActionListener.<SearchResponse>wrap(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is the qualification of <SearchResponse> needed here?

response -> {
List<Job.Builder> jobs = new ArrayList<>();
Set<String> jobAndGroupIds = new HashSet<>();

SearchHit[] hits = response.getHits().getHits();
if (hits.length == 0 && isWildCardExpression && allowNoJobs == false) {
listener.onFailure(ExceptionsHelper.missingJobException(expression));
return;
}

List<Job.Builder> jobs = new ArrayList<>();
for (SearchHit hit : hits) {
try {
BytesReference source = hit.getSourceRef();
jobs.add(parseJobLenientlyFromSource(source));
Job.Builder job = parseJobLenientlyFromSource(source);
jobs.add(job);
jobAndGroupIds.add(job.getId());
jobAndGroupIds.addAll(job.getGroups());
} catch (IOException e) {
// TODO A better way to handle this rather than just ignoring the error?
logger.error("Error parsing anomaly detector job configuration [" + hit.getId() + "]", e);
}
}

filterMatchedIds(requiredMatches, jobAndGroupIds);
if (requiredMatches.isEmpty() == false) {
// some required jobs were not found
String missing = requiredMatches.stream().map(IdMatcher::getId).collect(Collectors.joining(","));
listener.onFailure(ExceptionsHelper.missingJobException(missing));
return;
}

listener.onResponse(jobs);
},
listener::onFailure)
, client::search);

}

private boolean isWildCard(String expression) {
return ALL.equals(expression) || Regex.isMatchAllPattern(expression);
private void parseJobLenientlyFromSource(BytesReference source, ActionListener<Job.Builder> jobListener) {
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
jobListener.onResponse(Job.LENIENT_PARSER.apply(parser, null));
} catch (Exception e) {
jobListener.onFailure(e);
}
}

private QueryBuilder buildQuery(String expression) {
private Job.Builder parseJobLenientlyFromSource(BytesReference source) throws IOException {
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
return Job.LENIENT_PARSER.apply(parser, null);
}
}

private QueryBuilder buildQuery(String [] tokens) {
QueryBuilder jobQuery = new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE);
if (isWildCard(expression)) {
if (isWildcardAll(tokens)) {
// match all
return jobQuery;
}

Expand All @@ -354,7 +390,6 @@ private QueryBuilder buildQuery(String expression) {
BoolQueryBuilder shouldQueries = new BoolQueryBuilder();

List<String> terms = new ArrayList<>();
String[] tokens = Strings.tokenizeToStringArray(expression, ",");
for (String token : tokens) {
if (Regex.isSimpleMatchPattern(token)) {
shouldQueries.should(new WildcardQueryBuilder(Job.ID.getPreferredName(), token));
Expand All @@ -376,21 +411,131 @@ private QueryBuilder buildQuery(String expression) {
return boolQueryBuilder;
}

private void parseJobLenientlyFromSource(BytesReference source, ActionListener<Job.Builder> jobListener) {
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
jobListener.onResponse(Job.LENIENT_PARSER.apply(parser, null));
} catch (Exception e) {
jobListener.onFailure(e);
/**
* Does the {@code tokens} array resolves to a wildcard all expression.
* True if {@code tokens} is empty or if it contains a single element
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the tokens contain a full wildcard and something else then that's still a full wildcard overall. E.g. foo,* still means get everything, but with the way this method is used it makes sense not to report it. I think the comment should be changed to make clear that this function doesn't cover this case.

* equal to {@link #ALL}, '*' or an empty string
*
* @param tokens Expression tokens
* @return True if tokens resolves to a wildcard all expression
*/
static boolean isWildcardAll(String [] tokens) {
if (tokens.length == 0) {
return true;
}
return tokens.length == 1 && (ALL.equals(tokens[0]) || Regex.isMatchAllPattern(tokens[0]) || tokens[0].isEmpty());
}

private Job.Builder parseJobLenientlyFromSource(BytesReference source) throws IOException {
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
return Job.LENIENT_PARSER.apply(parser, null);
static String [] tokenizeExpression(String expression) {
return Strings.tokenizeToStringArray(expression, ",");
}

/**
* Generate the list of required matches from the expressions in {@code tokens}
*
* @param tokens List of expressions that may be wildcards or full Ids
* @param allowNoJobForWildcards If true then it is not required for wildcard
* expressions to match an Id meaning they are
* not returned in the list of required matches
* @return A list of required Id matchers
*/
static LinkedList<IdMatcher> requiredMatches(String [] tokens, boolean allowNoJobForWildcards) {
LinkedList<IdMatcher> matchers = new LinkedList<>();

if (isWildcardAll(tokens)) {
// if allowNoJobForWildcards == true then any number
// of jobs with any id is ok. Therefore no matches
// are required

if (allowNoJobForWildcards == false) {
// require something, anything to match
matchers.add(new WildcardMatcher("*"));
}
return matchers;
}

if (allowNoJobForWildcards) {
// matches are not required for wildcards but
// specific job Ids are
for (String token : tokens) {
if (Regex.isSimpleMatchPattern(token) == false) {
matchers.add(new EqualsIdMatcher(token));
}
}
} else {
// Matches are required for wildcards
for (String token : tokens) {
if (Regex.isSimpleMatchPattern(token)) {
matchers.add(new WildcardMatcher(token));
} else {
matchers.add(new EqualsIdMatcher(token));
}
}
}

return matchers;
}

/**
* For each given {@code requiredMatchers} check there is an element
* present in {@code ids} that matches. Once a match is made the
* matcher is popped from {@code requiredMatchers}.
*
* If all matchers are satisfied the list {@code requiredMatchers} will
* be empty after the call otherwise only the unmatched remain.
*
* @param requiredMatchers This is modified by the function: all matched matchers
* are removed from the list. At the end of the call only
* the unmatched ones are in this list
* @param ids Ids required to be matched
*/
static void filterMatchedIds(LinkedList<IdMatcher> requiredMatchers, Collection<String> ids) {
for (String id: ids) {
Iterator<IdMatcher> itr = requiredMatchers.iterator();
if (itr.hasNext() == false) {
break;
}
while (itr.hasNext()) {
if (itr.next().matches(id)) {
itr.remove();
}
}
}
}

abstract static class IdMatcher {
protected final String id;

IdMatcher(String id) {
this.id = id;
}

public String getId() {
return id;
}

public abstract boolean matches(String jobId);
}

static class EqualsIdMatcher extends IdMatcher {
EqualsIdMatcher(String id) {
super(id);
}

@Override
public boolean matches(String id) {
return this.id.equals(id);
}
}

static class WildcardMatcher extends IdMatcher {
WildcardMatcher(String id) {
super(id);
}

@Override
public boolean matches(String id) {
return Regex.simpleMatch(this.id, id);
}
}
}
Loading