Skip to content

Commit

Permalink
[ML] Improve progress reportings for DF analytics (#45856)
Browse files Browse the repository at this point in the history
Previously, the stats API reports a progress percentage
for DF analytics tasks that are running and are in the
`reindexing` or `analyzing` state.

This means that when the task is `stopped` there is no progress
reported. Thus, one cannot distinguish between a task that never
run to one that completed.

In addition, there are blind spots in the progress reporting.
In particular, we do not account for when data is loaded into the
process. We also do not account for when results are written.

This commit addresses the above issues. It changes progress
to being a list of objects, each one describing the phase
and its progress as a percentage. We currently have 4 phases:
reindexing, loading_data, analyzing, writing_results.

When the task stops, progress is persisted as a document in the
state index. The stats API now reports progress from in-memory
if the task is running, or returns the persisted document
(if there is one).
  • Loading branch information
dimitris-athanasiou authored Aug 23, 2019
1 parent c8aceb5 commit f6a97de
Show file tree
Hide file tree
Showing 27 changed files with 980 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
Expand All @@ -42,17 +43,18 @@ public static DataFrameAnalyticsStats fromXContent(XContentParser parser) throws
static final ParseField ID = new ParseField("id");
static final ParseField STATE = new ParseField("state");
static final ParseField FAILURE_REASON = new ParseField("failure_reason");
static final ParseField PROGRESS_PERCENT = new ParseField("progress_percent");
static final ParseField PROGRESS = new ParseField("progress");
static final ParseField NODE = new ParseField("node");
static final ParseField ASSIGNMENT_EXPLANATION = new ParseField("assignment_explanation");

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<DataFrameAnalyticsStats, Void> PARSER =
new ConstructingObjectParser<>("data_frame_analytics_stats", true,
args -> new DataFrameAnalyticsStats(
(String) args[0],
(DataFrameAnalyticsState) args[1],
(String) args[2],
(Integer) args[3],
(List<PhaseProgress>) args[3],
(NodeAttributes) args[4],
(String) args[5]));

Expand All @@ -65,25 +67,25 @@ public static DataFrameAnalyticsStats fromXContent(XContentParser parser) throws
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, STATE, ObjectParser.ValueType.STRING);
PARSER.declareString(optionalConstructorArg(), FAILURE_REASON);
PARSER.declareInt(optionalConstructorArg(), PROGRESS_PERCENT);
PARSER.declareObjectArray(optionalConstructorArg(), PhaseProgress.PARSER, PROGRESS);
PARSER.declareObject(optionalConstructorArg(), NodeAttributes.PARSER, NODE);
PARSER.declareString(optionalConstructorArg(), ASSIGNMENT_EXPLANATION);
}

private final String id;
private final DataFrameAnalyticsState state;
private final String failureReason;
private final Integer progressPercent;
private final List<PhaseProgress> progress;
private final NodeAttributes node;
private final String assignmentExplanation;

public DataFrameAnalyticsStats(String id, DataFrameAnalyticsState state, @Nullable String failureReason,
@Nullable Integer progressPercent, @Nullable NodeAttributes node,
@Nullable List<PhaseProgress> progress, @Nullable NodeAttributes node,
@Nullable String assignmentExplanation) {
this.id = id;
this.state = state;
this.failureReason = failureReason;
this.progressPercent = progressPercent;
this.progress = progress;
this.node = node;
this.assignmentExplanation = assignmentExplanation;
}
Expand All @@ -100,8 +102,8 @@ public String getFailureReason() {
return failureReason;
}

