Skip to content

Commit

Permalink
[ML] Reimplement established model memory (#35500)
Browse files Browse the repository at this point in the history
This is the 7.0 implementation of a master node service to
keep track of the native process memory requirement of each ML
job with an associated native process.

The new ML memory tracker service works when the whole cluster
is upgraded to at least version 6.6. For mixed-version clusters
the old mechanism of established model memory stored on the job
in cluster state is used. This means that the old (and complex)
code to keep established model memory up to date on the job object
has been removed in 7.0.

Forward port of #35263
  • Loading branch information
droberts195 authored Nov 14, 2018
1 parent ce64484 commit 0c343ca
Show file tree
Hide file tree
Showing 29 changed files with 919 additions and 557 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public class Job implements ToXContentObject {
public static final ParseField DATA_DESCRIPTION = new ParseField("data_description");
public static final ParseField DESCRIPTION = new ParseField("description");
public static final ParseField FINISHED_TIME = new ParseField("finished_time");
public static final ParseField ESTABLISHED_MODEL_MEMORY = new ParseField("established_model_memory");
public static final ParseField MODEL_PLOT_CONFIG = new ParseField("model_plot_config");
public static final ParseField RENORMALIZATION_WINDOW_DAYS = new ParseField("renormalization_window_days");
public static final ParseField BACKGROUND_PERSIST_INTERVAL = new ParseField("background_persist_interval");
Expand All @@ -82,7 +81,6 @@ public class Job implements ToXContentObject {
(p) -> TimeUtil.parseTimeField(p, FINISHED_TIME.getPreferredName()),
FINISHED_TIME,
ValueType.VALUE);
PARSER.declareLong(Builder::setEstablishedModelMemory, ESTABLISHED_MODEL_MEMORY);
PARSER.declareObject(Builder::setAnalysisConfig, AnalysisConfig.PARSER, ANALYSIS_CONFIG);
PARSER.declareObject(Builder::setAnalysisLimits, AnalysisLimits.PARSER, ANALYSIS_LIMITS);
PARSER.declareObject(Builder::setDataDescription, DataDescription.PARSER, DATA_DESCRIPTION);
Expand All @@ -105,7 +103,6 @@ public class Job implements ToXContentObject {
private final String description;
private final Date createTime;
private final Date finishedTime;
private final Long establishedModelMemory;
private final AnalysisConfig analysisConfig;
private final AnalysisLimits analysisLimits;
private final DataDescription dataDescription;
Expand All @@ -120,7 +117,7 @@ public class Job implements ToXContentObject {
private final Boolean deleting;

private Job(String jobId, String jobType, List<String> groups, String description,
Date createTime, Date finishedTime, Long establishedModelMemory,
Date createTime, Date finishedTime,
AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription,
ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval,
Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map<String, Object> customSettings,
Expand All @@ -132,7 +129,6 @@ private Job(String jobId, String jobType, List<String> groups, String descriptio
this.description = description;
this.createTime = createTime;
this.finishedTime = finishedTime;
this.establishedModelMemory = establishedModelMemory;
this.analysisConfig = analysisConfig;
this.analysisLimits = analysisLimits;
this.dataDescription = dataDescription;
Expand Down Expand Up @@ -202,16 +198,6 @@ public Date getFinishedTime() {
return finishedTime;
}

/**
* The established model memory of the job, or <code>null</code> if model
* memory has not reached equilibrium yet (i.e. memory usage has not been
* stable for long enough to be considered "established").
*
* @return The established model memory of the job, or <code>null</code>
*         if it has not yet been established
*/
public Long getEstablishedModelMemory() {
return establishedModelMemory;
}

/**
* The analysis configuration object
*
Expand Down Expand Up @@ -304,9 +290,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.timeField(FINISHED_TIME.getPreferredName(), FINISHED_TIME.getPreferredName() + humanReadableSuffix,
finishedTime.getTime());
}
if (establishedModelMemory != null) {
builder.field(ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory);
}
builder.field(ANALYSIS_CONFIG.getPreferredName(), analysisConfig, params);
if (analysisLimits != null) {
builder.field(ANALYSIS_LIMITS.getPreferredName(), analysisLimits, params);
Expand Down Expand Up @@ -362,7 +345,6 @@ public boolean equals(Object other) {
&& Objects.equals(this.description, that.description)
&& Objects.equals(this.createTime, that.createTime)
&& Objects.equals(this.finishedTime, that.finishedTime)
&& Objects.equals(this.establishedModelMemory, that.establishedModelMemory)
&& Objects.equals(this.analysisConfig, that.analysisConfig)
&& Objects.equals(this.analysisLimits, that.analysisLimits)
&& Objects.equals(this.dataDescription, that.dataDescription)
Expand All @@ -379,7 +361,7 @@ public boolean equals(Object other) {

@Override
public int hashCode() {
return Objects.hash(jobId, jobType, groups, description, createTime, finishedTime, establishedModelMemory,
return Objects.hash(jobId, jobType, groups, description, createTime, finishedTime,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, resultsIndexName, deleting);
Expand All @@ -405,7 +387,6 @@ public static class Builder {
private DataDescription dataDescription;
private Date createTime;
private Date finishedTime;
private Long establishedModelMemory;
private ModelPlotConfig modelPlotConfig;
private Long renormalizationWindowDays;
private TimeValue backgroundPersistInterval;
Expand Down Expand Up @@ -433,7 +414,6 @@ public Builder(Job job) {
this.dataDescription = job.getDataDescription();
this.createTime = job.getCreateTime();
this.finishedTime = job.getFinishedTime();
this.establishedModelMemory = job.getEstablishedModelMemory();
this.modelPlotConfig = job.getModelPlotConfig();
this.renormalizationWindowDays = job.getRenormalizationWindowDays();
this.backgroundPersistInterval = job.getBackgroundPersistInterval();
Expand Down Expand Up @@ -494,11 +474,6 @@ Builder setFinishedTime(Date finishedTime) {
return this;
}

/**
* Sets the established model memory for the job being built.
*
* @param establishedModelMemory the established model memory, or
*        <code>null</code> if model memory has not reached equilibrium
* @return this builder, for method chaining
*/
public Builder setEstablishedModelMemory(Long establishedModelMemory) {
this.establishedModelMemory = establishedModelMemory;
return this;
}

public Builder setDataDescription(DataDescription.Builder description) {
dataDescription = Objects.requireNonNull(description, DATA_DESCRIPTION.getPreferredName()).build();
return this;
Expand Down Expand Up @@ -553,7 +528,7 @@ public Job build() {
Objects.requireNonNull(id, "[" + ID.getPreferredName() + "] must not be null");
Objects.requireNonNull(jobType, "[" + JOB_TYPE.getPreferredName() + "] must not be null");
return new Job(
id, jobType, groups, description, createTime, finishedTime, establishedModelMemory,
id, jobType, groups, description, createTime, finishedTime,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, resultsIndexName, deleting);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ public static Job.Builder createRandomizedJobBuilder() {
if (randomBoolean()) {
builder.setFinishedTime(new Date(randomNonNegativeLong()));
}
if (randomBoolean()) {
builder.setEstablishedModelMemory(randomNonNegativeLong());
}
builder.setAnalysisConfig(AnalysisConfigTests.createRandomized());
builder.setAnalysisLimits(AnalysisLimitsTests.createRandomized());

Expand Down
5 changes: 0 additions & 5 deletions docs/reference/ml/apis/jobresource.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ so do not set the `background_persist_interval` value too low.
`description`::
(string) An optional description of the job.

`established_model_memory`::
(long) The approximate amount of memory resources that have been used for
analytical processing. This field is present only when the analytics have used
a stable amount of memory for several consecutive buckets.

`finished_time`::
(string) If the job closed or failed, this is the time the job finished,
otherwise it is `null`. This property is informational; you cannot change its
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,28 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
public static final String TYPE = "ml";
private static final ParseField JOBS_FIELD = new ParseField("jobs");
private static final ParseField DATAFEEDS_FIELD = new ParseField("datafeeds");
private static final ParseField LAST_MEMORY_REFRESH_VERSION_FIELD = new ParseField("last_memory_refresh_version");

public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap());
public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap(), null);
// This parser follows the pattern that metadata is parsed leniently (to allow for enhancements)
public static final ObjectParser<Builder, Void> LENIENT_PARSER = new ObjectParser<>("ml_metadata", true, Builder::new);

static {
LENIENT_PARSER.declareObjectArray(Builder::putJobs, (p, c) -> Job.LENIENT_PARSER.apply(p, c).build(), JOBS_FIELD);
LENIENT_PARSER.declareObjectArray(Builder::putDatafeeds,
(p, c) -> DatafeedConfig.LENIENT_PARSER.apply(p, c).build(), DATAFEEDS_FIELD);
LENIENT_PARSER.declareLong(Builder::setLastMemoryRefreshVersion, LAST_MEMORY_REFRESH_VERSION_FIELD);
}

private final SortedMap<String, Job> jobs;
private final SortedMap<String, DatafeedConfig> datafeeds;
private final Long lastMemoryRefreshVersion;
private final GroupOrJobLookup groupOrJobLookup;

private MlMetadata(SortedMap<String, Job> jobs, SortedMap<String, DatafeedConfig> datafeeds) {
private MlMetadata(SortedMap<String, Job> jobs, SortedMap<String, DatafeedConfig> datafeeds, Long lastMemoryRefreshVersion) {
this.jobs = Collections.unmodifiableSortedMap(jobs);
this.datafeeds = Collections.unmodifiableSortedMap(datafeeds);
this.lastMemoryRefreshVersion = lastMemoryRefreshVersion;
this.groupOrJobLookup = new GroupOrJobLookup(jobs.values());
}

Expand Down Expand Up @@ -112,6 +116,10 @@ public Set<String> expandDatafeedIds(String expression, boolean allowNoDatafeeds
.expand(expression, allowNoDatafeeds);
}

/**
* The last memory refresh version recorded in this metadata, presumably a
* cluster state version at which ML job memory requirements were last
* refreshed — TODO(review): confirm against the writer of this field.
* Will be <code>null</code> if never set, including when the metadata was
* read from a pre-6.6 node (see the stream constructor).
*
* @return the last memory refresh version, or <code>null</code> if not set
*/
public Long getLastMemoryRefreshVersion() {
return lastMemoryRefreshVersion;
}

@Override
public Version getMinimalSupportedVersion() {
return Version.V_6_0_0_alpha1;
Expand Down Expand Up @@ -145,14 +153,21 @@ public MlMetadata(StreamInput in) throws IOException {
datafeeds.put(in.readString(), new DatafeedConfig(in));
}
this.datafeeds = datafeeds;

if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
lastMemoryRefreshVersion = in.readOptionalLong();
} else {
lastMemoryRefreshVersion = null;
}
this.groupOrJobLookup = new GroupOrJobLookup(jobs.values());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
writeMap(jobs, out);
writeMap(datafeeds, out);
if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
out.writeOptionalLong(lastMemoryRefreshVersion);
}
}

private static <T extends Writeable> void writeMap(Map<String, T> map, StreamOutput out) throws IOException {
Expand All @@ -169,6 +184,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"), params);
mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams);
mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams);
if (lastMemoryRefreshVersion != null) {
builder.field(LAST_MEMORY_REFRESH_VERSION_FIELD.getPreferredName(), lastMemoryRefreshVersion);
}
return builder;
}

Expand All @@ -185,30 +203,46 @@ public static class MlMetadataDiff implements NamedDiff<MetaData.Custom> {

final Diff<Map<String, Job>> jobs;
final Diff<Map<String, DatafeedConfig>> datafeeds;
final Long lastMemoryRefreshVersion;

MlMetadataDiff(MlMetadata before, MlMetadata after) {
this.jobs = DiffableUtils.diff(before.jobs, after.jobs, DiffableUtils.getStringKeySerializer());
this.datafeeds = DiffableUtils.diff(before.datafeeds, after.datafeeds, DiffableUtils.getStringKeySerializer());
this.lastMemoryRefreshVersion = after.lastMemoryRefreshVersion;
}

public MlMetadataDiff(StreamInput in) throws IOException {
this.jobs = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Job::new,
MlMetadataDiff::readJobDiffFrom);
this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new,
MlMetadataDiff::readSchedulerDiffFrom);
MlMetadataDiff::readDatafeedDiffFrom);
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
lastMemoryRefreshVersion = in.readOptionalLong();
} else {
lastMemoryRefreshVersion = null;
}
}

/**
* Merge the diff with the ML metadata.
*
* @param part The current ML metadata.
* @return The new ML metadata produced by applying the job and datafeed
*         diffs to {@code part} and taking {@code lastMemoryRefreshVersion}
*         directly from this diff.
*/
@Override
public MetaData.Custom apply(MetaData.Custom part) {
TreeMap<String, Job> newJobs = new TreeMap<>(jobs.apply(((MlMetadata) part).jobs));
TreeMap<String, DatafeedConfig> newDatafeeds = new TreeMap<>(datafeeds.apply(((MlMetadata) part).datafeeds));
// lastMemoryRefreshVersion always comes from the diff - no need to merge with the old value
return new MlMetadata(newJobs, newDatafeeds, lastMemoryRefreshVersion);
}

// Serializes this diff: job diffs, datafeed diffs, then (only to 6.6+
// nodes) the optional last memory refresh version. The version gate keeps
// the wire format backwards compatible with pre-6.6 nodes, mirroring the
// conditional read in the stream constructor.
@Override
public void writeTo(StreamOutput out) throws IOException {
jobs.writeTo(out);
datafeeds.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
out.writeOptionalLong(lastMemoryRefreshVersion);
}
}

