Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Reimplement established model memory #35263

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.RefreshJobMemoryRequirementAction;
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
Expand Down Expand Up @@ -231,6 +232,7 @@ public List<GenericAction> getClientActions() {
UpdateFilterAction.INSTANCE,
DeleteFilterAction.INSTANCE,
KillProcessAction.INSTANCE,
RefreshJobMemoryRequirementAction.INSTANCE,
GetBucketsAction.INSTANCE,
GetInfluencersAction.INSTANCE,
GetOverallBucketsAction.INSTANCE,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ml.job.config.Job;

import java.io.IOException;
import java.util.Objects;

public class RefreshJobMemoryRequirementAction extends Action<RefreshJobMemoryRequirementAction.Request, AcknowledgedResponse,
RefreshJobMemoryRequirementAction.RequestBuilder> {

public static final RefreshJobMemoryRequirementAction INSTANCE = new RefreshJobMemoryRequirementAction();
public static final String NAME = "cluster:internal/xpack/ml/job/refresh_memory_requirement";

private RefreshJobMemoryRequirementAction() {
super(NAME);
}

@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}

@Override
public AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}

public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {

public static ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);

static {
PARSER.declareString(Request::setJobId, Job.ID);
}

public static Request parseRequest(String jobId, XContentParser parser) {
Request request = PARSER.apply(parser, null);
if (jobId != null) {
request.setJobId(jobId);
}
return request;
}

private String jobId;

public Request(String jobId) {

this.jobId = jobId;
}

public Request() {
}

public void setJobId(String jobId) {
this.jobId = jobId;
}

public String getJobId() {
return jobId;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Request request = (Request) o;
return Objects.equals(jobId, request.jobId);
}

@Override
public int hashCode() {
return Objects.hash(jobId);
}

@Override
public final String toString() {
return Strings.toString(this);
}
}

