[7.7][ML] DF Analytics should always display operational stats (#54210) (#54291)

This commit populates the _stats API response with sensible "empty"
`data_counts` and `memory_usage` objects when the job itself
has not started reporting them.

Backport of #54210
dimitris-athanasiou authored Mar 26, 2020
1 parent f78c159 commit e6e27ff
Showing 10 changed files with 119 additions and 30 deletions.
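
In practice, a data frame analytics job that has been created but never started now reports zero-valued stats objects instead of omitting the fields. Below is a minimal sketch of the new server-side defaults introduced in this commit; the wrapper class and job id are illustrative, and the printed JSON mirrors the assertion added to the x-pack MemoryUsageTests further down.

import org.elasticsearch.common.Strings;
import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage;
import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts;

public class EmptyStatsSketch {
    public static void main(String[] args) {
        // Zero-valued placeholders reported while the job has not produced real stats yet.
        MemoryUsage memoryUsage = new MemoryUsage("never-started-job"); // timestamp stays null
        DataCounts dataCounts = new DataCounts("never-started-job");    // all doc counts are 0

        // The null timestamp is simply omitted from the serialized form, as the new test asserts.
        System.out.println(Strings.toString(memoryUsage)); // {"peak_usage_bytes":0}
        System.out.println(Strings.toString(dataCounts));
    }
}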
File: MemoryUsage.java (org.elasticsearch.client.ml.dataframe.stats.common)
@@ -19,6 +19,7 @@
package org.elasticsearch.client.ml.dataframe.stats.common;

import org.elasticsearch.client.common.TimeUtil;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.internal.ToStringBuilder;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
@@ -39,21 +40,23 @@ public class MemoryUsage implements ToXContentObject {
true, a -> new MemoryUsage((Instant) a[0], (long) a[1]));

static {
PARSER.declareField(ConstructingObjectParser.constructorArg(),
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
p -> TimeUtil.parseTimeFieldToInstant(p, TIMESTAMP.getPreferredName()),
TIMESTAMP,
ObjectParser.ValueType.VALUE);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), PEAK_USAGE_BYTES);
}

@Nullable
private final Instant timestamp;
private final long peakUsageBytes;

public MemoryUsage(Instant timestamp, long peakUsageBytes) {
this.timestamp = Instant.ofEpochMilli(Objects.requireNonNull(timestamp).toEpochMilli());
public MemoryUsage(@Nullable Instant timestamp, long peakUsageBytes) {
this.timestamp = timestamp == null ? null : Instant.ofEpochMilli(Objects.requireNonNull(timestamp).toEpochMilli());
this.peakUsageBytes = peakUsageBytes;
}

@Nullable
public Instant getTimestamp() {
return timestamp;
}
@@ -65,7 +68,9 @@ public long getPeakUsageBytes() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.timeField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.toEpochMilli());
if (timestamp != null) {
builder.timeField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.toEpochMilli());
}
builder.field(PEAK_USAGE_BYTES.getPreferredName(), peakUsageBytes);
builder.endObject();
return builder;
@@ -89,7 +94,7 @@ public int hashCode() {
@Override
public String toString() {
return new ToStringBuilder(getClass())
.add(TIMESTAMP.getPreferredName(), timestamp.getEpochSecond())
.add(TIMESTAMP.getPreferredName(), timestamp == null ? null : timestamp.getEpochSecond())
.add(PEAK_USAGE_BYTES.getPreferredName(), peakUsageBytes)
.toString();
}
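On the client side, the timestamp is now optional, so a zero-usage object can be represented and parsed. A small sketch of what that relaxation means for callers of the high-level REST client model (the wrapper class is illustrative, mirroring the new unit test below):

import org.elasticsearch.client.ml.dataframe.stats.common.MemoryUsage;

public class ClientMemoryUsageSketch {
    public static void main(String[] args) {
        // A zero-usage object, e.g. parsed from {"peak_usage_bytes":0}.
        MemoryUsage zeroUsage = new MemoryUsage(null, 0L);
        // getTimestamp() is now @Nullable, so callers should null-check it.
        System.out.println(zeroUsage.getTimestamp());      // null
        System.out.println(zeroUsage.getPeakUsageBytes()); // 0
    }
}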
File: MachineLearningIT.java (high-level REST client integration tests)
@@ -149,6 +149,7 @@
import org.elasticsearch.client.ml.dataframe.evaluation.softclassification.RecallMetric;
import org.elasticsearch.client.ml.dataframe.explain.FieldSelection;
import org.elasticsearch.client.ml.dataframe.explain.MemoryEstimation;
import org.elasticsearch.client.ml.dataframe.stats.common.DataCounts;
import org.elasticsearch.client.ml.filestructurefinder.FileStructure;
import org.elasticsearch.client.ml.inference.InferenceToXContentCompressor;
import org.elasticsearch.client.ml.inference.MlInferenceNamedXContentProvider;
@@ -1561,7 +1562,8 @@ public void testGetDataFrameAnalyticsStats() throws Exception {
assertThat(progress.get(1), equalTo(new PhaseProgress("loading_data", 0)));
assertThat(progress.get(2), equalTo(new PhaseProgress("analyzing", 0)));
assertThat(progress.get(3), equalTo(new PhaseProgress("writing_results", 0)));
assertThat(stats.getMemoryUsage(), is(nullValue()));
assertThat(stats.getMemoryUsage().getPeakUsageBytes(), equalTo(0L));
assertThat(stats.getDataCounts(), equalTo(new DataCounts(0, 0, 0)));
}

public void testStartDataFrameAnalyticsConfig() throws Exception {
File: MemoryUsageTests.java (org.elasticsearch.client.ml.dataframe.stats.common)
@@ -24,6 +24,8 @@
import java.io.IOException;
import java.time.Instant;

import static org.hamcrest.Matchers.equalTo;

public class MemoryUsageTests extends AbstractXContentTestCase<MemoryUsage> {

@Override
@@ -32,7 +34,7 @@ protected MemoryUsage createTestInstance() {
}

public static MemoryUsage createRandom() {
return new MemoryUsage(Instant.now(), randomNonNegativeLong());
return new MemoryUsage(randomBoolean() ? null : Instant.now(), randomNonNegativeLong());
}

@Override
@@ -44,4 +46,9 @@ protected MemoryUsage doParseInstance(XContentParser parser) throws IOException
protected boolean supportsUnknownFields() {
return true;
}

public void testToString_GivenNullTimestamp() {
MemoryUsage memoryUsage = new MemoryUsage(null, 42L);
assertThat(memoryUsage.toString(), equalTo("MemoryUsage[timestamp=null, peak_usage_bytes=42]"));
}
}
File: GetDataFrameAnalyticsStatsAction.java (org.elasticsearch.xpack.core.ml.action)
@@ -29,8 +29,8 @@
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.stats.AnalysisStats;
import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts;
import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage;
import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;

@@ -166,10 +166,8 @@ public static class Stats implements ToXContentObject, Writeable {
*/
private final List<PhaseProgress> progress;

@Nullable
private final DataCounts dataCounts;

@Nullable
private final MemoryUsage memoryUsage;

@Nullable
@@ -187,8 +185,8 @@ public Stats(String id, DataFrameAnalyticsState state, @Nullable String failureR
this.state = Objects.requireNonNull(state);
this.failureReason = failureReason;
this.progress = Objects.requireNonNull(progress);
this.dataCounts = dataCounts;
this.memoryUsage = memoryUsage;
this.dataCounts = dataCounts == null ? new DataCounts(id) : dataCounts;
this.memoryUsage = memoryUsage == null ? new MemoryUsage(id) : memoryUsage;
this.analysisStats = analysisStats;
this.node = node;
this.assignmentExplanation = assignmentExplanation;
@@ -204,12 +202,12 @@ public Stats(StreamInput in) throws IOException {
progress = in.readList(PhaseProgress::new);
}
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
dataCounts = in.readOptionalWriteable(DataCounts::new);
dataCounts = new DataCounts(in);
} else {
dataCounts = null;
}
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
memoryUsage = in.readOptionalWriteable(MemoryUsage::new);
memoryUsage = new MemoryUsage(in);
} else {
memoryUsage = null;
}
@@ -276,11 +274,14 @@ public DataCounts getDataCounts() {
return dataCounts;
}

@Nullable
public MemoryUsage getMemoryUsage() {
return memoryUsage;
}

public AnalysisStats getAnalysisStats() {
return analysisStats;
}

public DiscoveryNode getNode() {
return node;
}
@@ -308,12 +309,8 @@ public XContentBuilder toUnwrappedXContent(XContentBuilder builder) throws IOExc
if (progress != null) {
builder.field("progress", progress);
}
if (dataCounts != null) {
builder.field("data_counts", dataCounts);
}
if (memoryUsage != null) {
builder.field("memory_usage", memoryUsage);
}
builder.field("data_counts", dataCounts);
builder.field("memory_usage", memoryUsage);
if (analysisStats != null) {
builder.startObject("analysis_stats");
builder.field(analysisStats.getWriteableName(), analysisStats);
@@ -350,10 +347,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeList(progress);
}
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
out.writeOptionalWriteable(dataCounts);
dataCounts.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
out.writeOptionalWriteable(memoryUsage);
memoryUsage.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
out.writeOptionalNamedWriteable(analysisStats);
File: MemoryUsage.java (org.elasticsearch.xpack.core.ml.dataframe.stats)
@@ -46,27 +46,38 @@ private static ConstructingObjectParser<MemoryUsage, Void> createParser(boolean
}

private final String jobId;
/**
* timestamp may only be null when we construct a zero usage object
*/
private final Instant timestamp;
private final long peakUsageBytes;

/**
* Creates a zero usage object
*/
public MemoryUsage(String jobId) {
this(jobId, null, 0);
}

public MemoryUsage(String jobId, Instant timestamp, long peakUsageBytes) {
this.jobId = Objects.requireNonNull(jobId);
// We intend to store this timestamp in millis granularity. Thus we're rounding here to ensure
// internal representation matches toXContent
this.timestamp = Instant.ofEpochMilli(ExceptionsHelper.requireNonNull(timestamp, Fields.TIMESTAMP).toEpochMilli());
this.timestamp = timestamp == null ? null : Instant.ofEpochMilli(
ExceptionsHelper.requireNonNull(timestamp, Fields.TIMESTAMP).toEpochMilli());
this.peakUsageBytes = peakUsageBytes;
}

public MemoryUsage(StreamInput in) throws IOException {
jobId = in.readString();
timestamp = in.readInstant();
timestamp = in.readOptionalInstant();
peakUsageBytes = in.readVLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
out.writeInstant(timestamp);
out.writeOptionalInstant(timestamp);
out.writeVLong(peakUsageBytes);
}

@@ -77,7 +88,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(Fields.TYPE.getPreferredName(), TYPE_VALUE);
builder.field(Fields.JOB_ID.getPreferredName(), jobId);
}
builder.timeField(Fields.TIMESTAMP.getPreferredName(), Fields.TIMESTAMP.getPreferredName() + "_string", timestamp.toEpochMilli());
if (timestamp != null) {
builder.timeField(Fields.TIMESTAMP.getPreferredName(), Fields.TIMESTAMP.getPreferredName() + "_string",
timestamp.toEpochMilli());
}
builder.field(PEAK_USAGE_BYTES.getPreferredName(), peakUsageBytes);
builder.endObject();
return builder;
@@ -105,6 +119,7 @@ public String toString() {
}

public String documentId(String jobId) {
assert timestamp != null;
return documentIdPrefix(jobId) + timestamp.toEpochMilli();
}

File: DataCounts.java (org.elasticsearch.xpack.core.ml.dataframe.stats.common)
@@ -46,6 +46,10 @@ private static ConstructingObjectParser<DataCounts, Void> createParser(boolean i
private final long testDocsCount;
private final long skippedDocsCount;

public DataCounts(String jobId) {
this(jobId, 0, 0, 0);
}

public DataCounts(String jobId, long trainingDocsCount, long testDocsCount, long skippedDocsCount) {
this.jobId = Objects.requireNonNull(jobId);
this.trainingDocsCount = trainingDocsCount;
File: GetDataFrameAnalyticsStatsActionResponseTests.java (org.elasticsearch.xpack.core.ml.action)
@@ -14,19 +14,22 @@
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.stats.AnalysisStats;
import org.elasticsearch.xpack.core.ml.dataframe.stats.AnalysisStatsNamedWriteablesProvider;
import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts;
import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCountsTests;
import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage;
import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsageTests;
import org.elasticsearch.xpack.core.ml.dataframe.stats.classification.ClassificationStatsTests;
import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts;
import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCountsTests;
import org.elasticsearch.xpack.core.ml.dataframe.stats.outlierdetection.OutlierDetectionStatsTests;
import org.elasticsearch.xpack.core.ml.dataframe.stats.regression.RegressionStatsTests;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.equalTo;

public class GetDataFrameAnalyticsStatsActionResponseTests extends AbstractWireSerializingTestCase<Response> {

@Override
@@ -69,4 +72,20 @@ protected Response createTestInstance() {
protected Writeable.Reader<Response> instanceReader() {
return Response::new;
}

public void testStats_GivenNulls() {
Response.Stats stats = new Response.Stats(randomAlphaOfLength(10),
randomFrom(DataFrameAnalyticsState.values()),
null,
Collections.emptyList(),
null,
null,
null,
null,
null
);

assertThat(stats.getDataCounts(), equalTo(new DataCounts(stats.getId())));
assertThat(stats.getMemoryUsage(), equalTo(new MemoryUsage(stats.getId())));
}
}
File: MemoryUsageTests.java (org.elasticsearch.xpack.core.ml.dataframe.stats)
@@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.core.ml.dataframe.stats;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser;
@@ -16,6 +17,8 @@
import java.time.Instant;
import java.util.Collections;

import static org.hamcrest.Matchers.equalTo;

public class MemoryUsageTests extends AbstractSerializingTestCase<MemoryUsage> {

private boolean lenient;
@@ -53,4 +56,10 @@ protected Writeable.Reader<MemoryUsage> instanceReader() {
protected MemoryUsage createTestInstance() {
return createRandom();
}

public void testZeroUsage() {
MemoryUsage memoryUsage = new MemoryUsage("zero_usage_job");
String asJson = Strings.toString(memoryUsage);
assertThat(asJson, equalTo("{\"peak_usage_bytes\":0}"));
}
}
File: TransportGetDataFrameAnalyticsStatsAction.java (x-pack ML plugin)
@@ -42,10 +42,10 @@
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.dataframe.stats.AnalysisStats;
import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts;
import org.elasticsearch.xpack.core.ml.dataframe.stats.Fields;
import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage;
import org.elasticsearch.xpack.core.ml.dataframe.stats.classification.ClassificationStats;
import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts;
import org.elasticsearch.xpack.core.ml.dataframe.stats.outlierdetection.OutlierDetectionStats;
import org.elasticsearch.xpack.core.ml.dataframe.stats.regression.RegressionStats;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@@ -140,6 +140,7 @@ protected void doExecute(Task task, GetDataFrameAnalyticsStatsAction.Request req
runningTasksStatsResponse -> gatherStatsForStoppedTasks(request.getExpandedIds(), runningTasksStatsResponse,
ActionListener.wrap(
finalResponse -> {

// While finalResponse has all the stats objects we need, we should report the count
// from the get response
QueryPage<Stats> finalStats = new QueryPage<>(finalResponse.getResponse().results(),
File: data_frame_analytics_crud.yml (ML REST API YAML tests)
@@ -902,6 +902,36 @@ setup:
- match: { data_frame_analytics.0.id: "foo-1" }
- match: { data_frame_analytics.0.state: "stopped" }

---
"Test get stats on newly created congig":

- do:
ml.put_data_frame_analytics:
id: "foo-1"
body: >
{
"source": {
"index": "index-source"
},
"dest": {
"index": "index-foo-1_dest"
},
"analysis": {"outlier_detection":{}}
}
- match: { id: "foo-1" }

- do:
ml.get_data_frame_analytics_stats:
id: "foo-1"
- match: { count: 1 }
- length: { data_frame_analytics: 1 }
- match: { data_frame_analytics.0.id: "foo-1" }
- match: { data_frame_analytics.0.state: "stopped" }
- match: { data_frame_analytics.0.data_counts.training_docs_count: 0 }
- match: { data_frame_analytics.0.data_counts.test_docs_count: 0 }
- match: { data_frame_analytics.0.data_counts.skipped_docs_count: 0 }
- match: { data_frame_analytics.0.memory_usage.peak_usage_bytes: 0 }

---
"Test delete given stopped config":

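
For completeness, a hedged sketch of the same behavior through the Java high-level REST client — it assumes an already-initialized RestHighLevelClient and the standard get-stats request/response classes; "foo-1" is just the job id used in the REST test above.

import java.io.IOException;

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.ml.GetDataFrameAnalyticsStatsRequest;
import org.elasticsearch.client.ml.GetDataFrameAnalyticsStatsResponse;
import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsStats;

public class NewJobStatsSketch {
    // Fetch stats for a data frame analytics job that was created but never started.
    static void printEmptyStats(RestHighLevelClient client) throws IOException {
        GetDataFrameAnalyticsStatsRequest request = new GetDataFrameAnalyticsStatsRequest("foo-1");
        GetDataFrameAnalyticsStatsResponse response =
            client.machineLearning().getDataFrameAnalyticsStats(request, RequestOptions.DEFAULT);

        DataFrameAnalyticsStats stats = response.getAnalyticsStats().get(0);
        // With this change the stats objects are present and zero-valued rather than null:
        System.out.println(stats.getMemoryUsage().getPeakUsageBytes()); // 0
        System.out.println(stats.getDataCounts());                      // equal to new DataCounts(0, 0, 0)
    }
}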
