Skip to content

Commit

Permalink
[ML] Merge the Jindex 6x feature branch (#36698)
Browse files Browse the repository at this point in the history
Store anomaly detector jobs and datafeed configurations in the .ml-config index.
Existing configurations for closed jobs and datafeeds are migrated after upgrade.

For #32905
  • Loading branch information
davidkyle authored Dec 18, 2018
1 parent c0d1597 commit 66a582a
Show file tree
Hide file tree
Showing 158 changed files with 12,199 additions and 2,856 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public void testIndexTemplatesCreated() throws Exception {
if (masterIsNewVersion()) {
// Everything else waits until the master is upgraded to create its templates
expectedTemplates.add(".ml-anomalies-");
expectedTemplates.add(".ml-config");
expectedTemplates.add(".ml-meta");
expectedTemplates.add(".ml-notifications");
expectedTemplates.add(".ml-state");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.elasticsearch.xpack.core.logstash.LogstashFeatureSetUsage;
import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction;
import org.elasticsearch.xpack.core.ml.action.DeleteCalendarEventAction;
Expand Down Expand Up @@ -363,9 +364,9 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, "ml", MlMetadata.MlMetadataDiff::new),
// ML - Persistent action requests
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.TASK_NAME,
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME,
StartDatafeedAction.DatafeedParams::new),
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME,
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME,
OpenJobAction.JobParams::new),
// ML - Task states
new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new),
Expand Down Expand Up @@ -433,9 +434,9 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField("ml"),
parser -> MlMetadata.LENIENT_PARSER.parse(parser, null).build()),
// ML - Persistent action requests
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(StartDatafeedAction.TASK_NAME),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(MlTasks.DATAFEED_TASK_NAME),
StartDatafeedAction.DatafeedParams::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(OpenJobAction.TASK_NAME),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(MlTasks.JOB_TASK_NAME),
OpenJobAction.JobParams::fromXContent),
// ML - Task states
new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(DatafeedState.NAME), DatafeedState::fromXContent),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ public final class MlMetaIndex {
*/
public static final String INDEX_NAME = ".ml-meta";

public static final String INCLUDE_TYPE_KEY = "include_type";

public static final String TYPE = "doc";

private MlMetaIndex() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.core.ml;

import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
Expand Down Expand Up @@ -87,8 +86,13 @@ public boolean isGroupOrJob(String id) {
return groupOrJobLookup.isGroupOrJob(id);
}

public Set<String> expandJobIds(String expression, boolean allowNoJobs) {
return groupOrJobLookup.expandJobIds(expression, allowNoJobs);
public Set<String> expandJobIds(String expression) {
return groupOrJobLookup.expandJobIds(expression);
}

// Matches only groups
public Set<String> expandGroupIds(String expression) {
return groupOrJobLookup.expandGroupIds(expression);
}

public boolean isJobDeleting(String jobId) {
Expand All @@ -108,9 +112,9 @@ public Optional<DatafeedConfig> getDatafeedByJobId(String jobId) {
return datafeeds.values().stream().filter(s -> s.getJobId().equals(jobId)).findFirst();
}

public Set<String> expandDatafeedIds(String expression, boolean allowNoDatafeeds) {
return NameResolver.newUnaliased(datafeeds.keySet(), ExceptionsHelper::missingDatafeedException)
.expand(expression, allowNoDatafeeds);
public Set<String> expandDatafeedIds(String expression) {
return NameResolver.newUnaliased(datafeeds.keySet())
.expand(expression);
}

@Override
Expand Down Expand Up @@ -146,7 +150,6 @@ public MlMetadata(StreamInput in) throws IOException {
datafeeds.put(in.readString(), new DatafeedConfig(in));
}
this.datafeeds = datafeeds;

this.groupOrJobLookup = new GroupOrJobLookup(jobs.values());
}

Expand All @@ -167,7 +170,7 @@ private static <T extends Writeable> void writeMap(Map<String, T> map, StreamOut
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
DelegatingMapParams extendedParams =
new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_CLUSTER_STATE, "true"), params);
new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"), params);
mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams);
mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams);
return builder;
Expand Down Expand Up @@ -196,9 +199,14 @@ public MlMetadataDiff(StreamInput in) throws IOException {
this.jobs = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Job::new,
MlMetadataDiff::readJobDiffFrom);
this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new,
MlMetadataDiff::readSchedulerDiffFrom);
MlMetadataDiff::readDatafeedDiffFrom);
}