public static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, AcknowledgedResponse, RequestBuilder> {

public RequestBuilder(ElasticsearchClient client, RefreshJobMemoryRequirementAction action) {
super(client, action, new Request());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ private static ObjectParser<Builder, Void> createParser(boolean ignoreUnknownFie
private final Date createTime;
private final Date finishedTime;
private final Date lastDataTime;
// TODO: Remove in 7.0
private final Long establishedModelMemory;
private final AnalysisConfig analysisConfig;
private final AnalysisLimits analysisLimits;
Expand Down Expand Up @@ -439,6 +440,7 @@ public Collection<String> allInputFields() {
* program code and stack.
* @return an estimate of the memory requirement of this job, in bytes
*/
// TODO: remove this method in 7.0
public long estimateMemoryFootprint() {
if (establishedModelMemory != null && establishedModelMemory > 0) {
return establishedModelMemory + PROCESS_MEMORY_OVERHEAD.getBytes();
Expand Down Expand Up @@ -658,6 +660,7 @@ public static class Builder implements Writeable, ToXContentObject {
private Date createTime;
private Date finishedTime;
private Date lastDataTime;
// TODO: remove in 7.0
private Long establishedModelMemory;
private ModelPlotConfig modelPlotConfig;
private Long renormalizationWindowDays;
Expand Down Expand Up @@ -1102,10 +1105,6 @@ private void validateGroups() {
public Job build(Date createTime) {
setCreateTime(createTime);
setJobVersion(Version.CURRENT);
// TODO: Maybe we _could_ accept a value for this supplied at create time - it would
// mean cloned jobs that hadn't been edited much would start with an accurate expected size.
// But on the other hand it would mean jobs that were cloned and then completely changed
// would start with a size that was completely wrong.
setEstablishedModelMemory(null);
return build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;

public class RefreshJobMemoryRequirementActionRequestTests
extends AbstractStreamableXContentTestCase<RefreshJobMemoryRequirementAction.Request> {

@Override
protected RefreshJobMemoryRequirementAction.Request createTestInstance() {
return new RefreshJobMemoryRequirementAction.Request(randomAlphaOfLengthBetween(1, 20));
}

@Override
protected boolean supportsUnknownFields() {
return false;
}

@Override
protected RefreshJobMemoryRequirementAction.Request createBlankInstance() {
return new RefreshJobMemoryRequirementAction.Request();
}

@Override
protected RefreshJobMemoryRequirementAction.Request doParseInstance(XContentParser parser) {
return RefreshJobMemoryRequirementAction.Request.parseRequest(null, parser);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ public void testEstimateMemoryFootprint_GivenNoLimitAndNotEstablished() {
builder.setEstablishedModelMemory(0L);
}
assertEquals(ByteSizeUnit.MB.toBytes(AnalysisLimits.PRE_6_1_DEFAULT_MODEL_MEMORY_LIMIT_MB)
+ Job.PROCESS_MEMORY_OVERHEAD.getBytes(), builder.build().estimateMemoryFootprint());
+ Job.PROCESS_MEMORY_OVERHEAD.getBytes(), builder.build().estimateMemoryFootprint());
}

public void testEarliestValidTimestamp_GivenEmptyDataCounts() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.RefreshJobMemoryRequirementAction;
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
Expand Down Expand Up @@ -149,6 +150,7 @@
import org.elasticsearch.xpack.ml.action.TransportPutDatafeedAction;
import org.elasticsearch.xpack.ml.action.TransportPutFilterAction;
import org.elasticsearch.xpack.ml.action.TransportPutJobAction;
import org.elasticsearch.xpack.ml.action.TransportRefreshJobMemoryRequirementAction;
import org.elasticsearch.xpack.ml.action.TransportRevertModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction;
import org.elasticsearch.xpack.ml.action.TransportStopDatafeedAction;
Expand Down Expand Up @@ -181,6 +183,7 @@
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
import org.elasticsearch.xpack.ml.rest.RestDeleteExpiredDataAction;
Expand Down Expand Up @@ -278,6 +281,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu

private final SetOnce<AutodetectProcessManager> autodetectProcessManager = new SetOnce<>();
private final SetOnce<DatafeedManager> datafeedManager = new SetOnce<>();
private final SetOnce<MlMemoryTracker> memoryTracker = new SetOnce<>();

public MachineLearning(Settings settings, Path configPath) {
this.settings = settings;
Expand All @@ -299,6 +303,7 @@ public List<Setting<?>> getSettings() {
MachineLearningField.MAX_MODEL_MEMORY_LIMIT,
MAX_LAZY_ML_NODES,
MAX_MACHINE_MEMORY_PERCENT,
MlMemoryTracker.ML_MEMORY_UPDATE_FREQUENCY,
AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING,
AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING,
AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC,
Expand Down Expand Up @@ -420,6 +425,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
this.datafeedManager.set(datafeedManager);
MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager,
autodetectProcessManager);
MlMemoryTracker memoryTracker = new MlMemoryTracker(settings, clusterService, threadPool, jobManager, jobResultsProvider);
this.memoryTracker.set(memoryTracker);

// This object's constructor attaches to the license state, so there's no need to retain another reference to it
new InvalidLicenseEnforcer(settings, getLicenseState(), threadPool, datafeedManager, autodetectProcessManager);
Expand All @@ -438,7 +445,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
jobDataCountsPersister,
datafeedManager,
auditor,
new MlAssignmentNotifier(settings, auditor, clusterService)
new MlAssignmentNotifier(settings, auditor, clusterService),
memoryTracker
);
}

Expand All @@ -449,7 +457,8 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
}

return Arrays.asList(
new TransportOpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager.get()),
new TransportOpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager.get(),
memoryTracker.get()),
new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, datafeedManager.get())
);
}
Expand Down Expand Up @@ -543,6 +552,7 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
new ActionHandler<>(UpdateFilterAction.INSTANCE, TransportUpdateFilterAction.class),
new ActionHandler<>(DeleteFilterAction.INSTANCE, TransportDeleteFilterAction.class),
new ActionHandler<>(KillProcessAction.INSTANCE, TransportKillProcessAction.class),
new ActionHandler<>(RefreshJobMemoryRequirementAction.INSTANCE, TransportRefreshJobMemoryRequirementAction.class),
new ActionHandler<>(GetBucketsAction.INSTANCE, TransportGetBucketsAction.class),
new ActionHandler<>(GetInfluencersAction.INSTANCE, TransportGetInfluencersAction.class),
new ActionHandler<>(GetOverallBucketsAction.INSTANCE, TransportGetOverallBucketsAction.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;

import java.util.ArrayList;
Expand All @@ -94,6 +95,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
private final JobResultsProvider jobResultsProvider;
private final JobConfigProvider jobConfigProvider;
private final DatafeedConfigProvider datafeedConfigProvider;
private final MlMemoryTracker memoryTracker;

/**
* A map of task listeners by job_id.
Expand All @@ -108,7 +110,8 @@ public TransportDeleteJobAction(Settings settings, TransportService transportSer
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, PersistentTasksService persistentTasksService,
Client client, Auditor auditor, JobResultsProvider jobResultsProvider,
JobConfigProvider jobConfigProvider, DatafeedConfigProvider datafeedConfigProvider) {
JobConfigProvider jobConfigProvider, DatafeedConfigProvider datafeedConfigProvider,
MlMemoryTracker memoryTracker) {
super(settings, DeleteJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, DeleteJobAction.Request::new);
this.client = client;
Expand All @@ -117,6 +120,7 @@ public TransportDeleteJobAction(Settings settings, TransportService transportSer
this.jobResultsProvider = jobResultsProvider;
this.jobConfigProvider = jobConfigProvider;
this.datafeedConfigProvider = datafeedConfigProvider;
this.memoryTracker = memoryTracker;
this.listenersByJobId = new HashMap<>();
}

Expand Down Expand Up @@ -210,6 +214,7 @@ private void notifyListeners(String jobId, @Nullable AcknowledgedResponse ack, @
private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJobAction.Request request,
ActionListener<AcknowledgedResponse> listener) {
String jobId = request.getJobId();
memoryTracker.removeJob(jobId);
droberts195 marked this conversation as resolved.
Show resolved Hide resolved

// Step 4. When the job has been removed from the cluster state, return a response
// -------
Expand Down
Loading