@Override
Expand All @@ -220,7 +254,7 @@ static Diff<Job> readJobDiffFrom(StreamInput in) throws IOException {
return AbstractDiffable.readDiffFrom(Job::new, in);
}

static Diff<DatafeedConfig> readSchedulerDiffFrom(StreamInput in) throws IOException {
static Diff<DatafeedConfig> readDatafeedDiffFrom(StreamInput in) throws IOException {
return AbstractDiffable.readDiffFrom(DatafeedConfig::new, in);
}
}
Expand All @@ -233,7 +267,8 @@ public boolean equals(Object o) {
return false;
MlMetadata that = (MlMetadata) o;
return Objects.equals(jobs, that.jobs) &&
Objects.equals(datafeeds, that.datafeeds);
Objects.equals(datafeeds, that.datafeeds) &&
Objects.equals(lastMemoryRefreshVersion, that.lastMemoryRefreshVersion);
}

@Override
Expand All @@ -243,13 +278,14 @@ public final String toString() {

@Override
public int hashCode() {
return Objects.hash(jobs, datafeeds);
return Objects.hash(jobs, datafeeds, lastMemoryRefreshVersion);
}

public static class Builder {

private TreeMap<String, Job> jobs;
private TreeMap<String, DatafeedConfig> datafeeds;
private Long lastMemoryRefreshVersion;

public Builder() {
jobs = new TreeMap<>();
Expand All @@ -263,6 +299,7 @@ public Builder(@Nullable MlMetadata previous) {
} else {
jobs = new TreeMap<>(previous.jobs);
datafeeds = new TreeMap<>(previous.datafeeds);
lastMemoryRefreshVersion = previous.lastMemoryRefreshVersion;
}
}

Expand Down Expand Up @@ -382,8 +419,13 @@ private Builder putDatafeeds(Collection<DatafeedConfig> datafeeds) {
return this;
}

/**
* Sets the last memory refresh version to record in the built metadata.
*
* @param lastMemoryRefreshVersion the version to record, or
*        <code>null</code> to indicate no refresh has been recorded
* @return this builder, for method chaining
*/
public Builder setLastMemoryRefreshVersion(Long lastMemoryRefreshVersion) {
this.lastMemoryRefreshVersion = lastMemoryRefreshVersion;
return this;
}

public MlMetadata build() {
return new MlMetadata(jobs, datafeeds);
return new MlMetadata(jobs, datafeeds, lastMemoryRefreshVersion);
}

public void markJobAsDeleting(String jobId, PersistentTasksCustomMetaData tasks, boolean allowDeleteOpenJob) {
Expand Down Expand Up @@ -420,8 +462,6 @@ void checkJobHasNoDatafeed(String jobId) {
}
}



public static MlMetadata getMlMetadata(ClusterState state) {
MlMetadata mlMetadata = (state == null) ? null : state.getMetaData().custom(TYPE);
if (mlMetadata == null) {
Expand Down
Loading

0 comments on commit 0c343ca

Please sign in to comment.