Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] complete machine learning plugin feature state clean up integration #71011

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,11 @@ private XContentBuilder toXContentBuilder(ToXContent toXContent) {
}
}

private void writeBacklog() {
protected void clearBacklog() {
backlog = null;
}

protected void writeBacklog() {
assert backlog != null;
if (backlog == null) {
logger.error("Message back log has already been written");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,14 @@ public class MlMetadata implements Metadata.Custom {
private static final ParseField JOBS_FIELD = new ParseField("jobs");
private static final ParseField DATAFEEDS_FIELD = new ParseField("datafeeds");
public static final ParseField UPGRADE_MODE = new ParseField("upgrade_mode");

public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap(), false);
public static final ParseField RESET_MODE = new ParseField("reset_mode");

public static final MlMetadata EMPTY_METADATA = new MlMetadata(
Collections.emptySortedMap(),
Collections.emptySortedMap(),
false,
false
);
// This parser follows the pattern that metadata is parsed leniently (to allow for enhancements)
public static final ObjectParser<Builder, Void> LENIENT_PARSER = new ObjectParser<>("ml_metadata", true, Builder::new);

Expand All @@ -60,19 +66,21 @@ public class MlMetadata implements Metadata.Custom {
LENIENT_PARSER.declareObjectArray(Builder::putDatafeeds,
(p, c) -> DatafeedConfig.LENIENT_PARSER.apply(p, c).build(), DATAFEEDS_FIELD);
LENIENT_PARSER.declareBoolean(Builder::isUpgradeMode, UPGRADE_MODE);

LENIENT_PARSER.declareBoolean(Builder::isResetMode, RESET_MODE);
}

private final SortedMap<String, Job> jobs;
private final SortedMap<String, DatafeedConfig> datafeeds;
private final boolean upgradeMode;
private final boolean resetMode;
private final GroupOrJobLookup groupOrJobLookup;

private MlMetadata(SortedMap<String, Job> jobs, SortedMap<String, DatafeedConfig> datafeeds, boolean upgradeMode) {
private MlMetadata(SortedMap<String, Job> jobs, SortedMap<String, DatafeedConfig> datafeeds, boolean upgradeMode, boolean resetMode) {
this.jobs = Collections.unmodifiableSortedMap(jobs);
this.datafeeds = Collections.unmodifiableSortedMap(datafeeds);
this.groupOrJobLookup = new GroupOrJobLookup(jobs.values());
this.upgradeMode = upgradeMode;
this.resetMode = resetMode;
}

public Map<String, Job> getJobs() {
Expand Down Expand Up @@ -104,6 +112,10 @@ public boolean isUpgradeMode() {
return upgradeMode;
}

public boolean isResetMode() {
return resetMode;
}

@Override
public Version getMinimalSupportedVersion() {
return Version.CURRENT.minimumIndexCompatibilityVersion();
Expand Down Expand Up @@ -139,13 +151,21 @@ public MlMetadata(StreamInput in) throws IOException {
this.datafeeds = datafeeds;
this.groupOrJobLookup = new GroupOrJobLookup(jobs.values());
this.upgradeMode = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
this.resetMode = in.readBoolean();
} else {
this.resetMode = false;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
writeMap(jobs, out);
writeMap(datafeeds, out);
out.writeBoolean(upgradeMode);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeBoolean(resetMode);
}
}

private static <T extends Writeable> void writeMap(Map<String, T> map, StreamOutput out) throws IOException {
Expand All @@ -163,6 +183,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams);
mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams);
builder.field(UPGRADE_MODE.getPreferredName(), upgradeMode);
builder.field(RESET_MODE.getPreferredName(), resetMode);
return builder;
}

Expand All @@ -184,11 +205,13 @@ public static class MlMetadataDiff implements NamedDiff<Metadata.Custom> {
final Diff<Map<String, Job>> jobs;
final Diff<Map<String, DatafeedConfig>> datafeeds;
final boolean upgradeMode;
final boolean resetMode;

MlMetadataDiff(MlMetadata before, MlMetadata after) {
this.jobs = DiffableUtils.diff(before.jobs, after.jobs, DiffableUtils.getStringKeySerializer());
this.datafeeds = DiffableUtils.diff(before.datafeeds, after.datafeeds, DiffableUtils.getStringKeySerializer());
this.upgradeMode = after.upgradeMode;
this.resetMode = after.resetMode;
}

public MlMetadataDiff(StreamInput in) throws IOException {
Expand All @@ -197,6 +220,11 @@ public MlMetadataDiff(StreamInput in) throws IOException {
this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new,
MlMetadataDiff::readDatafeedDiffFrom);
upgradeMode = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
resetMode = in.readBoolean();
} else {
resetMode = false;
}
}

/**
Expand All @@ -208,14 +236,17 @@ public MlMetadataDiff(StreamInput in) throws IOException {
public Metadata.Custom apply(Metadata.Custom part) {
TreeMap<String, Job> newJobs = new TreeMap<>(jobs.apply(((MlMetadata) part).jobs));
TreeMap<String, DatafeedConfig> newDatafeeds = new TreeMap<>(datafeeds.apply(((MlMetadata) part).datafeeds));
return new MlMetadata(newJobs, newDatafeeds, upgradeMode);
return new MlMetadata(newJobs, newDatafeeds, upgradeMode, resetMode);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
jobs.writeTo(out);
datafeeds.writeTo(out);
out.writeBoolean(upgradeMode);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeBoolean(resetMode);
}
}

@Override
Expand All @@ -241,7 +272,8 @@ public boolean equals(Object o) {
MlMetadata that = (MlMetadata) o;
return Objects.equals(jobs, that.jobs) &&
Objects.equals(datafeeds, that.datafeeds) &&
Objects.equals(upgradeMode, that.upgradeMode);
upgradeMode == that.upgradeMode &&
resetMode == that.resetMode;
}

@Override
Expand All @@ -251,14 +283,19 @@ public final String toString() {

@Override
public int hashCode() {
return Objects.hash(jobs, datafeeds, upgradeMode);
return Objects.hash(jobs, datafeeds, upgradeMode, resetMode);
}

public static class Builder {

private TreeMap<String, Job> jobs;
private TreeMap<String, DatafeedConfig> datafeeds;
private boolean upgradeMode;
private boolean resetMode;

public static Builder from(@Nullable MlMetadata previous) {
return new Builder(previous);
}

public Builder() {
jobs = new TreeMap<>();
Expand Down Expand Up @@ -340,8 +377,13 @@ public Builder isUpgradeMode(boolean upgradeMode) {
return this;
}

public Builder isResetMode(boolean resetMode) {
this.resetMode = resetMode;
return this;
}

public MlMetadata build() {
return new MlMetadata(jobs, datafeeds, upgradeMode);
return new MlMetadata(jobs, datafeeds, upgradeMode, resetMode);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,44 +104,50 @@ public String getJobId() {
return jobId;
}

public void setJobId(String jobId) {
public Request setJobId(String jobId) {
this.jobId = jobId;
return this;
}

public TimeValue getCloseTimeout() {
return timeout;
}

public void setCloseTimeout(TimeValue timeout) {
public Request setCloseTimeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}

public boolean isForce() {
return force;
}

public void setForce(boolean force) {
public Request setForce(boolean force) {
this.force = force;
return this;
}

public boolean allowNoMatch() {
return allowNoMatch;
}

public void setAllowNoMatch(boolean allowNoMatch) {
public Request setAllowNoMatch(boolean allowNoMatch) {
this.allowNoMatch = allowNoMatch;
return this;
}

public boolean isLocal() { return local; }

public void setLocal(boolean local) {
public Request setLocal(boolean local) {
this.local = local;
return this;
}

public String[] getOpenJobIds() { return openJobIds; }

public void setOpenJobIds(String [] openJobIds) {
public Request setOpenJobIds(String[] openJobIds) {
this.openJobIds = openJobIds;
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

public class SetResetModeAction extends ActionType<AcknowledgedResponse> {

public static final SetResetModeAction INSTANCE = new SetResetModeAction();
public static final String NAME = "cluster:admin/xpack/ml/reset_mode";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use internal instead of admin here, to make absolutely clear that this is not intended to be called from outside the cluster. This is what our other actions that would be dangerous to call from outside the cluster have, e.g.:

public static final String NAME = "cluster:internal/xpack/ml/job/kill/process";


private SetResetModeAction() {
super(NAME, AcknowledgedResponse::readFrom);
}

public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {

public static Request enabled() {
return new Request(true);
}

public static Request disabled() {
return new Request(false);
}

private final boolean enabled;

private static final ParseField ENABLED = new ParseField("enabled");
public static final ConstructingObjectParser<Request, Void> PARSER =
new ConstructingObjectParser<>(NAME, a -> new Request((Boolean)a[0]));

static {
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED);
}

Request(boolean enabled) {
this.enabled = enabled;
}

public Request(StreamInput in) throws IOException {
super(in);
this.enabled = in.readBoolean();
}

public boolean isEnabled() {
return enabled;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(enabled);
}

@Override
public int hashCode() {
return Objects.hash(enabled);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(enabled, other.enabled);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ENABLED.getPreferredName(), enabled);
builder.endObject();
return builder;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,9 @@ public Request() {
setTimeout(DEFAULT_TIMEOUT);
}

public final void setId(String id) {
public final Request setId(String id) {
this.id = ExceptionsHelper.requireNonNull(id, DataFrameAnalyticsConfig.ID);
return this;
}

public String getId() {
Expand All @@ -105,16 +106,18 @@ public boolean allowNoMatch() {
return allowNoMatch;
}

public void setAllowNoMatch(boolean allowNoMatch) {
public Request setAllowNoMatch(boolean allowNoMatch) {
this.allowNoMatch = allowNoMatch;
return this;
}

public boolean isForce() {
return force;
}

public void setForce(boolean force) {
public Request setForce(boolean force) {
this.force = force;
return this;
}

@Nullable
Expand Down
Loading