/**
* Merge the diff with the ML metadata.
* @param part The current ML metadata.
* @return The new ML metadata.
*/
@Override
public MetaData.Custom apply(MetaData.Custom part) {
TreeMap<String, Job> newJobs = new TreeMap<>(jobs.apply(((MlMetadata) part).jobs));
Expand All @@ -221,7 +229,7 @@ static Diff<Job> readJobDiffFrom(StreamInput in) throws IOException {
return AbstractDiffable.readDiffFrom(Job::new, in);
}

static Diff<DatafeedConfig> readSchedulerDiffFrom(StreamInput in) throws IOException {
static Diff<DatafeedConfig> readDatafeedDiffFrom(StreamInput in) throws IOException {
return AbstractDiffable.readDiffFrom(DatafeedConfig::new, in);
}
}
Expand Down Expand Up @@ -295,7 +303,7 @@ public Builder deleteJob(String jobId, PersistentTasksCustomMetaData tasks) {

public Builder putDatafeed(DatafeedConfig datafeedConfig, Map<String, String> headers) {
if (datafeeds.containsKey(datafeedConfig.getId())) {
throw new ResourceAlreadyExistsException("A datafeed with id [" + datafeedConfig.getId() + "] already exists");
throw ExceptionsHelper.datafeedAlreadyExists(datafeedConfig.getId());
}
String jobId = datafeedConfig.getJobId();
checkJobIsAvailableForDatafeed(jobId);
Expand Down Expand Up @@ -369,14 +377,14 @@ private void checkDatafeedIsStopped(Supplier<String> msg, String datafeedId, Per
}
}

private Builder putJobs(Collection<Job> jobs) {
public Builder putJobs(Collection<Job> jobs) {
for (Job job : jobs) {
putJob(job, true);
}
return this;
}

private Builder putDatafeeds(Collection<DatafeedConfig> datafeeds) {
public Builder putDatafeeds(Collection<DatafeedConfig> datafeeds) {
for (DatafeedConfig datafeed : datafeeds) {
this.datafeeds.put(datafeed.getId(), datafeed);
}
Expand Down Expand Up @@ -421,8 +429,6 @@ void checkJobHasNoDatafeed(String jobId) {
}
}



public static MlMetadata getMlMetadata(ClusterState state) {
MlMetadata mlMetadata = (state == null) ? null : state.getMetaData().custom(TYPE);
if (mlMetadata == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,19 @@
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public final class MlTasks {

public static final String JOB_TASK_NAME = "xpack/ml/job";
public static final String DATAFEED_TASK_NAME = "xpack/ml/datafeed";

private static final String JOB_TASK_ID_PREFIX = "job-";
private static final String DATAFEED_TASK_ID_PREFIX = "datafeed-";

private MlTasks() {
}

Expand All @@ -22,15 +33,15 @@ private MlTasks() {
* A datafeed id can be used as a job id, because they are stored separately in cluster state.
*/
public static String jobTaskId(String jobId) {
return "job-" + jobId;
return JOB_TASK_ID_PREFIX + jobId;
}

/**
* Namespaces the task ids for datafeeds.
* A job id can be used as a datafeed id, because they are stored separately in cluster state.
*/
public static String datafeedTaskId(String datafeedId) {
return "datafeed-" + datafeedId;
return DATAFEED_TASK_ID_PREFIX + datafeedId;
}

@Nullable
Expand Down Expand Up @@ -67,4 +78,64 @@ public static DatafeedState getDatafeedState(String datafeedId, @Nullable Persis
return DatafeedState.STOPPED;
}
}

/**
* The job Ids of anomaly detector job tasks.
* All anomaly detector jobs are returned regardless of the status of the
* task (OPEN, CLOSED, FAILED etc).
*
* @param tasks Persistent tasks. If null an empty set is returned.
* @return The job Ids of anomaly detector job tasks
*/
public static Set<String> openJobIds(@Nullable PersistentTasksCustomMetaData tasks) {
if (tasks == null) {
return Collections.emptySet();
}

return tasks.findTasks(JOB_TASK_NAME, task -> true)
.stream()
.map(t -> t.getId().substring(JOB_TASK_ID_PREFIX.length()))
.collect(Collectors.toSet());
}

/**
* The datafeed Ids of started datafeed tasks
*
* @param tasks Persistent tasks. If null an empty set is returned.
* @return The Ids of running datafeed tasks
*/
public static Set<String> startedDatafeedIds(@Nullable PersistentTasksCustomMetaData tasks) {
if (tasks == null) {
return Collections.emptySet();
}

return tasks.findTasks(DATAFEED_TASK_NAME, task -> true)
.stream()
.map(t -> t.getId().substring(DATAFEED_TASK_ID_PREFIX.length()))
.collect(Collectors.toSet());
}

/**
* Is there an ml anomaly detector job task for the job {@code jobId}?
* @param jobId The job id
* @param tasks Persistent tasks
* @return True if the job has a task
*/
public static boolean taskExistsForJob(String jobId, PersistentTasksCustomMetaData tasks) {
return openJobIds(tasks).contains(jobId);
}

/**
* Read the active anomaly detector job tasks.
* Active tasks are not {@code JobState.CLOSED} or {@code JobState.FAILED}.
*
* @param tasks Persistent tasks
* @return The job tasks excluding closed and failed jobs
*/
public static List<PersistentTasksCustomMetaData.PersistentTask<?>> activeJobTasks(PersistentTasksCustomMetaData tasks) {
return tasks.findTasks(JOB_TASK_NAME, task -> true)
.stream()
.filter(task -> ((JobTaskState) task.getState()).getState().isAnyOf(JobState.CLOSED, JobState.FAILED) == false)
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -25,6 +26,7 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

Expand All @@ -35,7 +37,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, AcknowledgedRes

public static final OpenJobAction INSTANCE = new OpenJobAction();
public static final String NAME = "cluster:admin/xpack/ml/job/open";
public static final String TASK_NAME = "xpack/ml/job";


private OpenJobAction() {
super(NAME);
Expand Down Expand Up @@ -136,15 +138,16 @@ public static class JobParams implements XPackPlugin.XPackPersistentTaskParams {

/** TODO Remove in 7.0.0 */
public static final ParseField IGNORE_DOWNTIME = new ParseField("ignore_downtime");

public static final ParseField TIMEOUT = new ParseField("timeout");
public static ObjectParser<JobParams, Void> PARSER = new ObjectParser<>(TASK_NAME, true, JobParams::new);
public static final ParseField JOB = new ParseField("job");

public static ObjectParser<JobParams, Void> PARSER = new ObjectParser<>(MlTasks.JOB_TASK_NAME, true, JobParams::new);
static {
PARSER.declareString(JobParams::setJobId, Job.ID);
PARSER.declareBoolean((p, v) -> {}, IGNORE_DOWNTIME);
PARSER.declareString((params, val) ->
params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
PARSER.declareObject(JobParams::setJob, (p, c) -> Job.LENIENT_PARSER.apply(p, c).build(), JOB);
}

public static JobParams fromXContent(XContentParser parser) {
Expand All @@ -163,6 +166,7 @@ public static JobParams parseRequest(String jobId, XContentParser parser) {
// A big state can take a while to restore. For symmetry with the _close endpoint any
// changes here should be reflected there too.
private TimeValue timeout = MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT;
private Job job;

JobParams() {
}
Expand All @@ -178,6 +182,9 @@ public JobParams(StreamInput in) throws IOException {
in.readBoolean();
}
timeout = TimeValue.timeValueMillis(in.readVLong());
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
job = in.readOptionalWriteable(Job::new);
}
}

public String getJobId() {
Expand All @@ -196,9 +203,18 @@ public void setTimeout(TimeValue timeout) {
this.timeout = timeout;
}

@Nullable
public Job getJob() {
return job;
}

public void setJob(Job job) {
this.job = job;
}

@Override
public String getWriteableName() {
return TASK_NAME;
return MlTasks.JOB_TASK_NAME;
}

@Override
Expand All @@ -209,20 +225,27 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(true);
}
out.writeVLong(timeout.millis());
if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
out.writeOptionalWriteable(job);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
if (job != null) {
builder.field("job", job);
}
builder.endObject();
// The job field is streamed but not persisted
return builder;
}

@Override
public int hashCode() {
return Objects.hash(jobId, timeout);
return Objects.hash(jobId, timeout, job);
}

@Override
Expand All @@ -235,7 +258,8 @@ public boolean equals(Object obj) {
}
OpenJobAction.JobParams other = (OpenJobAction.JobParams) obj;
return Objects.equals(jobId, other.jobId) &&
Objects.equals(timeout, other.timeout);
Objects.equals(timeout, other.timeout) &&
Objects.equals(job, other.job);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,7 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
datafeed.doXContentBody(builder, params);
builder.endObject();
datafeed.toXContent(builder, params);
return builder;
}

Expand Down
Loading

0 comments on commit 66a582a

Please sign in to comment.