diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/stats/common/MemoryUsage.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/stats/common/MemoryUsage.java index f492d26528e02..e8f4c9b256eae 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/stats/common/MemoryUsage.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/stats/common/MemoryUsage.java @@ -19,6 +19,7 @@ package org.elasticsearch.client.ml.dataframe.stats.common; import org.elasticsearch.client.common.TimeUtil; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.inject.internal.ToStringBuilder; import org.elasticsearch.common.xcontent.ConstructingObjectParser; @@ -39,21 +40,23 @@ public class MemoryUsage implements ToXContentObject { true, a -> new MemoryUsage((Instant) a[0], (long) a[1])); static { - PARSER.declareField(ConstructingObjectParser.constructorArg(), + PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), p -> TimeUtil.parseTimeFieldToInstant(p, TIMESTAMP.getPreferredName()), TIMESTAMP, ObjectParser.ValueType.VALUE); PARSER.declareLong(ConstructingObjectParser.constructorArg(), PEAK_USAGE_BYTES); } + @Nullable private final Instant timestamp; private final long peakUsageBytes; - public MemoryUsage(Instant timestamp, long peakUsageBytes) { - this.timestamp = Instant.ofEpochMilli(Objects.requireNonNull(timestamp).toEpochMilli()); + public MemoryUsage(@Nullable Instant timestamp, long peakUsageBytes) { + this.timestamp = timestamp == null ? null : Instant.ofEpochMilli(Objects.requireNonNull(timestamp).toEpochMilli()); this.peakUsageBytes = peakUsageBytes; } + @Nullable public Instant getTimestamp() { return timestamp; } @@ -65,7 +68,9 @@ public long getPeakUsageBytes() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.timeField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.toEpochMilli()); + if (timestamp != null) { + builder.timeField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.toEpochMilli()); + } builder.field(PEAK_USAGE_BYTES.getPreferredName(), peakUsageBytes); builder.endObject(); return builder; @@ -89,7 +94,7 @@ public int hashCode() { @Override public String toString() { return new ToStringBuilder(getClass()) - .add(TIMESTAMP.getPreferredName(), timestamp.getEpochSecond()) + .add(TIMESTAMP.getPreferredName(), timestamp == null ? null : timestamp.getEpochSecond()) .add(PEAK_USAGE_BYTES.getPreferredName(), peakUsageBytes) .toString(); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java index a3fe5c69ee01e..d1b8188b1003e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java @@ -149,6 +149,7 @@ import org.elasticsearch.client.ml.dataframe.evaluation.softclassification.RecallMetric; import org.elasticsearch.client.ml.dataframe.explain.FieldSelection; import org.elasticsearch.client.ml.dataframe.explain.MemoryEstimation; +import org.elasticsearch.client.ml.dataframe.stats.common.DataCounts; import org.elasticsearch.client.ml.filestructurefinder.FileStructure; import org.elasticsearch.client.ml.inference.InferenceToXContentCompressor; import org.elasticsearch.client.ml.inference.MlInferenceNamedXContentProvider; @@ -1561,7 +1562,8 @@ public void testGetDataFrameAnalyticsStats() throws Exception { assertThat(progress.get(1), equalTo(new PhaseProgress("loading_data", 0))); assertThat(progress.get(2), equalTo(new PhaseProgress("analyzing", 0))); assertThat(progress.get(3), equalTo(new PhaseProgress("writing_results", 0))); - assertThat(stats.getMemoryUsage(), is(nullValue())); + assertThat(stats.getMemoryUsage().getPeakUsageBytes(), equalTo(0L)); + assertThat(stats.getDataCounts(), equalTo(new DataCounts(0, 0, 0))); } public void testStartDataFrameAnalyticsConfig() throws Exception { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/stats/common/MemoryUsageTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/stats/common/MemoryUsageTests.java index 0e27295752190..a8112ac31214c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/stats/common/MemoryUsageTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/stats/common/MemoryUsageTests.java @@ -24,6 +24,8 @@ import java.io.IOException; import java.time.Instant; +import static org.hamcrest.Matchers.equalTo; + public class MemoryUsageTests extends AbstractXContentTestCase { @Override @@ -32,7 +34,7 @@ protected MemoryUsage createTestInstance() { } public static MemoryUsage createRandom() { - return new MemoryUsage(Instant.now(), randomNonNegativeLong()); + return new MemoryUsage(randomBoolean() ? null : Instant.now(), randomNonNegativeLong()); } @Override @@ -44,4 +46,9 @@ protected MemoryUsage doParseInstance(XContentParser parser) throws IOException protected boolean supportsUnknownFields() { return true; } + + public void testToString_GivenNullTimestamp() { + MemoryUsage memoryUsage = new MemoryUsage(null, 42L); + assertThat(memoryUsage.toString(), equalTo("MemoryUsage[timestamp=null, peak_usage_bytes=42]")); + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java index e37ccbbefc5b1..94cdfc0f4a1e3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java @@ -29,8 +29,8 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.stats.AnalysisStats; -import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts; import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage; +import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.PhaseProgress; @@ -166,10 +166,8 @@ public static class Stats implements ToXContentObject, Writeable { */ private final List progress; - @Nullable private final DataCounts dataCounts; - @Nullable private final MemoryUsage memoryUsage; @Nullable @@ -187,8 +185,8 @@ public Stats(String id, DataFrameAnalyticsState state, @Nullable String failureR this.state = Objects.requireNonNull(state); this.failureReason = failureReason; this.progress = Objects.requireNonNull(progress); - this.dataCounts = dataCounts; - this.memoryUsage = memoryUsage; + this.dataCounts = dataCounts == null ? new DataCounts(id) : dataCounts; + this.memoryUsage = memoryUsage == null ? new MemoryUsage(id) : memoryUsage; this.analysisStats = analysisStats; this.node = node; this.assignmentExplanation = assignmentExplanation; @@ -204,12 +202,12 @@ public Stats(StreamInput in) throws IOException { progress = in.readList(PhaseProgress::new); } if (in.getVersion().onOrAfter(Version.V_7_7_0)) { - dataCounts = in.readOptionalWriteable(DataCounts::new); + dataCounts = new DataCounts(in); } else { dataCounts = null; } if (in.getVersion().onOrAfter(Version.V_7_7_0)) { - memoryUsage = in.readOptionalWriteable(MemoryUsage::new); + memoryUsage = new MemoryUsage(in); } else { memoryUsage = null; } @@ -276,11 +274,14 @@ public DataCounts getDataCounts() { return dataCounts; } - @Nullable public MemoryUsage getMemoryUsage() { return memoryUsage; } + public AnalysisStats getAnalysisStats() { + return analysisStats; + } + public DiscoveryNode getNode() { return node; } @@ -308,12 +309,8 @@ public XContentBuilder toUnwrappedXContent(XContentBuilder builder) throws IOExc if (progress != null) { builder.field("progress", progress); } - if (dataCounts != null) { - builder.field("data_counts", dataCounts); - } - if (memoryUsage != null) { - builder.field("memory_usage", memoryUsage); - } + builder.field("data_counts", dataCounts); + builder.field("memory_usage", memoryUsage); if (analysisStats != null) { builder.startObject("analysis_stats"); builder.field(analysisStats.getWriteableName(), analysisStats); @@ -350,10 +347,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeList(progress); } if (out.getVersion().onOrAfter(Version.V_7_7_0)) { - out.writeOptionalWriteable(dataCounts); + dataCounts.writeTo(out); } if (out.getVersion().onOrAfter(Version.V_7_7_0)) { - out.writeOptionalWriteable(memoryUsage); + memoryUsage.writeTo(out); } if (out.getVersion().onOrAfter(Version.V_7_7_0)) { out.writeOptionalNamedWriteable(analysisStats); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/stats/MemoryUsage.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/stats/MemoryUsage.java index b672c3809d163..209beb6e3ebf9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/stats/MemoryUsage.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/stats/MemoryUsage.java @@ -46,27 +46,38 @@ private static ConstructingObjectParser createParser(boolean } private final String jobId; + /** + * timestamp may only be null when we construct a zero usage object + */ private final Instant timestamp; private final long peakUsageBytes; + /** + * Creates a zero usage object + */ + public MemoryUsage(String jobId) { + this(jobId, null, 0); + } + public MemoryUsage(String jobId, Instant timestamp, long peakUsageBytes) { this.jobId = Objects.requireNonNull(jobId); // We intend to store this timestamp in millis granularity. Thus we're rounding here to ensure // internal representation matches toXContent - this.timestamp = Instant.ofEpochMilli(ExceptionsHelper.requireNonNull(timestamp, Fields.TIMESTAMP).toEpochMilli()); + this.timestamp = timestamp == null ? null : Instant.ofEpochMilli( + ExceptionsHelper.requireNonNull(timestamp, Fields.TIMESTAMP).toEpochMilli()); this.peakUsageBytes = peakUsageBytes; } public MemoryUsage(StreamInput in) throws IOException { jobId = in.readString(); - timestamp = in.readInstant(); + timestamp = in.readOptionalInstant(); peakUsageBytes = in.readVLong(); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(jobId); - out.writeInstant(timestamp); + out.writeOptionalInstant(timestamp); out.writeVLong(peakUsageBytes); } @@ -77,7 +88,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(Fields.TYPE.getPreferredName(), TYPE_VALUE); builder.field(Fields.JOB_ID.getPreferredName(), jobId); } - builder.timeField(Fields.TIMESTAMP.getPreferredName(), Fields.TIMESTAMP.getPreferredName() + "_string", timestamp.toEpochMilli()); + if (timestamp != null) { + builder.timeField(Fields.TIMESTAMP.getPreferredName(), Fields.TIMESTAMP.getPreferredName() + "_string", + timestamp.toEpochMilli()); + } builder.field(PEAK_USAGE_BYTES.getPreferredName(), peakUsageBytes); builder.endObject(); return builder; @@ -105,6 +119,7 @@ public String toString() { } public String documentId(String jobId) { + assert timestamp != null; return documentIdPrefix(jobId) + timestamp.toEpochMilli(); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/stats/common/DataCounts.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/stats/common/DataCounts.java index f77cc781c746a..93287e06772c4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/stats/common/DataCounts.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/stats/common/DataCounts.java @@ -46,6 +46,10 @@ private static ConstructingObjectParser createParser(boolean i private final long testDocsCount; private final long skippedDocsCount; + public DataCounts(String jobId) { + this(jobId, 0, 0, 0); + } + public DataCounts(String jobId, long trainingDocsCount, long testDocsCount, long skippedDocsCount) { this.jobId = Objects.requireNonNull(jobId); this.trainingDocsCount = trainingDocsCount; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsActionResponseTests.java index 3f957f95a902c..c69466e47decf 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsActionResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsActionResponseTests.java @@ -14,19 +14,22 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.stats.AnalysisStats; import org.elasticsearch.xpack.core.ml.dataframe.stats.AnalysisStatsNamedWriteablesProvider; -import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts; -import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCountsTests; import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage; import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsageTests; import org.elasticsearch.xpack.core.ml.dataframe.stats.classification.ClassificationStatsTests; +import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts; +import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCountsTests; import org.elasticsearch.xpack.core.ml.dataframe.stats.outlierdetection.OutlierDetectionStatsTests; import org.elasticsearch.xpack.core.ml.dataframe.stats.regression.RegressionStatsTests; import org.elasticsearch.xpack.core.ml.utils.PhaseProgress; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.stream.IntStream; +import static org.hamcrest.Matchers.equalTo; + public class GetDataFrameAnalyticsStatsActionResponseTests extends AbstractWireSerializingTestCase { @Override @@ -69,4 +72,20 @@ protected Response createTestInstance() { protected Writeable.Reader instanceReader() { return Response::new; } + + public void testStats_GivenNulls() { + Response.Stats stats = new Response.Stats(randomAlphaOfLength(10), + randomFrom(DataFrameAnalyticsState.values()), + null, + Collections.emptyList(), + null, + null, + null, + null, + null + ); + + assertThat(stats.getDataCounts(), equalTo(new DataCounts(stats.getId()))); + assertThat(stats.getMemoryUsage(), equalTo(new MemoryUsage(stats.getId()))); + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/stats/MemoryUsageTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/stats/MemoryUsageTests.java index 44ce79b98c076..7441acff7b2bb 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/stats/MemoryUsageTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/stats/MemoryUsageTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.ml.dataframe.stats; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentParser; @@ -16,6 +17,8 @@ import java.time.Instant; import java.util.Collections; +import static org.hamcrest.Matchers.equalTo; + public class MemoryUsageTests extends AbstractSerializingTestCase { private boolean lenient; @@ -53,4 +56,10 @@ protected Writeable.Reader instanceReader() { protected MemoryUsage createTestInstance() { return createRandom(); } + + public void testZeroUsage() { + MemoryUsage memoryUsage = new MemoryUsage("zero_usage_job"); + String asJson = Strings.toString(memoryUsage); + assertThat(asJson, equalTo("{\"peak_usage_bytes\":0}")); + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java index 1d8dd1ebfaa8a..30bc18875a700 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java @@ -42,10 +42,10 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; import org.elasticsearch.xpack.core.ml.dataframe.stats.AnalysisStats; -import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts; import org.elasticsearch.xpack.core.ml.dataframe.stats.Fields; import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage; import org.elasticsearch.xpack.core.ml.dataframe.stats.classification.ClassificationStats; +import org.elasticsearch.xpack.core.ml.dataframe.stats.common.DataCounts; import org.elasticsearch.xpack.core.ml.dataframe.stats.outlierdetection.OutlierDetectionStats; import org.elasticsearch.xpack.core.ml.dataframe.stats.regression.RegressionStats; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; @@ -140,6 +140,7 @@ protected void doExecute(Task task, GetDataFrameAnalyticsStatsAction.Request req runningTasksStatsResponse -> gatherStatsForStoppedTasks(request.getExpandedIds(), runningTasksStatsResponse, ActionListener.wrap( finalResponse -> { + // While finalResponse has all the stats objects we need, we should report the count // from the get response QueryPage finalStats = new QueryPage<>(finalResponse.getResponse().results(), diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml index 5ec3c978b73d6..5f8647898c922 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml @@ -902,6 +902,36 @@ setup: - match: { data_frame_analytics.0.id: "foo-1" } - match: { data_frame_analytics.0.state: "stopped" } +--- +"Test get stats on newly created congig": + + - do: + ml.put_data_frame_analytics: + id: "foo-1" + body: > + { + "source": { + "index": "index-source" + }, + "dest": { + "index": "index-foo-1_dest" + }, + "analysis": {"outlier_detection":{}} + } + - match: { id: "foo-1" } + + - do: + ml.get_data_frame_analytics_stats: + id: "foo-1" + - match: { count: 1 } + - length: { data_frame_analytics: 1 } + - match: { data_frame_analytics.0.id: "foo-1" } + - match: { data_frame_analytics.0.state: "stopped" } + - match: { data_frame_analytics.0.data_counts.training_docs_count: 0 } + - match: { data_frame_analytics.0.data_counts.test_docs_count: 0 } + - match: { data_frame_analytics.0.data_counts.skipped_docs_count: 0 } + - match: { data_frame_analytics.0.memory_usage.peak_usage_bytes: 0 } + --- "Test delete given stopped config":