public Integer getProgressPercent() {
return progressPercent;
public List<PhaseProgress> getProgress() {
return progress;
}

public NodeAttributes getNode() {
Expand All @@ -121,14 +123,14 @@ public boolean equals(Object o) {
return Objects.equals(id, other.id)
&& Objects.equals(state, other.state)
&& Objects.equals(failureReason, other.failureReason)
&& Objects.equals(progressPercent, other.progressPercent)
&& Objects.equals(progress, other.progress)
&& Objects.equals(node, other.node)
&& Objects.equals(assignmentExplanation, other.assignmentExplanation);
}

@Override
public int hashCode() {
return Objects.hash(id, state, failureReason, progressPercent, node, assignmentExplanation);
return Objects.hash(id, state, failureReason, progress, node, assignmentExplanation);
}

@Override
Expand All @@ -137,7 +139,7 @@ public String toString() {
.add("id", id)
.add("state", state)
.add("failureReason", failureReason)
.add("progressPercent", progressPercent)
.add("progress", progress)
.add("node", node)
.add("assignmentExplanation", assignmentExplanation)
.toString();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml.dataframe;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.internal.ToStringBuilder;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

/**
* A class that describes a phase and its progress as a percentage
*/
public class PhaseProgress implements ToXContentObject {

static final ParseField PHASE = new ParseField("phase");
static final ParseField PROGRESS_PERCENT = new ParseField("progress_percent");

public static final ConstructingObjectParser<PhaseProgress, Void> PARSER = new ConstructingObjectParser<>("phase_progress",
true, a -> new PhaseProgress((String) a[0], (int) a[1]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), PHASE);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), PROGRESS_PERCENT);
}

private final String phase;
private final int progressPercent;

public PhaseProgress(String phase, int progressPercent) {
this.phase = Objects.requireNonNull(phase);
this.progressPercent = progressPercent;
}

public String getPhase() {
return phase;
}

public int getProgressPercent() {
return progressPercent;
}

@Override
public int hashCode() {
return Objects.hash(phase, progressPercent);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PhaseProgress that = (PhaseProgress) o;
return Objects.equals(phase, that.phase) && progressPercent == that.progressPercent;
}

@Override
public String toString() {
return new ToStringBuilder(getClass())
.add(PHASE.getPreferredName(), phase)
.add(PROGRESS_PERCENT.getPreferredName(), progressPercent)
.toString();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(PhaseProgress.PHASE.getPreferredName(), phase);
builder.field(PhaseProgress.PROGRESS_PERCENT.getPreferredName(), progressPercent);
builder.endObject();
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsStats;
import org.elasticsearch.client.ml.dataframe.OutlierDetection;
import org.elasticsearch.client.ml.dataframe.PhaseProgress;
import org.elasticsearch.client.ml.dataframe.QueryConfig;
import org.elasticsearch.client.ml.dataframe.evaluation.regression.MeanSquaredErrorMetric;
import org.elasticsearch.client.ml.dataframe.evaluation.regression.RSquaredMetric;
Expand Down Expand Up @@ -1377,11 +1378,17 @@ public void testGetDataFrameAnalyticsStats() throws Exception {
assertThat(stats.getId(), equalTo(configId));
assertThat(stats.getState(), equalTo(DataFrameAnalyticsState.STOPPED));
assertNull(stats.getFailureReason());
assertNull(stats.getProgressPercent());
assertNull(stats.getNode());
assertNull(stats.getAssignmentExplanation());
assertThat(statsResponse.getNodeFailures(), hasSize(0));
assertThat(statsResponse.getTaskFailures(), hasSize(0));
List<PhaseProgress> progress = stats.getProgress();
assertThat(progress, is(notNullValue()));
assertThat(progress.size(), equalTo(4));
assertThat(progress.get(0), equalTo(new PhaseProgress("reindexing", 0)));
assertThat(progress.get(1), equalTo(new PhaseProgress("loading_data", 0)));
assertThat(progress.get(2), equalTo(new PhaseProgress("analyzing", 0)));
assertThat(progress.get(3), equalTo(new PhaseProgress("writing_results", 0)));
}

public void testStartDataFrameAnalyticsConfig() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;

Expand All @@ -44,20 +46,29 @@ public static DataFrameAnalyticsStats randomDataFrameAnalyticsStats() {
randomAlphaOfLengthBetween(1, 10),
randomFrom(DataFrameAnalyticsState.values()),
randomBoolean() ? null : randomAlphaOfLength(10),
randomBoolean() ? null : randomIntBetween(0, 100),
randomBoolean() ? null : createRandomProgress(),
randomBoolean() ? null : NodeAttributesTests.createRandom(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20));
}

private static List<PhaseProgress> createRandomProgress() {
int progressPhaseCount = randomIntBetween(3, 7);
List<PhaseProgress> progress = new ArrayList<>(progressPhaseCount);
for (int i = 0; i < progressPhaseCount; i++) {
progress.add(new PhaseProgress(randomAlphaOfLength(20), randomIntBetween(0, 100)));
}
return progress;
}

public static void toXContent(DataFrameAnalyticsStats stats, XContentBuilder builder) throws IOException {
builder.startObject();
builder.field(DataFrameAnalyticsStats.ID.getPreferredName(), stats.getId());
builder.field(DataFrameAnalyticsStats.STATE.getPreferredName(), stats.getState().value());
if (stats.getFailureReason() != null) {
builder.field(DataFrameAnalyticsStats.FAILURE_REASON.getPreferredName(), stats.getFailureReason());
}
if (stats.getProgressPercent() != null) {
builder.field(DataFrameAnalyticsStats.PROGRESS_PERCENT.getPreferredName(), stats.getProgressPercent());
if (stats.getProgress() != null) {
builder.field(DataFrameAnalyticsStats.PROGRESS.getPreferredName(), stats.getProgress());
}
if (stats.getNode() != null) {
builder.field(DataFrameAnalyticsStats.NODE.getPreferredName(), stats.getNode());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml.dataframe;

import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;

import java.io.IOException;

public class PhaseProgressTests extends AbstractXContentTestCase<PhaseProgress> {

public static PhaseProgress createRandom() {
return new PhaseProgress(randomAlphaOfLength(20), randomIntBetween(0, 100));
}

@Override
protected PhaseProgress createTestInstance() {
return createRandom();
}

@Override
protected PhaseProgress doParseInstance(XContentParser parser) throws IOException {
return PhaseProgress.PARSER.apply(parser, null);
}

@Override
protected boolean supportsUnknownFields() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,25 @@ The API returns the following results:
"data_frame_analytics": [
{
"id": "loganalytics",
"state": "stopped"
"state": "stopped",
"progress": [
{
"phase": "reindexing",
"progress_percent": 0
},
{
"phase": "loading_data",
"progress_percent": 0
},
{
"phase": "analyzing",
"progress_percent": 0
},
{
"phase": "writing_results",
"progress_percent": 0
}
]
}
]
}
Expand Down
Loading

0 comments on commit f6a97de

Please sign in to comment.