Skip to content

Commit

Permalink
[ML] Job In Index: Enable GET APIS in mixed state (#35344)
Browse files Browse the repository at this point in the history
This enables calls to the job and datafeed APIs in a mixed cluster state
before jobs have been migrated
  • Loading branch information
davidkyle authored Nov 13, 2018
1 parent 2b6cd7a commit 64ba62a
Show file tree
Hide file tree
Showing 21 changed files with 1,194 additions and 274 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,13 @@ public boolean isGroupOrJob(String id) {
return groupOrJobLookup.isGroupOrJob(id);
}

public Set<String> expandJobIds(String expression, boolean allowNoJobs) {
return groupOrJobLookup.expandJobIds(expression, allowNoJobs);
public Set<String> expandJobIds(String expression) {
return groupOrJobLookup.expandJobIds(expression);
}

// Matches only groups
public Set<String> expandGroupIds(String expression) {
return groupOrJobLookup.expandGroupIds(expression);
}

public boolean isJobDeleting(String jobId) {
Expand All @@ -111,9 +116,9 @@ public Optional<DatafeedConfig> getDatafeedByJobId(String jobId) {
return datafeeds.values().stream().filter(s -> s.getJobId().equals(jobId)).findFirst();
}

public Set<String> expandDatafeedIds(String expression, boolean allowNoDatafeeds) {
return NameResolver.newUnaliased(datafeeds.keySet(), ExceptionsHelper::missingDatafeedException)
.expand(expression, allowNoDatafeeds);
public Set<String> expandDatafeedIds(String expression) {
return NameResolver.newUnaliased(datafeeds.keySet())
.expand(expression);
}

public Long getLastMemoryRefreshVersion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.NameResolver;

import java.util.ArrayList;
Expand Down Expand Up @@ -55,8 +54,12 @@ private void put(Job job) {
}
}

public Set<String> expandJobIds(String expression, boolean allowNoJobs) {
return new GroupOrJobResolver().expand(expression, allowNoJobs);
public Set<String> expandJobIds(String expression) {
return new GroupOrJobResolver().expand(expression);
}

public Set<String> expandGroupIds(String expression) {
return new GroupResolver().expand(expression);
}

public boolean isGroupOrJob(String id) {
Expand All @@ -66,7 +69,6 @@ public boolean isGroupOrJob(String id) {
private class GroupOrJobResolver extends NameResolver {

private GroupOrJobResolver() {
super(ExceptionsHelper::missingJobException);
}

@Override
Expand All @@ -88,4 +90,33 @@ protected List<String> lookup(String key) {
return groupOrJob == null ? Collections.emptyList() : groupOrJob.jobs().stream().map(Job::getId).collect(Collectors.toList());
}
}

private class GroupResolver extends NameResolver {

private GroupResolver() {
}

@Override
protected Set<String> keys() {
return nameSet();
}

@Override
protected Set<String> nameSet() {
return groupOrJobLookup.entrySet().stream()
.filter(entry -> entry.getValue().isGroup())
.map(entry -> entry.getKey())
.collect(Collectors.toSet());
}

@Override
protected List<String> lookup(String key) {
GroupOrJob groupOrJob = groupOrJobLookup.get(key);
if (groupOrJob == null || groupOrJob.isGroup() == false) {
return Collections.emptyList();
} else {
return Collections.singletonList(key);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,15 @@
*/
package org.elasticsearch.xpack.core.ml.utils;

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.regex.Regex;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
Expand All @@ -25,12 +22,6 @@
*/
public abstract class NameResolver {

private final Function<String, ResourceNotFoundException> notFoundExceptionSupplier;

protected NameResolver(Function<String, ResourceNotFoundException> notFoundExceptionSupplier) {
this.notFoundExceptionSupplier = Objects.requireNonNull(notFoundExceptionSupplier);
}

/**
* Expands an expression into the set of matching names.
* For example, given a set of names ["foo-1", "foo-2", "bar-1", bar-2"],
Expand All @@ -46,12 +37,9 @@ protected NameResolver(Function<String, ResourceNotFoundException> notFoundExcep
* </ul>
*
* @param expression the expression to resolve
* @param allowNoMatch if {@code false}, an error is thrown when no name matches the {@code expression}.
* This only applies to wild card expressions, if {@code expression} is not a
* wildcard then setting this true will not suppress the exception
* @return the sorted set of matching names
*/
public SortedSet<String> expand(String expression, boolean allowNoMatch) {
public SortedSet<String> expand(String expression) {
SortedSet<String> result = new TreeSet<>();
if (MetaData.ALL.equals(expression) || Regex.isMatchAllPattern(expression)) {
result.addAll(nameSet());
Expand All @@ -64,24 +52,13 @@ public SortedSet<String> expand(String expression, boolean allowNoMatch) {
.map(this::lookup)
.flatMap(List::stream)
.collect(Collectors.toList());
if (expanded.isEmpty() && allowNoMatch == false) {
throw notFoundExceptionSupplier.apply(token);
}
result.addAll(expanded);
} else {
List<String> matchingNames = lookup(token);
// allowNoMatch only applies to wildcard expressions,
// this isn't so don't check the allowNoMatch here
if (matchingNames.isEmpty()) {
throw notFoundExceptionSupplier.apply(token);
}
result.addAll(matchingNames);
}
}
}
if (result.isEmpty() && allowNoMatch == false) {
throw notFoundExceptionSupplier.apply(expression);
}
return result;
}

Expand All @@ -105,11 +82,10 @@ public SortedSet<String> expand(String expression, boolean allowNoMatch) {
/**
* Creates a {@code NameResolver} that has no aliases
* @param nameSet the set of all names
* @param notFoundExceptionSupplier a supplier of {@link ResourceNotFoundException} to be used when an expression matches no name
* @return the unaliased {@code NameResolver}
*/
public static NameResolver newUnaliased(Set<String> nameSet, Function<String, ResourceNotFoundException> notFoundExceptionSupplier) {
return new NameResolver(notFoundExceptionSupplier) {
public static NameResolver newUnaliased(Set<String> nameSet) {
return new NameResolver() {
@Override
protected Set<String> keys() {
return nameSet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,46 +6,34 @@
package org.elasticsearch.xpack.core.ml.job.groups;

import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.groups.GroupOrJobLookup;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.contains;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class GroupOrJobLookupTests extends ESTestCase {

public void testEmptyLookup_GivenAllowNoJobs() {
GroupOrJobLookup lookup = new GroupOrJobLookup(Collections.emptyList());

assertThat(lookup.expandJobIds("_all", true).isEmpty(), is(true));
assertThat(lookup.expandJobIds("*", true).isEmpty(), is(true));
assertThat(lookup.expandJobIds("foo*", true).isEmpty(), is(true));
expectThrows(ResourceNotFoundException.class, () -> lookup.expandJobIds("foo", true));
}

public void testEmptyLookup_GivenNotAllowNoJobs() {
public void testEmptyLookup() {
GroupOrJobLookup lookup = new GroupOrJobLookup(Collections.emptyList());

expectThrows(ResourceNotFoundException.class, () -> lookup.expandJobIds("_all", false));
expectThrows(ResourceNotFoundException.class, () -> lookup.expandJobIds("*", false));
expectThrows(ResourceNotFoundException.class, () -> lookup.expandJobIds("foo*", false));
expectThrows(ResourceNotFoundException.class, () -> lookup.expandJobIds("foo", true));
assertThat(lookup.expandJobIds("_all").isEmpty(), is(true));
assertThat(lookup.expandJobIds("*").isEmpty(), is(true));
assertThat(lookup.expandJobIds("foo*").isEmpty(), is(true));
assertThat(lookup.expandJobIds("foo").isEmpty(), is(true));
}

public void testAllIsNotExpandedInCommaSeparatedExpression() {
GroupOrJobLookup lookup = new GroupOrJobLookup(Collections.emptyList());
ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, () -> lookup.expandJobIds("foo-*,_all", true));
assertThat(e.getMessage(), equalTo("No known job with id '_all'"));
assertThat(lookup.expandJobIds("foo*,_all").isEmpty(), is(true));
}

public void testConstructor_GivenJobWithSameIdAsPreviousGroupName() {
Expand Down Expand Up @@ -75,19 +63,19 @@ public void testLookup() {
jobs.add(mockJob("nogroup", Collections.emptyList()));
GroupOrJobLookup groupOrJobLookup = new GroupOrJobLookup(jobs);

assertThat(groupOrJobLookup.expandJobIds("_all", false), contains("bar-1", "bar-2", "foo-1", "foo-2", "nogroup"));
assertThat(groupOrJobLookup.expandJobIds("*", false), contains("bar-1", "bar-2", "foo-1", "foo-2", "nogroup"));
assertThat(groupOrJobLookup.expandJobIds("bar-1", false), contains("bar-1"));
assertThat(groupOrJobLookup.expandJobIds("foo-1", false), contains("foo-1"));
assertThat(groupOrJobLookup.expandJobIds("foo-2, bar-1", false), contains("bar-1", "foo-2"));
assertThat(groupOrJobLookup.expandJobIds("foo-group", false), contains("foo-1", "foo-2"));
assertThat(groupOrJobLookup.expandJobIds("bar-group", false), contains("bar-1", "bar-2"));
assertThat(groupOrJobLookup.expandJobIds("ones", false), contains("bar-1", "foo-1"));
assertThat(groupOrJobLookup.expandJobIds("twos", false), contains("bar-2", "foo-2"));
assertThat(groupOrJobLookup.expandJobIds("foo-group, nogroup", false), contains("foo-1", "foo-2", "nogroup"));
assertThat(groupOrJobLookup.expandJobIds("*-group", false), contains("bar-1", "bar-2", "foo-1", "foo-2"));
assertThat(groupOrJobLookup.expandJobIds("foo-group,foo-1,foo-2", false), contains("foo-1", "foo-2"));
assertThat(groupOrJobLookup.expandJobIds("foo-group,*-2", false), contains("bar-2", "foo-1", "foo-2"));
assertThat(groupOrJobLookup.expandJobIds("_all"), contains("bar-1", "bar-2", "foo-1", "foo-2", "nogroup"));
assertThat(groupOrJobLookup.expandJobIds("*"), contains("bar-1", "bar-2", "foo-1", "foo-2", "nogroup"));
assertThat(groupOrJobLookup.expandJobIds("bar-1"), contains("bar-1"));
assertThat(groupOrJobLookup.expandJobIds("foo-1"), contains("foo-1"));
assertThat(groupOrJobLookup.expandJobIds("foo-2, bar-1"), contains("bar-1", "foo-2"));
assertThat(groupOrJobLookup.expandJobIds("foo-group"), contains("foo-1", "foo-2"));
assertThat(groupOrJobLookup.expandJobIds("bar-group"), contains("bar-1", "bar-2"));
assertThat(groupOrJobLookup.expandJobIds("ones"), contains("bar-1", "foo-1"));
assertThat(groupOrJobLookup.expandJobIds("twos"), contains("bar-2", "foo-2"));
assertThat(groupOrJobLookup.expandJobIds("foo-group, nogroup"), contains("foo-1", "foo-2", "nogroup"));
assertThat(groupOrJobLookup.expandJobIds("*-group"), contains("bar-1", "bar-2", "foo-1", "foo-2"));
assertThat(groupOrJobLookup.expandJobIds("foo-group,foo-1,foo-2"), contains("foo-1", "foo-2"));
assertThat(groupOrJobLookup.expandJobIds("foo-group,*-2"), contains("bar-2", "foo-1", "foo-2"));
}

public void testIsGroupOrJob() {
Expand All @@ -104,6 +92,19 @@ public void testIsGroupOrJob() {
assertFalse(groupOrJobLookup.isGroupOrJob("missing"));
}

public void testExpandGroupIds() {
List<Job> jobs = new ArrayList<>();
jobs.add(mockJob("foo-1", Arrays.asList("foo-group")));
jobs.add(mockJob("foo-2", Arrays.asList("foo-group")));
jobs.add(mockJob("bar-1", Arrays.asList("bar-group")));
jobs.add(mockJob("nogroup", Collections.emptyList()));

GroupOrJobLookup groupOrJobLookup = new GroupOrJobLookup(jobs);
assertThat(groupOrJobLookup.expandGroupIds("foo*"), contains("foo-group"));
assertThat(groupOrJobLookup.expandGroupIds("bar-group,nogroup"), contains("bar-group"));
assertThat(groupOrJobLookup.expandGroupIds("*"), contains("bar-group", "foo-group"));
}

private static Job mockJob(String jobId, List<String> groups) {
Job job = mock(Job.class);
when(job.getId()).thenReturn(jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand All @@ -37,7 +36,7 @@
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.notifications.Auditor;

import java.io.IOException;
Expand All @@ -52,28 +51,25 @@
public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJobAction.JobTask, CloseJobAction.Request,
CloseJobAction.Response, CloseJobAction.Response> {

private final Client client;
private final ClusterService clusterService;
private final Auditor auditor;
private final PersistentTasksService persistentTasksService;
private final JobConfigProvider jobConfigProvider;
private final DatafeedConfigProvider datafeedConfigProvider;
private final JobManager jobManager;

@Inject
public TransportCloseJobAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, Client client, Auditor auditor,
PersistentTasksService persistentTasksService, JobConfigProvider jobConfigProvider,
DatafeedConfigProvider datafeedConfigProvider) {
ClusterService clusterService, Auditor auditor, PersistentTasksService persistentTasksService,
DatafeedConfigProvider datafeedConfigProvider, JobManager jobManager) {
// We fork in innerTaskOperation(...), so we can use ThreadPool.Names.SAME here:
super(settings, CloseJobAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, CloseJobAction.Request::new, CloseJobAction.Response::new, ThreadPool.Names.SAME);
this.client = client;
this.clusterService = clusterService;
this.auditor = auditor;
this.persistentTasksService = persistentTasksService;
this.jobConfigProvider = jobConfigProvider;
this.datafeedConfigProvider = datafeedConfigProvider;
this.jobManager = jobManager;
}

@Override
Expand Down Expand Up @@ -107,7 +103,7 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen
*/

PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), true, ActionListener.wrap(
jobManager.expandJobIds(request.getJobId(), request.allowNoJobs(), ActionListener.wrap(
expandedJobIds -> {
validate(expandedJobIds, request.isForce(), tasksMetaData, ActionListener.wrap(
response -> {
Expand Down
Loading

0 comments on commit 64ba62a

Please sign in to comment.