From dd331935b31f83024633a6ec84da5be946b4e7d1 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Fri, 28 Feb 2020 17:35:07 +0200 Subject: [PATCH] [ML] Parse and report memory usage for DF Analytics (#52778) Adds reporting of memory usage for data frame analytics jobs. This commit introduces a new index pattern `.ml-stats-*` whose first concrete index will be `.ml-stats-000001`. This index serves to store instrumentation information for those jobs. --- .../ml/dataframe/DataFrameAnalyticsStats.java | 22 ++- .../client/ml/dataframe/MemoryUsage.java | 88 +++++++++++ .../client/MachineLearningIT.java | 1 + .../DataFrameAnalyticsStatsTests.java | 4 + .../client/ml/dataframe/MemoryUsageTests.java | 47 ++++++ docs/reference/ml/ml-shared.asciidoc | 16 +- .../xpack/core/common/time/TimeUtils.java | 7 +- .../xpack/core/ml/MlStatsIndex.java | 50 ++++++ .../GetDataFrameAnalyticsStatsAction.java | 26 ++- .../core/ml/dataframe/stats/MemoryUsage.java | 117 ++++++++++++++ .../persistence/AnomalyDetectorsIndex.java | 70 +-------- .../xpack/core/ml/utils/MlIndexAndAlias.java | 120 ++++++++++++++ .../xpack/core/ml/stats_index_mappings.json | 21 +++ .../xpack/core/ml/stats_index_template.json | 15 ++ ...rameAnalyticsStatsActionResponseTests.java | 5 +- .../ml/dataframe/stats/MemoryUsageTests.java | 56 +++++++ .../MlIndexAndAliasTests.java} | 86 +++++----- .../xpack/ml/MlIndexTemplateRegistry.java | 17 +- ...sportGetDataFrameAnalyticsStatsAction.java | 148 +++++++++++------- .../dataframe/DataFrameAnalyticsManager.java | 24 ++- .../process/AnalyticsProcessManager.java | 3 +- .../process/AnalyticsResultProcessor.java | 38 ++++- .../process/results/AnalyticsResult.java | 29 +++- .../xpack/ml/dataframe/stats/StatsHolder.java | 20 ++- .../job/persistence/JobResultsProvider.java | 38 ++--- .../ml/utils/persistence/MlParserUtils.java | 42 +++++ .../process/AnalyticsProcessManagerTests.java | 2 +- .../AnalyticsResultProcessorTests.java | 17 +- .../process/results/AnalyticsResultTests.java | 10 +- 29 files changed, 921 insertions(+), 218 deletions(-) create mode 100644 client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/MemoryUsage.java create mode 100644 client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/MemoryUsageTests.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlStatsIndex.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/stats/MemoryUsage.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java create mode 100644 x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/stats_index_mappings.json create mode 100644 x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/stats_index_template.json create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/stats/MemoryUsageTests.java rename x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/{job/persistence/AnomalyDetectorsIndexTests.java => utils/MlIndexAndAliasTests.java} (75%) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/MlParserUtils.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStats.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStats.java index bfef47727f631..53e3adf2b8433 100644 --- 
a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStats.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStats.java @@ -44,6 +44,7 @@ public static DataFrameAnalyticsStats fromXContent(XContentParser parser) throws static final ParseField STATE = new ParseField("state"); static final ParseField FAILURE_REASON = new ParseField("failure_reason"); static final ParseField PROGRESS = new ParseField("progress"); + static final ParseField MEMORY_USAGE = new ParseField("memory_usage"); static final ParseField NODE = new ParseField("node"); static final ParseField ASSIGNMENT_EXPLANATION = new ParseField("assignment_explanation"); @@ -55,8 +56,9 @@ public static DataFrameAnalyticsStats fromXContent(XContentParser parser) throws (DataFrameAnalyticsState) args[1], (String) args[2], (List) args[3], - (NodeAttributes) args[4], - (String) args[5])); + (MemoryUsage) args[4], + (NodeAttributes) args[5], + (String) args[6])); static { PARSER.declareString(constructorArg(), ID); @@ -68,6 +70,7 @@ public static DataFrameAnalyticsStats fromXContent(XContentParser parser) throws }, STATE, ObjectParser.ValueType.STRING); PARSER.declareString(optionalConstructorArg(), FAILURE_REASON); PARSER.declareObjectArray(optionalConstructorArg(), PhaseProgress.PARSER, PROGRESS); + PARSER.declareObject(optionalConstructorArg(), MemoryUsage.PARSER, MEMORY_USAGE); PARSER.declareObject(optionalConstructorArg(), NodeAttributes.PARSER, NODE); PARSER.declareString(optionalConstructorArg(), ASSIGNMENT_EXPLANATION); } @@ -76,16 +79,18 @@ public static DataFrameAnalyticsStats fromXContent(XContentParser parser) throws private final DataFrameAnalyticsState state; private final String failureReason; private final List progress; + private final MemoryUsage memoryUsage; private final NodeAttributes node; private final String assignmentExplanation; public DataFrameAnalyticsStats(String id, DataFrameAnalyticsState state, @Nullable String failureReason, - @Nullable List progress, @Nullable NodeAttributes node, - @Nullable String assignmentExplanation) { + @Nullable List progress, @Nullable MemoryUsage memoryUsage, + @Nullable NodeAttributes node, @Nullable String assignmentExplanation) { this.id = id; this.state = state; this.failureReason = failureReason; this.progress = progress; + this.memoryUsage = memoryUsage; this.node = node; this.assignmentExplanation = assignmentExplanation; } @@ -106,6 +111,11 @@ public List getProgress() { return progress; } + @Nullable + public MemoryUsage getMemoryUsage() { + return memoryUsage; + } + public NodeAttributes getNode() { return node; } @@ -124,13 +134,14 @@ public boolean equals(Object o) { && Objects.equals(state, other.state) && Objects.equals(failureReason, other.failureReason) && Objects.equals(progress, other.progress) + && Objects.equals(memoryUsage, other.memoryUsage) && Objects.equals(node, other.node) && Objects.equals(assignmentExplanation, other.assignmentExplanation); } @Override public int hashCode() { - return Objects.hash(id, state, failureReason, progress, node, assignmentExplanation); + return Objects.hash(id, state, failureReason, progress, memoryUsage, node, assignmentExplanation); } @Override @@ -140,6 +151,7 @@ public String toString() { .add("state", state) .add("failureReason", failureReason) .add("progress", progress) + .add("memoryUsage", memoryUsage) .add("node", node) .add("assignmentExplanation", assignmentExplanation) .toString(); diff --git 
a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/MemoryUsage.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/MemoryUsage.java new file mode 100644 index 0000000000000..323ebb52a7aed --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/MemoryUsage.java @@ -0,0 +1,88 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.ml.dataframe; + +import org.elasticsearch.client.common.TimeUtil; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.inject.internal.ToStringBuilder; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.time.Instant; +import java.util.Objects; + +public class MemoryUsage implements ToXContentObject { + + static final ParseField TIMESTAMP = new ParseField("timestamp"); + static final ParseField PEAK_USAGE_BYTES = new ParseField("peak_usage_bytes"); + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("analytics_memory_usage", + true, a -> new MemoryUsage((Instant) a[0], (long) a[1])); + + static { + PARSER.declareField(ConstructingObjectParser.constructorArg(), + p -> TimeUtil.parseTimeFieldToInstant(p, TIMESTAMP.getPreferredName()), + TIMESTAMP, + ObjectParser.ValueType.VALUE); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), PEAK_USAGE_BYTES); + } + + private final Instant timestamp; + private final long peakUsageBytes; + + public MemoryUsage(Instant timestamp, long peakUsageBytes) { + this.timestamp = Instant.ofEpochMilli(Objects.requireNonNull(timestamp).toEpochMilli()); + this.peakUsageBytes = peakUsageBytes; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.timeField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.toEpochMilli()); + builder.field(PEAK_USAGE_BYTES.getPreferredName(), peakUsageBytes); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (o == this) return true; + if (o == null || getClass() != o.getClass()) return false; + + MemoryUsage other = (MemoryUsage) o; + return Objects.equals(timestamp, other.timestamp) + && peakUsageBytes == other.peakUsageBytes; + } + + @Override + public int hashCode() { + return Objects.hash(timestamp, peakUsageBytes); + } + + @Override + public String toString() { + return new ToStringBuilder(getClass()) + .add(TIMESTAMP.getPreferredName(), timestamp.getEpochSecond()) + 
.add(PEAK_USAGE_BYTES.getPreferredName(), peakUsageBytes) + .toString(); + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java index 6fe08f8a507de..3e9cc1f55ff38 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java @@ -1506,6 +1506,7 @@ public void testGetDataFrameAnalyticsStats() throws Exception { assertThat(progress.get(1), equalTo(new PhaseProgress("loading_data", 0))); assertThat(progress.get(2), equalTo(new PhaseProgress("analyzing", 0))); assertThat(progress.get(3), equalTo(new PhaseProgress("writing_results", 0))); + assertThat(stats.getMemoryUsage(), is(nullValue())); } public void testStartDataFrameAnalyticsConfig() throws Exception { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStatsTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStatsTests.java index f8eddd36bc6d9..48ebf71e36023 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStatsTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStatsTests.java @@ -47,6 +47,7 @@ public static DataFrameAnalyticsStats randomDataFrameAnalyticsStats() { randomFrom(DataFrameAnalyticsState.values()), randomBoolean() ? null : randomAlphaOfLength(10), randomBoolean() ? null : createRandomProgress(), + randomBoolean() ? null : MemoryUsageTests.createRandom(), randomBoolean() ? null : NodeAttributesTests.createRandom(), randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20)); } @@ -70,6 +71,9 @@ public static void toXContent(DataFrameAnalyticsStats stats, XContentBuilder bui if (stats.getProgress() != null) { builder.field(DataFrameAnalyticsStats.PROGRESS.getPreferredName(), stats.getProgress()); } + if (stats.getMemoryUsage() != null) { + builder.field(DataFrameAnalyticsStats.MEMORY_USAGE.getPreferredName(), stats.getMemoryUsage()); + } if (stats.getNode() != null) { builder.field(DataFrameAnalyticsStats.NODE.getPreferredName(), stats.getNode()); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/MemoryUsageTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/MemoryUsageTests.java new file mode 100644 index 0000000000000..8e06db6f2b37f --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/MemoryUsageTests.java @@ -0,0 +1,47 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.client.ml.dataframe; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; +import java.time.Instant; + +public class MemoryUsageTests extends AbstractXContentTestCase { + + @Override + protected MemoryUsage createTestInstance() { + return createRandom(); + } + + public static MemoryUsage createRandom() { + return new MemoryUsage(Instant.now(), randomNonNegativeLong()); + } + + @Override + protected MemoryUsage doParseInstance(XContentParser parser) throws IOException { + return MemoryUsage.PARSER.apply(parser, null); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } +} diff --git a/docs/reference/ml/ml-shared.asciidoc b/docs/reference/ml/ml-shared.asciidoc index dbd1c0ef7fa4d..4eb2b60dccb47 100644 --- a/docs/reference/ml/ml-shared.asciidoc +++ b/docs/reference/ml/ml-shared.asciidoc @@ -451,13 +451,25 @@ sorted by the `id` value in ascending order. `progress`::: (array) The progress report of the {dfanalytics-job} by phase. -`phase`::: +`phase`:::: (string) Defines the phase of the {dfanalytics-job}. Possible phases: `reindexing`, `loading_data`, `analyzing`, and `writing_results`. -`progress_percent`::: +`progress_percent`:::: (integer) The progress that the {dfanalytics-job} has made expressed in percentage. + +`memory_usage`::: +(Optional, Object) An object describing memory usage of the analytics. +It will be present only after the job has started and memory usage has +been reported. + +`timestamp`:::: +(date) The timestamp when memory usage was calculated. + +`peak_usage_bytes`:::: +(long) The number of bytes used at the highest peak of memory usage. + end::data-frame-analytics-stats[] tag::datafeed-id[] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java index e345feb59b04e..01667f8a48160 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java @@ -22,6 +22,10 @@ private TimeUtils() { // Do nothing } + /** + * @deprecated Please use {@link #parseTimeFieldToInstant(XContentParser, String)} instead. + */ + @Deprecated public static Date parseTimeField(XContentParser parser, String fieldName) throws IOException { if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) { return new Date(parser.longValue()); @@ -36,7 +40,7 @@ public static Instant parseTimeFieldToInstant(XContentParser parser, String fiel if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) { return Instant.ofEpochMilli(parser.longValue()); } else if (parser.currentToken() == XContentParser.Token.VALUE_STRING) { - return Instant.ofEpochMilli(dateStringToEpoch(parser.text())); + return Instant.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(parser.text())); } throw new IllegalArgumentException( "unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]"); @@ -54,6 +58,7 @@ public static Instant parseTimeFieldToInstant(XContentParser parser, String fiel * @return The epoch time in milliseconds or -1 if the date cannot be * parsed. 
*/ + @Deprecated public static long dateStringToEpoch(String date) { try { long epoch = Long.parseLong(date); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlStatsIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlStatsIndex.java new file mode 100644 index 0000000000000..36cf890362428 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlStatsIndex.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias; +import org.elasticsearch.xpack.core.template.TemplateUtils; + +/** + * Describes the indices where ML is storing various stats about the user's jobs. + */ +public class MlStatsIndex { + + public static final String TEMPLATE_NAME = ".ml-stats"; + + private static final String MAPPINGS_VERSION_VARIABLE = "xpack.ml.version"; + + private MlStatsIndex() {} + + public static String mapping() { + return TemplateUtils.loadTemplate("/org/elasticsearch/xpack/core/ml/stats_index_mappings.json", + Version.CURRENT.toString(), MAPPINGS_VERSION_VARIABLE); + } + + public static String indexPattern() { + return TEMPLATE_NAME + "-*"; + } + + public static String writeAlias() { + return ".ml-stats-write"; + } + + /** + * Creates the first concrete .ml-stats-000001 index (if necessary). + * Creates the .ml-stats-write alias for that index. + * The listener will be notified with a boolean to indicate if the index was created because of this call, + * but unless there is a failure, after this method returns the index and alias should be present.
+ */ + public static void createStatsIndexAndAliasIfNecessary(Client client, ClusterState state, IndexNameExpressionResolver resolver, + ActionListener listener) { + MlIndexAndAlias.createIndexAndAliasIfNecessary(client, state, resolver, TEMPLATE_NAME, writeAlias(), listener); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java index 80dbf8bed1cef..7f6cdd97f9f3c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.xpack.core.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; +import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.PhaseProgress; @@ -163,17 +164,21 @@ public static class Stats implements ToXContentObject, Writeable { */ private final List progress; + @Nullable + private final MemoryUsage memoryUsage; + @Nullable private final DiscoveryNode node; @Nullable private final String assignmentExplanation; public Stats(String id, DataFrameAnalyticsState state, @Nullable String failureReason, List progress, - @Nullable DiscoveryNode node, @Nullable String assignmentExplanation) { + @Nullable MemoryUsage memoryUsage, @Nullable DiscoveryNode node, @Nullable String assignmentExplanation) { this.id = Objects.requireNonNull(id); this.state = Objects.requireNonNull(state); this.failureReason = failureReason; this.progress = Objects.requireNonNull(progress); + this.memoryUsage = memoryUsage; this.node = node; this.assignmentExplanation = assignmentExplanation; } @@ -187,6 +192,11 @@ public Stats(StreamInput in) throws IOException { } else { progress = in.readList(PhaseProgress::new); } + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + memoryUsage = in.readOptionalWriteable(MemoryUsage::new); + } else { + memoryUsage = null; + } node = in.readOptionalWriteable(DiscoveryNode::new); assignmentExplanation = in.readOptionalString(); } @@ -240,6 +250,11 @@ public List getProgress() { return progress; } + @Nullable + public MemoryUsage getMemoryUsage() { + return memoryUsage; + } + public DiscoveryNode getNode() { return node; } @@ -267,6 +282,9 @@ public XContentBuilder toUnwrappedXContent(XContentBuilder builder) throws IOExc if (progress != null) { builder.field("progress", progress); } + if (memoryUsage != null) { + builder.field("memory_usage", memoryUsage); + } if (node != null) { builder.startObject("node"); builder.field("id", node.getId()); @@ -297,6 +315,9 @@ public void writeTo(StreamOutput out) throws IOException { } else { out.writeList(progress); } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeOptionalWriteable(memoryUsage); + } out.writeOptionalWriteable(node); out.writeOptionalString(assignmentExplanation); } @@ -329,7 +350,7 @@ private void writeProgressToLegacy(StreamOutput out) throws IOException { @Override public int hashCode() { - return Objects.hash(id, state, failureReason, progress, node, assignmentExplanation); + return Objects.hash(id, state, failureReason, progress, memoryUsage, node, 
assignmentExplanation); } @Override @@ -345,6 +366,7 @@ public boolean equals(Object obj) { && Objects.equals(this.state, other.state) && Objects.equals(this.failureReason, other.failureReason) && Objects.equals(this.progress, other.progress) + && Objects.equals(this.memoryUsage, other.memoryUsage) && Objects.equals(this.node, other.node) && Objects.equals(this.assignmentExplanation, other.assignmentExplanation); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/stats/MemoryUsage.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/stats/MemoryUsage.java new file mode 100644 index 0000000000000..5131d88d95924 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/stats/MemoryUsage.java @@ -0,0 +1,117 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.dataframe.stats; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.common.time.TimeUtils; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; + +import java.io.IOException; +import java.time.Instant; +import java.util.Objects; + +public class MemoryUsage implements Writeable, ToXContentObject { + + public static final String TYPE_VALUE = "analytics_memory_usage"; + + public static final ParseField TYPE = new ParseField("type"); + public static final ParseField JOB_ID = new ParseField("job_id"); + public static final ParseField TIMESTAMP = new ParseField("timestamp"); + public static final ParseField PEAK_USAGE_BYTES = new ParseField("peak_usage_bytes"); + + public static final ConstructingObjectParser STRICT_PARSER = createParser(false); + public static final ConstructingObjectParser LENIENT_PARSER = createParser(true); + + private static ConstructingObjectParser createParser(boolean ignoreUnknownFields) { + ConstructingObjectParser parser = new ConstructingObjectParser<>(TYPE_VALUE, + ignoreUnknownFields, a -> new MemoryUsage((String) a[0], (Instant) a[1], (long) a[2])); + + parser.declareString((bucket, s) -> {}, TYPE); + parser.declareString(ConstructingObjectParser.constructorArg(), JOB_ID); + parser.declareField(ConstructingObjectParser.constructorArg(), + p -> TimeUtils.parseTimeFieldToInstant(p, TIMESTAMP.getPreferredName()), + TIMESTAMP, + ObjectParser.ValueType.VALUE); + parser.declareLong(ConstructingObjectParser.constructorArg(), PEAK_USAGE_BYTES); + return parser; + } + + private final String jobId; + private final Instant timestamp; + private final long peakUsageBytes; + + public MemoryUsage(String jobId, Instant timestamp, long peakUsageBytes) { + this.jobId = Objects.requireNonNull(jobId); + // We intend to store this timestamp in millis granularity. 
Thus we're rounding here to ensure + // internal representation matches toXContent + this.timestamp = Instant.ofEpochMilli(ExceptionsHelper.requireNonNull(timestamp, TIMESTAMP).toEpochMilli()); + this.peakUsageBytes = peakUsageBytes; + } + + public MemoryUsage(StreamInput in) throws IOException { + jobId = in.readString(); + timestamp = in.readInstant(); + peakUsageBytes = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(jobId); + out.writeInstant(timestamp); + out.writeVLong(peakUsageBytes); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (params.paramAsBoolean(ToXContentParams.FOR_INTERNAL_STORAGE, false)) { + builder.field(TYPE.getPreferredName(), TYPE_VALUE); + builder.field(JOB_ID.getPreferredName(), jobId); + } + builder.timeField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.toEpochMilli()); + builder.field(PEAK_USAGE_BYTES.getPreferredName(), peakUsageBytes); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (o == this) return true; + if (o == null || getClass() != o.getClass()) return false; + + MemoryUsage other = (MemoryUsage) o; + return Objects.equals(jobId, other.jobId) + && Objects.equals(timestamp, other.timestamp) + && peakUsageBytes == other.peakUsageBytes; + } + + @Override + public int hashCode() { + return Objects.hash(jobId, timestamp, peakUsageBytes); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + public String documentId(String jobId) { + return documentIdPrefix(jobId) + timestamp.toEpochMilli(); + } + + public static String documentIdPrefix(String jobId) { + return TYPE_VALUE + "_" + jobId + "_"; + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java index 63859a6a171ac..306ea243120e5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java @@ -5,29 +5,18 @@ */ package org.elasticsearch.xpack.core.ml.job.persistence; -import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.alias.Alias; -import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias; import org.elasticsearch.xpack.core.template.TemplateUtils; -import java.util.Arrays; import java.util.Comparator; import java.util.function.Predicate; import java.util.regex.Pattern; -import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; -import static 
org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; - /** * Methods for handling index naming related functions */ @@ -119,61 +108,8 @@ public static String configIndexName() { */ public static void createStateIndexAndAliasIfNecessary(Client client, ClusterState state, IndexNameExpressionResolver resolver, final ActionListener finalListener) { - - if (state.getMetaData().getAliasAndIndexLookup().containsKey(jobStateIndexWriteAlias())) { - finalListener.onResponse(false); - return; - } - - final ActionListener createAliasListener = ActionListener.wrap( - concreteIndexName -> { - final IndicesAliasesRequest request = client.admin() - .indices() - .prepareAliases() - .addAlias(concreteIndexName, jobStateIndexWriteAlias()) - .request(); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), - ML_ORIGIN, - request, - ActionListener.wrap( - resp -> finalListener.onResponse(resp.isAcknowledged()), - finalListener::onFailure), - client.admin().indices()::aliases); - }, - finalListener::onFailure - ); - - String[] stateIndices = resolver.concreteIndexNames(state, - IndicesOptions.lenientExpandOpen(), - jobStateIndexPattern()); - if (stateIndices.length > 0) { - String latestStateIndex = Arrays.stream(stateIndices).max(STATE_INDEX_NAME_COMPARATOR).get(); - createAliasListener.onResponse(latestStateIndex); - } else { - // The initial index name must be suitable for rollover functionality. - String initialJobStateIndex = AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001"; - CreateIndexRequest createIndexRequest = client.admin() - .indices() - .prepareCreate(initialJobStateIndex) - .addAlias(new Alias(jobStateIndexWriteAlias())) - .request(); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), - ML_ORIGIN, - createIndexRequest, - ActionListener.wrap( - createIndexResponse -> finalListener.onResponse(true), - createIndexFailure -> { - // If it was created between our last check, and this request being handled, we should add the alias - // Adding an alias that already exists is idempotent. So, no need to double check if the alias exists - // as well. - if (ExceptionsHelper.unwrapCause(createIndexFailure) instanceof ResourceAlreadyExistsException) { - createAliasListener.onResponse(initialJobStateIndex); - } else { - finalListener.onFailure(createIndexFailure); - } - }), - client.admin().indices()::create); - } + MlIndexAndAlias.createIndexAndAliasIfNecessary(client, state, resolver, + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, AnomalyDetectorsIndex.jobStateIndexWriteAlias(), finalListener); } public static String resultsMapping() { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java new file mode 100644 index 0000000000000..79b9987d35193 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java @@ -0,0 +1,120 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.core.ml.utils; + +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; + +/** + * Utils to create an ML index with an alias ready for rollover with a 6-digit suffix + */ +public final class MlIndexAndAlias { + + // Visible for testing + static final Comparator<String> INDEX_NAME_COMPARATOR = new Comparator<>() { + + private final Predicate<String> HAS_SIX_DIGIT_SUFFIX = Pattern.compile("\\d{6}").asMatchPredicate(); + + @Override + public int compare(String index1, String index2) { + String[] index1Parts = index1.split("-"); + String index1Suffix = index1Parts[index1Parts.length - 1]; + boolean index1HasSixDigitsSuffix = HAS_SIX_DIGIT_SUFFIX.test(index1Suffix); + String[] index2Parts = index2.split("-"); + String index2Suffix = index2Parts[index2Parts.length - 1]; + boolean index2HasSixDigitsSuffix = HAS_SIX_DIGIT_SUFFIX.test(index2Suffix); + if (index1HasSixDigitsSuffix && index2HasSixDigitsSuffix) { + return index1Suffix.compareTo(index2Suffix); + } else if (index1HasSixDigitsSuffix != index2HasSixDigitsSuffix) { + return Boolean.compare(index1HasSixDigitsSuffix, index2HasSixDigitsSuffix); + } else { + return index1.compareTo(index2); + } + } + }; + + private MlIndexAndAlias() {} + + /** + * Creates the first index with a name of the given {@code indexPatternPrefix} followed by "-000001", if the index is missing. + * Adds an {@code alias} to that index if it was created, + * or to the index with the highest suffix if the index did not have to be created. + * The listener is notified with a {@code boolean} that informs whether the index or the alias was created.
+ */ + public static void createIndexAndAliasIfNecessary(Client client, ClusterState clusterState, IndexNameExpressionResolver resolver, + String indexPatternPrefix, String alias, ActionListener listener) { + if (clusterState.getMetaData().getAliasAndIndexLookup().containsKey(alias)) { + listener.onResponse(false); + return; + } + + final ActionListener createAliasListener = ActionListener.wrap( + concreteIndexName -> { + final IndicesAliasesRequest request = client.admin() + .indices() + .prepareAliases() + .addAlias(concreteIndexName, alias) + .request(); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), + ML_ORIGIN, + request, + ActionListener.wrap( + resp -> listener.onResponse(resp.isAcknowledged()), + listener::onFailure), + client.admin().indices()::aliases); + }, + listener::onFailure + ); + + String[] stateIndices = resolver.concreteIndexNames(clusterState, + IndicesOptions.lenientExpandOpen(), indexPatternPrefix + "*"); + if (stateIndices.length > 0) { + String latestStateIndex = Arrays.stream(stateIndices).max(INDEX_NAME_COMPARATOR).get(); + createAliasListener.onResponse(latestStateIndex); + } else { + // The initial index name must be suitable for rollover functionality. + String initialJobStateIndex = indexPatternPrefix + "-000001"; + CreateIndexRequest createIndexRequest = client.admin() + .indices() + .prepareCreate(initialJobStateIndex) + .addAlias(new Alias(alias)) + .request(); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), + ML_ORIGIN, + createIndexRequest, + ActionListener.wrap( + createIndexResponse -> listener.onResponse(true), + createIndexFailure -> { + // If it was created between our last check, and this request being handled, we should add the alias + // Adding an alias that already exists is idempotent. So, no need to double check if the alias exists + // as well. 
+ if (ExceptionsHelper.unwrapCause(createIndexFailure) instanceof ResourceAlreadyExistsException) { + createAliasListener.onResponse(initialJobStateIndex); + } else { + listener.onFailure(createIndexFailure); + } + }), + client.admin().indices()::create); + } + } +} diff --git a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/stats_index_mappings.json b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/stats_index_mappings.json new file mode 100644 index 0000000000000..5a1215057bb22 --- /dev/null +++ b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/stats_index_mappings.json @@ -0,0 +1,21 @@ +{ + "_doc": { + "_meta": { + "version" : "${xpack.ml.version}" + }, + "properties" : { + "type" : { + "type" : "keyword" + }, + "job_id" : { + "type" : "keyword" + }, + "timestamp" : { + "type" : "date" + }, + "peak_usage_bytes" : { + "type" : "long" + } + } + } +} diff --git a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/stats_index_template.json b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/stats_index_template.json new file mode 100644 index 0000000000000..1c694d9d1a7e5 --- /dev/null +++ b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/stats_index_template.json @@ -0,0 +1,15 @@ +{ + "order" : 0, + "version" : ${xpack.ml.version.id}, + "index_patterns" : [ + ".ml-stats-*" + ], + "settings": { + "index" : { + "number_of_shards" : "1", + "auto_expand_replicas" : "0-1", + "hidden": true + } + }, + "mappings" : ${xpack.ml.stats.mappings} +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsActionResponseTests.java index 84c77a5fb38b1..f5dab116b38e1 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsActionResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsActionResponseTests.java @@ -11,6 +11,8 @@ import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction.Response; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; +import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage; +import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsageTests; import org.elasticsearch.xpack.core.ml.utils.PhaseProgress; import java.util.ArrayList; @@ -27,8 +29,9 @@ public static Response randomResponse(int listSize) { List progress = new ArrayList<>(progressSize); IntStream.of(progressSize).forEach(progressIndex -> progress.add( new PhaseProgress(randomAlphaOfLength(10), randomIntBetween(0, 100)))); + MemoryUsage memoryUsage = randomBoolean() ? 
null : MemoryUsageTests.createRandom(); Response.Stats stats = new Response.Stats(DataFrameAnalyticsConfigTests.randomValidId(), - randomFrom(DataFrameAnalyticsState.values()), failureReason, progress, null, randomAlphaOfLength(20)); + randomFrom(DataFrameAnalyticsState.values()), failureReason, progress, memoryUsage, null, randomAlphaOfLength(20)); analytics.add(stats); } return new Response(new QueryPage<>(analytics, analytics.size(), GetDataFrameAnalyticsAction.Response.RESULTS_FIELD)); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/stats/MemoryUsageTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/stats/MemoryUsageTests.java new file mode 100644 index 0000000000000..44ce79b98c076 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/stats/MemoryUsageTests.java @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.dataframe.stats; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; +import org.junit.Before; + +import java.io.IOException; +import java.time.Instant; +import java.util.Collections; + +public class MemoryUsageTests extends AbstractSerializingTestCase { + + private boolean lenient; + + @Before + public void chooseStrictOrLenient() { + lenient = randomBoolean(); + } + + @Override + protected boolean supportsUnknownFields() { + return lenient; + } + + @Override + protected MemoryUsage doParseInstance(XContentParser parser) throws IOException { + return lenient ? MemoryUsage.LENIENT_PARSER.parse(parser, null) : MemoryUsage.STRICT_PARSER.parse(parser, null); + } + + @Override + protected ToXContent.Params getToXContentParams() { + return new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")); + } + + public static MemoryUsage createRandom() { + return new MemoryUsage(randomAlphaOfLength(10), Instant.now(), randomNonNegativeLong()); + } + + @Override + protected Writeable.Reader instanceReader() { + return MemoryUsage::new; + } + + @Override + protected MemoryUsage createTestInstance() { + return createRandom(); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndexTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java similarity index 75% rename from x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndexTests.java rename to x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java index ce5b038728651..80a55394a4003 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndexTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. 
*/ -package org.elasticsearch.xpack.core.ml.job.persistence; +package org.elasticsearch.xpack.core.ml.utils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -56,11 +56,12 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -public class AnomalyDetectorsIndexTests extends ESTestCase { +public class MlIndexAndAliasTests extends ESTestCase { - private static final String LEGACY_ML_STATE = ".ml-state"; - private static final String INITIAL_ML_STATE = ".ml-state-000001"; - private static final String ML_STATE_WRITE_ALIAS = ".ml-state-write"; + private static final String TEST_INDEX_PREFIX = "test"; + private static final String TEST_INDEX_ALIAS = "test-alias"; + private static final String LEGACY_INDEX_WITHOUT_SUFFIX = TEST_INDEX_PREFIX; + private static final String FIRST_CONCRETE_INDEX = "test-000001"; private ThreadPool threadPool; private IndicesAdminClient indicesAdminClient; @@ -77,9 +78,9 @@ public void setUpMocks() { when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); indicesAdminClient = mock(IndicesAdminClient.class); - when(indicesAdminClient.prepareCreate(INITIAL_ML_STATE)) - .thenReturn(new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE, INITIAL_ML_STATE)); - doAnswer(withResponse(new CreateIndexResponse(true, true, INITIAL_ML_STATE))).when(indicesAdminClient).create(any(), any()); + when(indicesAdminClient.prepareCreate(FIRST_CONCRETE_INDEX)) + .thenReturn(new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE, FIRST_CONCRETE_INDEX)); + doAnswer(withResponse(new CreateIndexResponse(true, true, FIRST_CONCRETE_INDEX))).when(indicesAdminClient).create(any(), any()); when(indicesAdminClient.prepareAliases()).thenReturn(new IndicesAliasesRequestBuilder(client, IndicesAliasesAction.INSTANCE)); doAnswer(withResponse(new AcknowledgedResponse(true))).when(indicesAdminClient).aliases(any(), any()); @@ -103,31 +104,31 @@ public void verifyNoMoreInteractionsWithMocks() { public void testCreateStateIndexAndAliasIfNecessary_CleanState() { ClusterState clusterState = createClusterState(Collections.emptyMap()); - AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, new IndexNameExpressionResolver(), finalListener); + createIndexAndAliasIfNecessary(clusterState); InOrder inOrder = inOrder(indicesAdminClient, finalListener); - inOrder.verify(indicesAdminClient).prepareCreate(INITIAL_ML_STATE); + inOrder.verify(indicesAdminClient).prepareCreate(FIRST_CONCRETE_INDEX); inOrder.verify(indicesAdminClient).create(createRequestCaptor.capture(), any()); inOrder.verify(finalListener).onResponse(true); CreateIndexRequest createRequest = createRequestCaptor.getValue(); - assertThat(createRequest.index(), equalTo(INITIAL_ML_STATE)); - assertThat(createRequest.aliases(), equalTo(Collections.singleton(new Alias(ML_STATE_WRITE_ALIAS)))); + assertThat(createRequest.index(), equalTo(FIRST_CONCRETE_INDEX)); + assertThat(createRequest.aliases(), equalTo(Collections.singleton(new Alias(TEST_INDEX_ALIAS)))); } private void assertNoClientInteractionsWhenWriteAliasAlreadyExists(String indexName) { ClusterState clusterState = createClusterState(Collections.singletonMap(indexName, createIndexMetaDataWithAlias(indexName))); - AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, new IndexNameExpressionResolver(), finalListener); + createIndexAndAliasIfNecessary(clusterState); verify(finalListener).onResponse(false); } - public void 
testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtLegacyStateIndex() { - assertNoClientInteractionsWhenWriteAliasAlreadyExists(LEGACY_ML_STATE); + public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtLegacyIndex() { + assertNoClientInteractionsWhenWriteAliasAlreadyExists(LEGACY_INDEX_WITHOUT_SUFFIX); } public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtInitialStateIndex() { - assertNoClientInteractionsWhenWriteAliasAlreadyExists(INITIAL_ML_STATE); + assertNoClientInteractionsWhenWriteAliasAlreadyExists(FIRST_CONCRETE_INDEX); } public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtSubsequentStateIndex() { @@ -141,8 +142,8 @@ public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPo private void assertMlStateWriteAliasAddedToMostRecentMlStateIndex(List existingIndexNames, String expectedWriteIndexName) { ClusterState clusterState = createClusterState( - existingIndexNames.stream().collect(toMap(Function.identity(), AnomalyDetectorsIndexTests::createIndexMetaData))); - AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, new IndexNameExpressionResolver(), finalListener); + existingIndexNames.stream().collect(toMap(Function.identity(), MlIndexAndAliasTests::createIndexMetaData))); + createIndexAndAliasIfNecessary(clusterState); InOrder inOrder = inOrder(indicesAdminClient, finalListener); inOrder.verify(indicesAdminClient).prepareAliases(); @@ -152,54 +153,59 @@ private void assertMlStateWriteAliasAddedToMostRecentMlStateIndex(List e IndicesAliasesRequest indicesAliasesRequest = aliasesRequestCaptor.getValue(); assertThat( indicesAliasesRequest.getAliasActions(), - contains(AliasActions.add().alias(ML_STATE_WRITE_ALIAS).index(expectedWriteIndexName))); + contains(AliasActions.add().alias(TEST_INDEX_ALIAS).index(expectedWriteIndexName))); } - public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButLegacyStateIndexExists() { + public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButLegacyIndexExists() { assertMlStateWriteAliasAddedToMostRecentMlStateIndex( - Arrays.asList(LEGACY_ML_STATE), LEGACY_ML_STATE); + Arrays.asList(LEGACY_INDEX_WITHOUT_SUFFIX), LEGACY_INDEX_WITHOUT_SUFFIX); } public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButInitialStateIndexExists() { assertMlStateWriteAliasAddedToMostRecentMlStateIndex( - Arrays.asList(INITIAL_ML_STATE), INITIAL_ML_STATE); + Arrays.asList(FIRST_CONCRETE_INDEX), FIRST_CONCRETE_INDEX); } public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButSubsequentStateIndicesExist() { assertMlStateWriteAliasAddedToMostRecentMlStateIndex( - Arrays.asList(".ml-state-000003", ".ml-state-000040", ".ml-state-000500"), ".ml-state-000500"); + Arrays.asList("test-000003", "test-000040", "test-000500"), "test-000500"); } - public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButBothLegacyAndNewStateIndicesDoExist() { + public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButBothLegacyAndNewIndicesExist() { assertMlStateWriteAliasAddedToMostRecentMlStateIndex( - Arrays.asList(LEGACY_ML_STATE, ".ml-state-000003", ".ml-state-000040", ".ml-state-000500"), ".ml-state-000500"); + Arrays.asList(LEGACY_INDEX_WITHOUT_SUFFIX, "test-000003", "test-000040", "test-000500"), "test-000500"); } - public void testStateIndexNameComparator() { - Comparator comparator = 
AnomalyDetectorsIndex.STATE_INDEX_NAME_COMPARATOR; + public void testIndexNameComparator() { + Comparator comparator = MlIndexAndAlias.INDEX_NAME_COMPARATOR; assertThat( - Stream.of(".ml-state-000001").max(comparator).get(), - equalTo(".ml-state-000001")); + Stream.of("test-000001").max(comparator).get(), + equalTo("test-000001")); assertThat( - Stream.of(".ml-state-000002", ".ml-state-000001").max(comparator).get(), - equalTo(".ml-state-000002")); + Stream.of("test-000002", "test-000001").max(comparator).get(), + equalTo("test-000002")); assertThat( - Stream.of(".ml-state-000003", ".ml-state-000040", ".ml-state-000500").max(comparator).get(), - equalTo(".ml-state-000500")); + Stream.of("test-000003", "test-000040", "test-000500").max(comparator).get(), + equalTo("test-000500")); assertThat( - Stream.of(".ml-state-000042", ".ml-state-000049", ".ml-state-000038").max(comparator).get(), - equalTo(".ml-state-000049")); + Stream.of("test-000042", "test-000049", "test-000038").max(comparator).get(), + equalTo("test-000049")); assertThat( - Stream.of(".ml-state", ".ml-state-000003", ".ml-state-000040", ".ml-state-000500").max(comparator).get(), - equalTo(".ml-state-000500")); + Stream.of("test", "test-000003", "test-000040", "test-000500").max(comparator).get(), + equalTo("test-000500")); assertThat( - Stream.of(".reindexed-6-ml-state", ".ml-state-000042").max(comparator).get(), - equalTo(".ml-state-000042")); + Stream.of(".reindexed-6-test", "test-000042").max(comparator).get(), + equalTo("test-000042")); assertThat( Stream.of(".a-000002", ".b-000001").max(comparator).get(), equalTo(".a-000002")); } + private void createIndexAndAliasIfNecessary(ClusterState clusterState) { + MlIndexAndAlias.createIndexAndAliasIfNecessary(client, clusterState, new IndexNameExpressionResolver(), + TEST_INDEX_PREFIX, TEST_INDEX_ALIAS, finalListener); + } + @SuppressWarnings("unchecked") private static Answer withResponse(Response response) { return invocationOnMock -> { @@ -234,7 +240,7 @@ private static IndexMetaData createIndexMetaData(String indexName, boolean withA IndexMetaData.Builder builder = IndexMetaData.builder(indexName) .settings(settings); if (withAlias) { - builder.putAlias(AliasMetaData.builder(ML_STATE_WRITE_ALIAS).build()); + builder.putAlias(AliasMetaData.builder(TEST_INDEX_ALIAS).build()); } return builder.build(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java index be52cc9202c78..ae516b4f8ce7a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java @@ -14,6 +14,7 @@ import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.MlConfigIndex; import org.elasticsearch.xpack.core.ml.MlMetaIndex; +import org.elasticsearch.xpack.core.ml.MlStatsIndex; import org.elasticsearch.xpack.core.ml.inference.persistence.InferenceIndexConstants; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; @@ -56,6 +57,8 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry { ROOT_RESOURCE_PATH + "inference_index_template.json", Version.CURRENT.id, VERSION_PATTERN, Collections.singletonMap(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id))); + private static final IndexTemplateConfig 
STATS_TEMPLATE = statsTemplate(); + private static IndexTemplateConfig configTemplate() { Map variables = new HashMap<>(); variables.put(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id)); @@ -80,6 +83,17 @@ private static IndexTemplateConfig anomalyDetectionResultsTemplate() { variables); } + private static IndexTemplateConfig statsTemplate() { + Map variables = new HashMap<>(); + variables.put(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id)); + variables.put("xpack.ml.stats.mappings", MlStatsIndex.mapping()); + + return new IndexTemplateConfig(MlStatsIndex.TEMPLATE_NAME, + ROOT_RESOURCE_PATH + "stats_index_template.json", + Version.CURRENT.id, VERSION_PATTERN, + variables); + } + public MlIndexTemplateRegistry(Settings nodeSettings, ClusterService clusterService, ThreadPool threadPool, Client client, NamedXContentRegistry xContentRegistry) { super(nodeSettings, clusterService, threadPool, client, xContentRegistry); @@ -98,7 +112,8 @@ protected List getTemplateConfigs() { CONFIG_TEMPLATE, INFERENCE_TEMPLATE, META_TEMPLATE, - NOTIFICATIONS_TEMPLATE + NOTIFICATIONS_TEMPLATE, + STATS_TEMPLATE ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java index 08edea3329813..ea91f0c77ec69 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java @@ -7,7 +7,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; @@ -22,20 +21,18 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.action.util.QueryPage; +import org.elasticsearch.xpack.core.ml.MlStatsIndex; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; @@ -43,20 +40,22 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import 
org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; +import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.PhaseProgress; import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask; import org.elasticsearch.xpack.ml.dataframe.StoredProgress; import org.elasticsearch.xpack.ml.dataframe.stats.ProgressTracker; +import org.elasticsearch.xpack.ml.dataframe.stats.StatsHolder; +import org.elasticsearch.xpack.ml.utils.persistence.MlParserUtils; -import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -98,16 +97,20 @@ protected void taskOperation(GetDataFrameAnalyticsStatsAction.Request request, D ActionListener> listener) { logger.debug("Get stats for running task [{}]", task.getParams().getId()); - ActionListener> progressListener = ActionListener.wrap( - progress -> { - Stats stats = buildStats(task.getParams().getId(), progress); + ActionListener statsHolderListener = ActionListener.wrap( + statsHolder -> { + Stats stats = buildStats( + task.getParams().getId(), + statsHolder.getProgressTracker().report(), + statsHolder.getMemoryUsage() + ); listener.onResponse(new QueryPage<>(Collections.singletonList(stats), 1, GetDataFrameAnalyticsAction.Response.RESULTS_FIELD)); }, listener::onFailure ); ActionListener reindexingProgressListener = ActionListener.wrap( - aVoid -> progressListener.onResponse(task.getStatsHolder().getProgressTracker().report()), + aVoid -> statsHolderListener.onResponse(task.getStatsHolder()), listener::onFailure ); @@ -157,22 +160,25 @@ void gatherStatsForStoppedTasks(List expandedIds, GetDataFrameAnalyticsS return; } - searchStoredProgresses(stoppedTasksIds, ActionListener.wrap( - storedProgresses -> { - List stoppedStats = new ArrayList<>(stoppedTasksIds.size()); - for (int i = 0; i < stoppedTasksIds.size(); i++) { - String configId = stoppedTasksIds.get(i); - StoredProgress storedProgress = storedProgresses.get(i); - stoppedStats.add(buildStats(configId, storedProgress.get())); - } - List allTasksStats = new ArrayList<>(runningTasksResponse.getResponse().results()); - allTasksStats.addAll(stoppedStats); - Collections.sort(allTasksStats, Comparator.comparing(Stats::getId)); - listener.onResponse(new GetDataFrameAnalyticsStatsAction.Response(new QueryPage<>( - allTasksStats, allTasksStats.size(), GetDataFrameAnalyticsAction.Response.RESULTS_FIELD))); - }, - listener::onFailure - )); + AtomicInteger counter = new AtomicInteger(stoppedTasksIds.size()); + AtomicArray jobStats = new AtomicArray<>(stoppedTasksIds.size()); + for (int i = 0; i < stoppedTasksIds.size(); i++) { + final int slot = i; + String jobId = stoppedTasksIds.get(i); + searchStats(jobId, ActionListener.wrap( + stats -> { + jobStats.set(slot, stats); + if (counter.decrementAndGet() == 0) { + List allTasksStats = new ArrayList<>(runningTasksResponse.getResponse().results()); + allTasksStats.addAll(jobStats.asList()); + Collections.sort(allTasksStats, Comparator.comparing(Stats::getId)); + listener.onResponse(new GetDataFrameAnalyticsStatsAction.Response(new QueryPage<>( + allTasksStats, 
allTasksStats.size(), GetDataFrameAnalyticsAction.Response.RESULTS_FIELD))); + } + }, + listener::onFailure) + ); + } } static List determineStoppedTasksIds(List expandedIds, List runningTasksStats) { @@ -180,19 +186,15 @@ static List determineStoppedTasksIds(List expandedIds, List startedTasksIds.contains(id) == false).collect(Collectors.toList()); } - private void searchStoredProgresses(List configIds, ActionListener> listener) { + private void searchStats(String configId, ActionListener listener) { + RetrievedStatsHolder retrievedStatsHolder = new RetrievedStatsHolder(); + MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); - for (String configId : configIds) { - SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern()); - searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); - searchRequest.source().size(1); - searchRequest.source().query(QueryBuilders.idsQuery().addIds(StoredProgress.documentId(configId))); - multiSearchRequest.add(searchRequest); - } + multiSearchRequest.add(buildStoredProgressSearch(configId)); + multiSearchRequest.add(buildMemoryUsageSearch(configId)); executeAsyncWithOrigin(client, ML_ORIGIN, MultiSearchAction.INSTANCE, multiSearchRequest, ActionListener.wrap( multiSearchResponse -> { - List progresses = new ArrayList<>(configIds.size()); for (MultiSearchResponse.Item itemResponse : multiSearchResponse.getResponses()) { if (itemResponse.isFailure()) { listener.onFailure(ExceptionsHelper.serverError(itemResponse.getFailureMessage(), itemResponse.getFailure())); @@ -200,32 +202,57 @@ private void searchStoredProgresses(List configIds, ActionListener listener.onFailure(ExceptionsHelper.serverError("Error searching for stored progresses", e)) + e -> listener.onFailure(ExceptionsHelper.serverError("Error searching for stats", e)) )); } - private StoredProgress parseStoredProgress(SearchHit hit) { - BytesReference source = hit.getSourceRef(); - try (InputStream stream = source.streamInput(); - XContentParser parser = XContentFactory.xContent(XContentType.JSON) - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { - StoredProgress storedProgress = StoredProgress.PARSER.apply(parser, null); - return storedProgress; - } catch (IOException e) { - logger.error(new ParameterizedMessage("failed to parse progress from doc with it [{}]", hit.getId()), e); - return new StoredProgress(Collections.emptyList()); + private static SearchRequest buildStoredProgressSearch(String configId) { + SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern()); + searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); + searchRequest.source().size(1); + searchRequest.source().query(QueryBuilders.idsQuery().addIds(StoredProgress.documentId(configId))); + return searchRequest; + } + + private static SearchRequest buildMemoryUsageSearch(String configId) { + SearchRequest searchRequest = new SearchRequest(MlStatsIndex.indexPattern()); + searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); + searchRequest.source().size(1); + QueryBuilder query = QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(MemoryUsage.JOB_ID.getPreferredName(), configId)) + .filter(QueryBuilders.termQuery(MemoryUsage.TYPE.getPreferredName(), MemoryUsage.TYPE_VALUE)); + searchRequest.source().query(query); + searchRequest.source().sort(MemoryUsage.TIMESTAMP.getPreferredName(), SortOrder.DESC); + return searchRequest; + } + + private static void parseHit(SearchHit 
hit, String configId, RetrievedStatsHolder retrievedStatsHolder) { + String hitId = hit.getId(); + if (StoredProgress.documentId(configId).equals(hitId)) { + retrievedStatsHolder.progress = MlParserUtils.parse(hit, StoredProgress.PARSER); + } else if (hitId.startsWith(MemoryUsage.documentIdPrefix(configId))) { + retrievedStatsHolder.memoryUsage = MlParserUtils.parse(hit, MemoryUsage.LENIENT_PARSER); + } else { + throw ExceptionsHelper.serverError("unexpected doc id [" + hitId + "]"); } } - private GetDataFrameAnalyticsStatsAction.Response.Stats buildStats(String concreteAnalyticsId, List progress) { + private GetDataFrameAnalyticsStatsAction.Response.Stats buildStats(String concreteAnalyticsId, + List progress, + MemoryUsage memoryUsage) { ClusterState clusterState = clusterService.state(); PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); PersistentTasksCustomMetaData.PersistentTask analyticsTask = MlTasks.getDataFrameAnalyticsTask(concreteAnalyticsId, tasks); @@ -242,6 +269,19 @@ private GetDataFrameAnalyticsStatsAction.Response.Stats buildStats(String concre assignmentExplanation = analyticsTask.getAssignment().getExplanation(); } return new GetDataFrameAnalyticsStatsAction.Response.Stats( - concreteAnalyticsId, analyticsState, failureReason, progress, node, assignmentExplanation); + concreteAnalyticsId, + analyticsState, + failureReason, + progress, + memoryUsage, + node, + assignmentExplanation + ); + } + + private static class RetrievedStatsHolder { + + private volatile StoredProgress progress = new StoredProgress(new ProgressTracker().report()); + private volatile MemoryUsage memoryUsage; } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index 8757ec7c4942f..46e3dcaf74561 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -30,11 +30,13 @@ import org.elasticsearch.script.Script; import org.elasticsearch.tasks.Task; import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.ml.MlStatsIndex; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory; import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider; @@ -102,15 +104,35 @@ public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState current ); // Retrieve configuration - ActionListener stateAliasListener = ActionListener.wrap( + ActionListener statsIndexListener = ActionListener.wrap( aBoolean -> configProvider.get(task.getParams().getId(), configListener), configListener::onFailure ); + // Make sure the stats index and alias exist + ActionListener stateAliasListener = ActionListener.wrap( + aBoolean -> 
createStatsIndexAndUpdateMappingsIfNecessary(clusterState, statsIndexListener), + configListener::onFailure + ); + // Make sure the state index and alias exist AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, expressionResolver, stateAliasListener); } + private void createStatsIndexAndUpdateMappingsIfNecessary(ClusterState clusterState, ActionListener listener) { + ActionListener createIndexListener = ActionListener.wrap( + aBoolean -> ElasticsearchMappings.addDocMappingIfMissing( + MlStatsIndex.writeAlias(), + MlStatsIndex::mapping, + client, + clusterState, + listener) + , listener::onFailure + ); + + MlStatsIndex.createStatsIndexAndAliasIfNecessary(client, clusterState, expressionResolver, createIndexListener); + } + private void executeStartingJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config) { DataFrameAnalyticsTaskState reindexingState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.REINDEXING, task.getAllocationId(), null); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index da65cd768e556..ee38f2a63d0f4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -426,7 +426,8 @@ private AnalyticsResultProcessor createResultProcessor(DataFrameAnalyticsTask ta DataFrameRowsJoiner dataFrameRowsJoiner = new DataFrameRowsJoiner(config.getId(), dataExtractorFactory.newExtractor(true), resultsPersisterService); return new AnalyticsResultProcessor( - config, dataFrameRowsJoiner, task.getStatsHolder(), trainedModelProvider, auditor, dataExtractor.get().getFieldNames()); + config, dataFrameRowsJoiner, task.getStatsHolder(), trainedModelProvider, auditor, resultsPersisterService, + dataExtractor.get().getFieldNames()); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java index 636502dbe0058..cd990ca11ab33 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java @@ -11,24 +11,32 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.license.License; +import org.elasticsearch.xpack.core.ml.MlStatsIndex; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification; import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression; +import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage; import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig; import 
org.elasticsearch.xpack.core.ml.inference.TrainedModelDefinition; import org.elasticsearch.xpack.core.ml.inference.TrainedModelInput; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import org.elasticsearch.xpack.core.security.user.XPackUser; import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult; import org.elasticsearch.xpack.ml.dataframe.process.results.RowResults; import org.elasticsearch.xpack.ml.dataframe.stats.StatsHolder; import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider; import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; +import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService; +import java.io.IOException; import java.time.Instant; import java.util.Collections; import java.util.Iterator; @@ -36,6 +44,7 @@ import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import static java.util.stream.Collectors.toList; @@ -60,6 +69,7 @@ public class AnalyticsResultProcessor { private final StatsHolder statsHolder; private final TrainedModelProvider trainedModelProvider; private final DataFrameAnalyticsAuditor auditor; + private final ResultsPersisterService resultsPersisterService; private final List fieldNames; private final CountDownLatch completionLatch = new CountDownLatch(1); private volatile String failure; @@ -67,12 +77,14 @@ public class AnalyticsResultProcessor { public AnalyticsResultProcessor(DataFrameAnalyticsConfig analytics, DataFrameRowsJoiner dataFrameRowsJoiner, StatsHolder statsHolder, TrainedModelProvider trainedModelProvider, - DataFrameAnalyticsAuditor auditor, List fieldNames) { + DataFrameAnalyticsAuditor auditor, ResultsPersisterService resultsPersisterService, + List fieldNames) { this.analytics = Objects.requireNonNull(analytics); this.dataFrameRowsJoiner = Objects.requireNonNull(dataFrameRowsJoiner); this.statsHolder = Objects.requireNonNull(statsHolder); this.trainedModelProvider = Objects.requireNonNull(trainedModelProvider); this.auditor = Objects.requireNonNull(auditor); + this.resultsPersisterService = Objects.requireNonNull(resultsPersisterService); this.fieldNames = Collections.unmodifiableList(Objects.requireNonNull(fieldNames)); } @@ -148,6 +160,11 @@ private void processResult(AnalyticsResult result, DataFrameRowsJoiner resultsJo if (inferenceModelBuilder != null) { createAndIndexInferenceModel(inferenceModelBuilder); } + MemoryUsage memoryUsage = result.getMemoryUsage(); + if (memoryUsage != null) { + statsHolder.setMemoryUsage(memoryUsage); + indexStatsResult(memoryUsage, memoryUsage::documentId); + } } private void createAndIndexInferenceModel(TrainedModelDefinition.Builder inferenceModel) { @@ -224,4 +241,23 @@ private void setAndReportFailure(Exception e) { failure = "error processing results; " + e.getMessage(); auditor.error(analytics.getId(), "Error processing results; " + e.getMessage()); } + + private void indexStatsResult(ToXContentObject result, Function docIdSupplier) { + try { + resultsPersisterService.indexWithRetry(analytics.getId(), + MlStatsIndex.writeAlias(), + result, + new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")), + WriteRequest.RefreshPolicy.IMMEDIATE, + docIdSupplier.apply(analytics.getId()), + () -> true, + errorMsg -> auditor.error(analytics.getId(), + "failed to persist result with id [" + 
docIdSupplier.apply(analytics.getId()) + "]; " + errorMsg) + ); + } catch (IOException ioe) { + LOGGER.error(() -> new ParameterizedMessage("[{}] Failed indexing stats result", analytics.getId()), ioe); + } catch (Exception e) { + LOGGER.error(() -> new ParameterizedMessage("[{}] Failed indexing stats result", analytics.getId()), e); + } + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/results/AnalyticsResult.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/results/AnalyticsResult.java index b1c7bf6599a75..fcac851fa13ef 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/results/AnalyticsResult.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/results/AnalyticsResult.java @@ -5,11 +5,13 @@ */ package org.elasticsearch.xpack.ml.dataframe.process.results; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage; import org.elasticsearch.xpack.core.ml.inference.TrainedModelDefinition; import java.io.IOException; @@ -23,29 +25,36 @@ public class AnalyticsResult implements ToXContentObject { public static final ParseField TYPE = new ParseField("analytics_result"); - public static final ParseField PROGRESS_PERCENT = new ParseField("progress_percent"); - public static final ParseField INFERENCE_MODEL = new ParseField("inference_model"); + private static final ParseField PROGRESS_PERCENT = new ParseField("progress_percent"); + private static final ParseField INFERENCE_MODEL = new ParseField("inference_model"); + private static final ParseField ANALYTICS_MEMORY_USAGE = new ParseField("analytics_memory_usage"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(TYPE.getPreferredName(), - a -> new AnalyticsResult((RowResults) a[0], (Integer) a[1], (TrainedModelDefinition.Builder) a[2])); + a -> new AnalyticsResult((RowResults) a[0], (Integer) a[1], (TrainedModelDefinition.Builder) a[2], (MemoryUsage) a[3])); static { PARSER.declareObject(optionalConstructorArg(), RowResults.PARSER, RowResults.TYPE); PARSER.declareInt(optionalConstructorArg(), PROGRESS_PERCENT); // TODO change back to STRICT_PARSER once native side is aligned PARSER.declareObject(optionalConstructorArg(), TrainedModelDefinition.LENIENT_PARSER, INFERENCE_MODEL); + PARSER.declareObject(optionalConstructorArg(), MemoryUsage.STRICT_PARSER, ANALYTICS_MEMORY_USAGE); } private final RowResults rowResults; private final Integer progressPercent; private final TrainedModelDefinition.Builder inferenceModelBuilder; private final TrainedModelDefinition inferenceModel; + private final MemoryUsage memoryUsage; - public AnalyticsResult(RowResults rowResults, Integer progressPercent, TrainedModelDefinition.Builder inferenceModelBuilder) { + public AnalyticsResult(@Nullable RowResults rowResults, + @Nullable Integer progressPercent, + @Nullable TrainedModelDefinition.Builder inferenceModelBuilder, + @Nullable MemoryUsage memoryUsage) { this.rowResults = rowResults; this.progressPercent = progressPercent; this.inferenceModelBuilder = inferenceModelBuilder; this.inferenceModel = inferenceModelBuilder == null ? 
null : inferenceModelBuilder.build(); + this.memoryUsage = memoryUsage; } public RowResults getRowResults() { @@ -60,6 +69,10 @@ public TrainedModelDefinition.Builder getInferenceModelBuilder() { return inferenceModelBuilder; } + public MemoryUsage getMemoryUsage() { + return memoryUsage; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -74,6 +87,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws inferenceModel, new ToXContent.MapParams(Collections.singletonMap(FOR_INTERNAL_STORAGE, "true"))); } + if (memoryUsage != null) { + builder.field(ANALYTICS_MEMORY_USAGE.getPreferredName(), memoryUsage, params); + } builder.endObject(); return builder; } @@ -90,11 +106,12 @@ public boolean equals(Object other) { AnalyticsResult that = (AnalyticsResult) other; return Objects.equals(rowResults, that.rowResults) && Objects.equals(progressPercent, that.progressPercent) - && Objects.equals(inferenceModel, that.inferenceModel); + && Objects.equals(inferenceModel, that.inferenceModel) + && Objects.equals(memoryUsage, that.memoryUsage); } @Override public int hashCode() { - return Objects.hash(rowResults, progressPercent, inferenceModel); + return Objects.hash(rowResults, progressPercent, inferenceModel, memoryUsage); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsHolder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsHolder.java index ac0396b3e81ca..d2e9bdd957ec6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsHolder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsHolder.java @@ -5,15 +5,33 @@ */ package org.elasticsearch.xpack.ml.dataframe.stats; +import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage; + +import java.util.concurrent.atomic.AtomicReference; + /** * Holds data frame analytics stats in memory so that they may be retrieved * from the get stats api for started jobs efficiently. 
*/ public class StatsHolder { - private final ProgressTracker progressTracker = new ProgressTracker(); + private final ProgressTracker progressTracker; + private final AtomicReference memoryUsageHolder; + + public StatsHolder() { + progressTracker = new ProgressTracker(); + memoryUsageHolder = new AtomicReference<>(); + } public ProgressTracker getProgressTracker() { return progressTracker; } + + public void setMemoryUsage(MemoryUsage memoryUsage) { + memoryUsageHolder.set(memoryUsage); + } + + public MemoryUsage getMemoryUsage() { + return memoryUsageHolder.get(); + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java index d5d85d3fa6088..48e8fc9ecdaa2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java @@ -112,6 +112,7 @@ import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.utils.MlIndicesUtils; +import org.elasticsearch.xpack.ml.utils.persistence.MlParserUtils; import java.io.IOException; import java.io.InputStream; @@ -507,7 +508,7 @@ public void datafeedTimingStats(List jobIds, ActionListener T parseSearchHit(SearchHit hit, BiFunction objectParser) { - BytesReference source = hit.getSourceRef(); - try (InputStream stream = source.streamInput(); - XContentParser parser = XContentFactory.xContent(XContentType.JSON) - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { - return objectParser.apply(parser, null); - } catch (IOException e) { - throw new ElasticsearchParseException("failed to parse " + hit.getId(), e); - } - } - /** * Search for buckets with the parameters in the {@link BucketsQueryBuilder} * Uses the internal client, so runs as the _xpack user @@ -1119,7 +1103,7 @@ private void searchSingleResult(String jobId, String resultDescription, S handler.accept(new Result<>(null, notFoundSupplier.get())); } else if (hits.length == 1) { try { - T result = parseSearchHit(hits[0], objectParser); + T result = MlParserUtils.parse(hits[0], objectParser); handler.accept(new Result<>(hits[0].getIndex(), result)); } catch (Exception e) { errorHandler.accept(e); @@ -1263,7 +1247,7 @@ public void scheduledEvents(ScheduledEventsQueryBuilder query, ActionListener(calendars, response.getHits().getTotalHits().value, Calendar.RESULTS_FIELD)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/MlParserUtils.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/MlParserUtils.java new file mode 100644 index 0000000000000..5585972ad8404 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/MlParserUtils.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.utils.persistence; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.search.SearchHit; + +import java.io.IOException; +import java.io.InputStream; +import java.util.function.BiFunction; + +public final class MlParserUtils { + + private MlParserUtils() {} + + /** + * @param hit The search hit to parse + * @param objectParser Parser for the object of type T + * @return The parsed value of T from the search hit + * @throws ElasticsearchException on failure + */ + public static T parse(SearchHit hit, BiFunction objectParser) { + BytesReference source = hit.getSourceRef(); + try (InputStream stream = source.streamInput(); + XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { + return objectParser.apply(parser, null); + } catch (IOException e) { + throw new ElasticsearchParseException("failed to parse " + hit.getId(), e); + } + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java index 84a75506ab4a1..329e7c552d453 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java @@ -53,7 +53,7 @@ public class AnalyticsProcessManagerTests extends ESTestCase { private static final String CONFIG_ID = "config-id"; private static final int NUM_ROWS = 100; private static final int NUM_COLS = 4; - private static final AnalyticsResult PROCESS_RESULT = new AnalyticsResult(null, null, null); + private static final AnalyticsResult PROCESS_RESULT = new AnalyticsResult(null, null, null, null); private Client client; private DataFrameAnalyticsAuditor auditor; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java index a93e3f4b0f126..044a06efbb2ce 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.xpack.ml.extractor.ExtractedFields; import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider; import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; +import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService; import org.junit.Before; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; @@ -61,6 +62,7 @@ public class AnalyticsResultProcessorTests extends ESTestCase { private StatsHolder statsHolder = new StatsHolder(); private TrainedModelProvider trainedModelProvider; private DataFrameAnalyticsAuditor 
auditor; + private ResultsPersisterService resultsPersisterService; private DataFrameAnalyticsConfig analyticsConfig; @Before @@ -70,6 +72,7 @@ public void setUpMocks() { dataFrameRowsJoiner = mock(DataFrameRowsJoiner.class); trainedModelProvider = mock(TrainedModelProvider.class); auditor = mock(DataFrameAnalyticsAuditor.class); + resultsPersisterService = mock(ResultsPersisterService.class); analyticsConfig = new DataFrameAnalyticsConfig.Builder() .setId(JOB_ID) .setDescription(JOB_DESCRIPTION) @@ -93,7 +96,7 @@ public void testProcess_GivenNoResults() { public void testProcess_GivenEmptyResults() { givenDataFrameRows(2); - givenProcessResults(Arrays.asList(new AnalyticsResult(null, 50, null), new AnalyticsResult(null, 100, null))); + givenProcessResults(Arrays.asList(new AnalyticsResult(null, 50, null, null), new AnalyticsResult(null, 100, null, null))); AnalyticsResultProcessor resultProcessor = createResultProcessor(); resultProcessor.process(process); @@ -108,7 +111,8 @@ public void testProcess_GivenRowResults() { givenDataFrameRows(2); RowResults rowResults1 = mock(RowResults.class); RowResults rowResults2 = mock(RowResults.class); - givenProcessResults(Arrays.asList(new AnalyticsResult(rowResults1, 50, null), new AnalyticsResult(rowResults2, 100, null))); + givenProcessResults(Arrays.asList(new AnalyticsResult(rowResults1, 50, null, null), + new AnalyticsResult(rowResults2, 100, null, null))); AnalyticsResultProcessor resultProcessor = createResultProcessor(); resultProcessor.process(process); @@ -125,7 +129,8 @@ public void testProcess_GivenDataFrameRowsJoinerFails() { givenDataFrameRows(2); RowResults rowResults1 = mock(RowResults.class); RowResults rowResults2 = mock(RowResults.class); - givenProcessResults(Arrays.asList(new AnalyticsResult(rowResults1, 50, null), new AnalyticsResult(rowResults2, 100, null))); + givenProcessResults(Arrays.asList(new AnalyticsResult(rowResults1, 50, null, null), + new AnalyticsResult(rowResults2, 100, null, null))); doThrow(new RuntimeException("some failure")).when(dataFrameRowsJoiner).processRowResults(any(RowResults.class)); @@ -155,7 +160,7 @@ public void testProcess_GivenInferenceModelIsStoredSuccessfully() { List expectedFieldNames = Arrays.asList("foo", "bar", "baz"); TrainedModelDefinition.Builder inferenceModel = TrainedModelDefinitionTests.createRandomBuilder(); - givenProcessResults(Arrays.asList(new AnalyticsResult(null, null, inferenceModel))); + givenProcessResults(Arrays.asList(new AnalyticsResult(null, null, inferenceModel, null))); AnalyticsResultProcessor resultProcessor = createResultProcessor(expectedFieldNames); resultProcessor.process(process); @@ -199,7 +204,7 @@ public void testProcess_GivenInferenceModelFailedToStore() { }).when(trainedModelProvider).storeTrainedModel(any(TrainedModelConfig.class), any(ActionListener.class)); TrainedModelDefinition.Builder inferenceModel = TrainedModelDefinitionTests.createRandomBuilder(); - givenProcessResults(Arrays.asList(new AnalyticsResult(null, null, inferenceModel))); + givenProcessResults(Arrays.asList(new AnalyticsResult(null, null, inferenceModel, null))); AnalyticsResultProcessor resultProcessor = createResultProcessor(); resultProcessor.process(process); @@ -232,6 +237,6 @@ private AnalyticsResultProcessor createResultProcessor() { private AnalyticsResultProcessor createResultProcessor(List fieldNames) { return new AnalyticsResultProcessor( - analyticsConfig, dataFrameRowsJoiner, statsHolder, trainedModelProvider, auditor, fieldNames); + analyticsConfig, dataFrameRowsJoiner, 
statsHolder, trainedModelProvider, auditor, resultsPersisterService, fieldNames); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/results/AnalyticsResultTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/results/AnalyticsResultTests.java index 1758659bf3c20..3b949d84ed0ef 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/results/AnalyticsResultTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/results/AnalyticsResultTests.java @@ -7,12 +7,15 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.SearchModule; import org.elasticsearch.test.AbstractXContentTestCase; +import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsageTests; import org.elasticsearch.xpack.core.ml.inference.MlInferenceNamedXContentProvider; import org.elasticsearch.xpack.core.ml.inference.TrainedModelDefinition; import org.elasticsearch.xpack.core.ml.inference.TrainedModelDefinitionTests; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import java.util.ArrayList; import java.util.Collections; @@ -42,7 +45,7 @@ protected AnalyticsResult createTestInstance() { if (randomBoolean()) { inferenceModel = TrainedModelDefinitionTests.createRandomBuilder(); } - return new AnalyticsResult(rowResults, progressPercent, inferenceModel); + return new AnalyticsResult(rowResults, progressPercent, inferenceModel, MemoryUsageTests.createRandom()); } @Override @@ -50,6 +53,11 @@ protected AnalyticsResult doParseInstance(XContentParser parser) { return AnalyticsResult.PARSER.apply(parser, null); } + @Override + protected ToXContent.Params getToXContentParams() { + return new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")); + } + @Override protected boolean supportsUnknownFields() { return false;
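
Note on the stopped-task stats gathering in TransportGetDataFrameAnalyticsStatsAction: the patch replaces the previous single multi-search over all config IDs with one search per job, joined back together with an AtomicInteger countdown and a fixed-slot results array. The standalone sketch below (plain JDK only; the class and variable names are hypothetical, and java.util.concurrent.atomic.AtomicReferenceArray stands in for Elasticsearch's AtomicArray) illustrates that fan-in pattern in isolation: every asynchronous lookup writes into its own slot, and whichever callback drives the counter to zero assembles the combined, sorted response.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReferenceArray;

    public class StatsFanInSketch {

        public static void main(String[] args) {
            List<String> stoppedTaskIds = Arrays.asList("job-b", "job-a", "job-c");
            AtomicInteger counter = new AtomicInteger(stoppedTaskIds.size());
            AtomicReferenceArray<String> perJobStats = new AtomicReferenceArray<>(stoppedTaskIds.size());
            List<CompletableFuture<Void>> futures = new ArrayList<>();

            for (int i = 0; i < stoppedTaskIds.size(); i++) {
                final int slot = i;
                final String jobId = stoppedTaskIds.get(i);
                // Stand-in for the asynchronous per-job stats search.
                futures.add(CompletableFuture.runAsync(() -> {
                    perJobStats.set(slot, "stats[" + jobId + "]");
                    // The last lookup to complete assembles and "sends" the combined response.
                    if (counter.decrementAndGet() == 0) {
                        List<String> allStats = new ArrayList<>();
                        for (int j = 0; j < perJobStats.length(); j++) {
                            allStats.add(perJobStats.get(j));
                        }
                        allStats.sort(String::compareTo);
                        System.out.println("combined response: " + allStats);
                    }
                }));
            }

            // Join only so the demo JVM does not exit early; the production code relies on the ActionListener instead.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        }
    }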
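
Note on MlParserUtils: the new helper consolidates the hit-parsing code that previously lived in JobResultsProvider and TransportGetDataFrameAnalyticsStatsAction. The sketch below shows the underlying XContent pattern it wraps, applied to a hard-coded JSON string instead of a real SearchHit source; the class name and the example document's fields are illustrative only, and real callers hand the parser to StoredProgress.PARSER or MemoryUsage.LENIENT_PARSER as in the patch.

    import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
    import org.elasticsearch.common.xcontent.NamedXContentRegistry;
    import org.elasticsearch.common.xcontent.XContentFactory;
    import org.elasticsearch.common.xcontent.XContentParser;
    import org.elasticsearch.common.xcontent.XContentType;

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    public class StatsDocParseSketch {

        public static void main(String[] args) throws IOException {
            // Stand-in for SearchHit.getSourceRef(): the raw _source bytes of a stats document.
            String source = "{\"job_id\":\"my-analytics\",\"peak_usage_bytes\":123456}";

            try (InputStream stream = new ByteArrayInputStream(source.getBytes(StandardCharsets.UTF_8));
                 XContentParser parser = XContentFactory.xContent(XContentType.JSON)
                     .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
                // MlParserUtils.parse would pass this parser to a ConstructingObjectParser;
                // reading it into a plain map is enough for this sketch.
                Map<String, Object> doc = parser.map();
                System.out.println(doc);
            }
        }
    }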