
[ML] Job in index: Datafeed node selector #34218

Merged
@@ -97,7 +97,7 @@ public static Set<String> openJobIds(PersistentTasksCustomMetaData tasks) {
* Is there an ml anomaly detector job task for the job {@code jobId}?
* @param jobId The job id
* @param tasks Persistent tasks
* @return
* @return True if the job has a task
*/
public static boolean taskExistsForJob(String jobId, PersistentTasksCustomMetaData tasks) {
return openJobIds(tasks).contains(jobId);
@@ -28,10 +28,13 @@
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.LongSupplier;

@@ -147,6 +150,8 @@ public boolean equals(Object obj) {

public static class DatafeedParams implements XPackPlugin.XPackPersistentTaskParams {

public static final ParseField INDICES = new ParseField("indices");

public static ObjectParser<DatafeedParams, Void> PARSER = new ObjectParser<>(MlTasks.DATAFEED_TASK_NAME, true, DatafeedParams::new);
static {
PARSER.declareString((params, datafeedId) -> params.datafeedId = datafeedId, DatafeedConfig.ID);
@@ -155,6 +160,8 @@ public static class DatafeedParams implements XPackPlugin.XPackPersistentTaskPar
PARSER.declareString(DatafeedParams::setEndTime, END_TIME);
PARSER.declareString((params, val) ->
params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
PARSER.declareString(DatafeedParams::setJobId, Job.ID);
PARSER.declareStringArray(DatafeedParams::setDatafeedIndices, INDICES);
}

static long parseDateOrThrow(String date, ParseField paramName, LongSupplier now) {
@@ -194,6 +201,10 @@ public DatafeedParams(StreamInput in) throws IOException {
startTime = in.readVLong();
endTime = in.readOptionalLong();
timeout = TimeValue.timeValueMillis(in.readVLong());
if (in.getVersion().onOrAfter(Version.CURRENT)) {
Contributor: It seems like this should be a specific version, not Version.CURRENT. Once we get to a mixed version 7.2/7.3 cluster surely it would be OK for the master node to be on 7.2?

Member Author: CURRENT is a placeholder. This is to be merged into a feature branch that will merge into 6.x at some point in the future, in an as-yet-unknown version. There is no version constant > 6.5.0, so it's CURRENT for now. OpenJobAction is the same; I'll add a checkbox to the meta issue so this isn't forgotten.
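The wire-compatibility gating being discussed can be sketched outside Elasticsearch. In the sketch below the version constants, stream classes, and field names are illustrative stand-ins, not the actual Elasticsearch APIs: a field added in a later version is written only when the destination node is on or after that version, and the reader applies the identical gate keyed on the sender's version.

```java
import java.io.*;

// Minimal model of the version-gated serialization pattern discussed above.
// V_6_6_0 is a hypothetical "version that introduced jobId", standing in for
// whatever specific constant replaces Version.CURRENT when this is released.
public class VersionGatedParams {
    static final int V_6_5_0 = 6_05_00;
    static final int V_6_6_0 = 6_06_00; // hypothetical introducing version

    final String datafeedId;
    final String jobId; // nullable; only on the wire from V_6_6_0 onwards

    VersionGatedParams(String datafeedId, String jobId) {
        this.datafeedId = datafeedId;
        this.jobId = jobId;
    }

    // Write only the fields the destination node's version understands.
    void writeTo(DataOutputStream out, int destVersion) throws IOException {
        out.writeUTF(datafeedId);
        if (destVersion >= V_6_6_0) {
            out.writeBoolean(jobId != null);   // optional-field marker
            if (jobId != null) out.writeUTF(jobId);
        }
    }

    // The reader applies the same gate, keyed on the sender's version.
    static VersionGatedParams readFrom(DataInputStream in, int srcVersion) throws IOException {
        String datafeedId = in.readUTF();
        String jobId = null;
        if (srcVersion >= V_6_6_0 && in.readBoolean()) {
            jobId = in.readUTF();
        }
        return new VersionGatedParams(datafeedId, jobId);
    }

    public static void main(String[] args) throws IOException {
        VersionGatedParams p = new VersionGatedParams("feed-1", "job-1");

        // New-to-new: jobId survives the round trip.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        p.writeTo(new DataOutputStream(buf), V_6_6_0);
        VersionGatedParams r = readFrom(
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray())), V_6_6_0);
        System.out.println(r.jobId); // job-1

        // New-to-old: jobId is omitted, but the old node still parses the stream.
        buf = new ByteArrayOutputStream();
        p.writeTo(new DataOutputStream(buf), V_6_5_0);
        r = readFrom(
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray())), V_6_5_0);
        System.out.println(r.jobId); // null
    }
}
```

This is why the reviewer asks for a specific constant: gating on Version.CURRENT would stop writing the field as soon as any newer version exists, breaking the symmetry between writer and reader in a mixed-version cluster.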

jobId = in.readOptionalString();
datafeedIndices = in.readList(StreamInput::readString);
}
}

DatafeedParams() {
@@ -203,6 +214,9 @@ public DatafeedParams(StreamInput in) throws IOException {
private long startTime;
private Long endTime;
private TimeValue timeout = TimeValue.timeValueSeconds(20);
private List<String> datafeedIndices = Collections.emptyList();
private String jobId;


public String getDatafeedId() {
return datafeedId;
@@ -232,6 +246,22 @@ public void setTimeout(TimeValue timeout) {
this.timeout = timeout;
}

public String getJobId() {
return jobId;
}

public void setJobId(String jobId) {
this.jobId = jobId;
}

public List<String> getDatafeedIndices() {
return datafeedIndices;
}

public void setDatafeedIndices(List<String> datafeedIndices) {
this.datafeedIndices = datafeedIndices;
}

@Override
public String getWriteableName() {
return MlTasks.DATAFEED_TASK_NAME;
@@ -248,6 +278,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(startTime);
out.writeOptionalLong(endTime);
out.writeVLong(timeout.millis());
if (out.getVersion().onOrAfter(Version.CURRENT)) {
Contributor: As above, surely this should be a specific version?

out.writeOptionalString(jobId);
out.writeStringList(datafeedIndices);
}
}

@Override
Contributor: It seems wrong that toXContent() persists different information to writeTo(). This means that the behaviour on full cluster restart is different to behaviour within a running cluster. If someone did a full cluster restart with started datafeeds I suspect that things wouldn't work on restart. (It should be possible to add a test in x-pack/qa/full-cluster-restart to confirm this.)

However, I think the solution to this problem is not to put the whole datafeed config in the X-Content representation. That creates the same problem we have now, where search syntax that's been removed prevents reading the cluster state from disk.

This leads to the conclusion that instead of storing the whole datafeed config in the task params we should store just enough fields from it to validate allocation, and definitely not the search or aggregations. Then toXContent() can be consistent with writeTo().

Member Author:

> It seems wrong that toXContent() persists different information to writeTo()

This is because the params do not have a lenient parser (see #33950); in 7 this won't be the case and the new fields can be written.

> I suspect that things wouldn't work on restart

The feature branch isn't at a point where the QA tests can be run yet, but true. This will have to be handled by the migration process in a future PR, but given that the persistent task params cannot be modified, the only solution may be to stop and restart the job & datafeed.

I chose to put the datafeed in the params as, once the datafeed has been validated, the same config should be used, but this does add weight to the cluster state and is not good for the reasons mentioned above, especially deprecated search & aggs config. I've changed it to use the minimal set of fields (job id & datafeed indices).

Contributor: The migration process will only move datafeeds from cluster state to index when the entire cluster has been upgraded to whatever version this goes into (6.6 or 6.7). So if we can find the minimum node version in the cluster in toXContent() then we can write the extra fields into the X-Content representation only after the entire cluster has been upgraded. That will make full cluster restarts work in the case where the entire cluster is on 6.6/6.7 (and it is essential this works, because some people will run 6.7 for a year after 7.0 is released).

So if job_id is null after a full cluster restart then that implies the cluster was not completely upgraded to 6.6/6.7, hence the migration will not have started, and hence the information can be obtained from the MlMetadata in cluster state.

Member Author (@davidkyle, Oct 16, 2018): The reasoning is thus:

  • The job id is required to find the job task related to the datafeed and hence make the allocation decision.
  • If job_id is null then the task was not started from a migrated config/upgraded node, i.e. it is a running task from a full cluster restart or rolling upgrade.
  • In that case the datafeed config should be present in the cluster state and the job_id can be found that way.
  • It is not possible to modify the persistent task parameters.
  • The persistent task may migrate once again before it is stopped, due to node failure, and once again job_id will have to be found via the datafeed config in the cluster state.
  • Therefore the config cannot be removed from the cluster state until the task has stopped.
  • Migration should duplicate the config in an index document and remove it from the cluster state when the datafeed stops.

That seems reasonable enough until we hit a datafeed job created in 6.4 which is not stopped before 8.

> So if we can find the minimum node version in the cluster in toXContent() ...

I can't see a way of doing this, and it won't help with full cluster restarts from 6.5.

I could add the code to read job_id from the datafeed config in the cluster state in this PR, but I have no mechanism of testing it yet and have not started on the migration code. My preference is to finish the work on making jobs run from the new config and make it stable, then tackle this issue (and others) in the migration work.
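The fallback this thread converges on can be sketched with illustrative names (none of these are the PR's actual classes or signatures): prefer the job id carried in the task params, and when it is null — meaning the task was started before the change — look the datafeed up in the config still held in cluster state.

```java
import java.util.Map;

// Sketch of the job-id resolution order described above. The Map stands in
// for the datafeed-to-job mapping held in MlMetadata in cluster state.
public class JobIdResolver {
    static String resolveJobId(String paramsJobId, String datafeedId,
                               Map<String, String> datafeedToJobInClusterState) {
        if (paramsJobId != null) {
            // Task was started from a migrated/upgraded config: params are authoritative.
            return paramsJobId;
        }
        // Pre-upgrade task: the datafeed config must still be in cluster state,
        // which is why it cannot be removed until the task has stopped.
        return datafeedToJobInClusterState.get(datafeedId);
    }

    public static void main(String[] args) {
        Map<String, String> mlMetadata = Map.of("feed-1", "job-1");
        System.out.println(resolveJobId(null, "feed-1", mlMetadata));    // job-1
        System.out.println(resolveJobId("job-2", "feed-1", mlMetadata)); // job-2
    }
}
```

The second bullet in the list above is exactly the null branch here: a null job_id marks a task created before the params carried one.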

@@ -259,13 +293,19 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
builder.field(END_TIME.getPreferredName(), String.valueOf(endTime));
}
builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
if (jobId != null) {
builder.field(Job.ID.getPreferredName(), jobId);
}
if (datafeedIndices.isEmpty() == false) {
builder.field(INDICES.getPreferredName(), datafeedIndices);
}
builder.endObject();
return builder;
}

@Override
public int hashCode() {
return Objects.hash(datafeedId, startTime, endTime, timeout);
return Objects.hash(datafeedId, startTime, endTime, timeout, jobId, datafeedIndices);
}

@Override
@@ -280,7 +320,9 @@ public boolean equals(Object obj) {
return Objects.equals(datafeedId, other.datafeedId) &&
Objects.equals(startTime, other.startTime) &&
Objects.equals(endTime, other.endTime) &&
Objects.equals(timeout, other.timeout);
Objects.equals(timeout, other.timeout) &&
Objects.equals(jobId, other.jobId) &&
Objects.equals(datafeedIndices, other.datafeedIndices);
}
}

@@ -12,6 +12,7 @@
import org.elasticsearch.test.AbstractSerializingTestCase;

import java.io.IOException;
import java.util.Arrays;

public class DatafeedParamsTests extends AbstractSerializingTestCase<StartDatafeedAction.DatafeedParams> {
@Override
@@ -28,6 +29,13 @@ public static StartDatafeedAction.DatafeedParams createDatafeedParams() {
if (randomBoolean()) {
params.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
}
if (randomBoolean()) {
params.setJobId(randomAlphaOfLength(10));
}
if (randomBoolean()) {
params.setDatafeedIndices(Arrays.asList(randomAlphaOfLength(10), randomAlphaOfLength(10)));
}

return params;
}

@@ -162,6 +162,7 @@
import org.elasticsearch.xpack.ml.action.TransportValidateJobConfigAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.UpdateJobProcessNotifier;
import org.elasticsearch.xpack.ml.job.categorization.MlClassicTokenizer;
@@ -369,6 +370,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
Auditor auditor = new Auditor(client, clusterService.nodeName());
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
JobConfigProvider jobConfigProvider = new JobConfigProvider(client, settings);
DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client, settings, xContentRegistry);
UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, client, clusterService, threadPool);
JobManager jobManager = new JobManager(env, settings, jobResultsProvider, clusterService, auditor, threadPool, client, notifier);

@@ -409,7 +411,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, xContentRegistry, auditor);
this.autodetectProcessManager.set(autodetectProcessManager);
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, jobResultsProvider, auditor, System::currentTimeMillis);
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, settings, xContentRegistry,
auditor, System::currentTimeMillis);
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
System::currentTimeMillis, auditor);
this.datafeedManager.set(datafeedManager);
@@ -426,6 +429,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
mlLifeCycleService,
jobResultsProvider,
jobConfigProvider,
datafeedConfigProvider,
jobManager,
autodetectProcessManager,
new MlInitializationService(settings, threadPool, clusterService, client),
@@ -12,15 +12,13 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.ml.notifications.Auditor;

import java.util.Objects;
@@ -89,16 +87,20 @@ public void clusterChanged(ClusterChangedEvent event) {
auditor.info(jobId, "Opening job on node [" + node.toString() + "]");
}
} else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) {
String datafeedId = ((StartDatafeedAction.DatafeedParams) currentTask.getParams()).getDatafeedId();
DatafeedConfig datafeedConfig = MlMetadata.getMlMetadata(event.state()).getDatafeed(datafeedId);
StartDatafeedAction.DatafeedParams datafeedParams = (StartDatafeedAction.DatafeedParams) currentTask.getParams();
String jobId = datafeedParams.getJobId();
if (currentAssignment.getExecutorNode() == null) {
String msg = "No node found to start datafeed [" + datafeedId +"]. Reasons [" +
String msg = "No node found to start datafeed [" + datafeedParams.getDatafeedId() +"]. Reasons [" +
currentAssignment.getExplanation() + "]";
logger.warn("[{}] {}", datafeedConfig.getJobId(), msg);
auditor.warning(datafeedConfig.getJobId(), msg);
logger.warn("[{}] {}", jobId, msg);
if (jobId != null) {
auditor.warning(jobId, msg);
}
} else {
DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode());
auditor.info(datafeedConfig.getJobId(), "Starting datafeed [" + datafeedId + "] on node [" + node + "]");
if (jobId != null) {
auditor.info(jobId, "Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + node + "]");
}
}
}
}