Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Improve response format of data frame stats endpoint #44350

Merged
merged 19 commits into from
Jul 23, 2019
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStats;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
Expand All @@ -42,11 +42,11 @@ public class GetDataFrameTransformStatsResponse {
@SuppressWarnings("unchecked")
static final ConstructingObjectParser<GetDataFrameTransformStatsResponse, Void> PARSER = new ConstructingObjectParser<>(
"get_data_frame_transform_stats_response", true,
args -> new GetDataFrameTransformStatsResponse((List<DataFrameTransformStateAndStats>) args[0],
args -> new GetDataFrameTransformStatsResponse((List<DataFrameTransformStats>) args[0],
(List<TaskOperationFailure>) args[1], (List<ElasticsearchException>) args[2]));

static {
PARSER.declareObjectArray(constructorArg(), DataFrameTransformStateAndStats.PARSER::apply, TRANSFORMS);
PARSER.declareObjectArray(constructorArg(), DataFrameTransformStats.PARSER::apply, TRANSFORMS);
// Discard the count field which is the size of the transforms array
PARSER.declareInt((a, b) -> {}, COUNT);
PARSER.declareObjectArray(optionalConstructorArg(), (p, c) -> TaskOperationFailure.fromXContent(p),
Expand All @@ -59,20 +59,20 @@ public static GetDataFrameTransformStatsResponse fromXContent(final XContentPars
return GetDataFrameTransformStatsResponse.PARSER.apply(parser, null);
}

private final List<DataFrameTransformStateAndStats> transformsStateAndStats;
private final List<DataFrameTransformStats> transformsStats;
private final List<TaskOperationFailure> taskFailures;
private final List<ElasticsearchException> nodeFailures;

public GetDataFrameTransformStatsResponse(List<DataFrameTransformStateAndStats> transformsStateAndStats,
public GetDataFrameTransformStatsResponse(List<DataFrameTransformStats> transformsStats,
@Nullable List<TaskOperationFailure> taskFailures,
@Nullable List<? extends ElasticsearchException> nodeFailures) {
this.transformsStateAndStats = transformsStateAndStats;
this.transformsStats = transformsStats;
this.taskFailures = taskFailures == null ? Collections.emptyList() : Collections.unmodifiableList(taskFailures);
this.nodeFailures = nodeFailures == null ? Collections.emptyList() : Collections.unmodifiableList(nodeFailures);
}

public List<DataFrameTransformStateAndStats> getTransformsStateAndStats() {
return transformsStateAndStats;
public List<DataFrameTransformStats> getTransformsStats() {
return transformsStats;
}

public List<ElasticsearchException> getNodeFailures() {
Expand All @@ -85,7 +85,7 @@ public List<TaskOperationFailure> getTaskFailures() {

@Override
public int hashCode() {
return Objects.hash(transformsStateAndStats, nodeFailures, taskFailures);
return Objects.hash(transformsStats, nodeFailures, taskFailures);
}

@Override
Expand All @@ -99,7 +99,7 @@ public boolean equals(Object other) {
}

final GetDataFrameTransformStatsResponse that = (GetDataFrameTransformStatsResponse) other;
return Objects.equals(this.transformsStateAndStats, that.transformsStateAndStats)
return Objects.equals(this.transformsStats, that.transformsStats)
&& Objects.equals(this.nodeFailures, that.nodeFailures)
&& Objects.equals(this.taskFailures, that.taskFailures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,48 +19,86 @@

package org.elasticsearch.client.dataframe.transforms;

import org.elasticsearch.client.core.IndexerState;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class DataFrameTransformCheckpointStats {

public static final ParseField CHECKPOINT = new ParseField("checkpoint");
public static final ParseField INDEXER_STATE = new ParseField("indexer_state");
public static final ParseField POSITION = new ParseField("position");
public static final ParseField CHECKPOINT_PROGRESS = new ParseField("checkpoint_progress");
public static final ParseField TIMESTAMP_MILLIS = new ParseField("timestamp_millis");
public static final ParseField TIME_UPPER_BOUND_MILLIS = new ParseField("time_upper_bound_millis");

public static DataFrameTransformCheckpointStats EMPTY = new DataFrameTransformCheckpointStats(0L, 0L);
public static final DataFrameTransformCheckpointStats EMPTY = new DataFrameTransformCheckpointStats(0L, null, null, null, 0L, 0L);

private final long checkpoint;
private final IndexerState indexerState;
private final DataFrameIndexerPosition position;
private final DataFrameTransformProgress checkpointProgress;
private final long timestampMillis;
private final long timeUpperBoundMillis;

public static final ConstructingObjectParser<DataFrameTransformCheckpointStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
"data_frame_transform_checkpoint_stats", true, args -> {
long timestamp = args[0] == null ? 0L : (Long) args[0];
long timeUpperBound = args[1] == null ? 0L : (Long) args[1];
long checkpoint = args[0] == null ? 0L : (Long) args[0];
IndexerState indexerState = (IndexerState) args[1];
DataFrameIndexerPosition position = (DataFrameIndexerPosition) args[2];
DataFrameTransformProgress checkpointProgress = (DataFrameTransformProgress) args[3];
long timestamp = args[4] == null ? 0L : (Long) args[4];
long timeUpperBound = args[5] == null ? 0L : (Long) args[5];

return new DataFrameTransformCheckpointStats(timestamp, timeUpperBound);
});
return new DataFrameTransformCheckpointStats(checkpoint, indexerState, position, checkpointProgress, timestamp, timeUpperBound);
});

static {
LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), TIMESTAMP_MILLIS);
LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), TIME_UPPER_BOUND_MILLIS);
LENIENT_PARSER.declareLong(optionalConstructorArg(), CHECKPOINT);
LENIENT_PARSER.declareField(optionalConstructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE,
ObjectParser.ValueType.STRING);
LENIENT_PARSER.declareObject(optionalConstructorArg(), DataFrameIndexerPosition.PARSER, POSITION);
LENIENT_PARSER.declareObject(optionalConstructorArg(), DataFrameTransformProgress.PARSER, CHECKPOINT_PROGRESS);
LENIENT_PARSER.declareLong(optionalConstructorArg(), TIMESTAMP_MILLIS);
LENIENT_PARSER.declareLong(optionalConstructorArg(), TIME_UPPER_BOUND_MILLIS);
}

public static DataFrameTransformCheckpointStats fromXContent(XContentParser parser) throws IOException {
return LENIENT_PARSER.parse(parser, null);
}

public DataFrameTransformCheckpointStats(final long timestampMillis, final long timeUpperBoundMillis) {
public DataFrameTransformCheckpointStats(final long checkpoint, final IndexerState indexerState,
final DataFrameIndexerPosition position, final DataFrameTransformProgress checkpointProgress,
final long timestampMillis, final long timeUpperBoundMillis) {
this.checkpoint = checkpoint;
this.indexerState = indexerState;
this.position = position;
this.checkpointProgress = checkpointProgress;
this.timestampMillis = timestampMillis;
this.timeUpperBoundMillis = timeUpperBoundMillis;
}

public DataFrameTransformCheckpointStats(StreamInput in) throws IOException {
this.timestampMillis = in.readLong();
this.timeUpperBoundMillis = in.readLong();
public long getCheckpoint() {
return checkpoint;
}

public IndexerState getIndexerState() {
return indexerState;
}

public DataFrameIndexerPosition getPosition() {
return position;
}

public DataFrameTransformProgress getCheckpointProgress() {
return checkpointProgress;
}

public long getTimestampMillis() {
Expand All @@ -73,7 +111,7 @@ public long getTimeUpperBoundMillis() {

@Override
public int hashCode() {
return Objects.hash(timestampMillis, timeUpperBoundMillis);
return Objects.hash(checkpoint, indexerState, position, checkpointProgress, timestampMillis, timeUpperBoundMillis);
}

@Override
Expand All @@ -88,6 +126,11 @@ public boolean equals(Object other) {

DataFrameTransformCheckpointStats that = (DataFrameTransformCheckpointStats) other;

return this.timestampMillis == that.timestampMillis && this.timeUpperBoundMillis == that.timeUpperBoundMillis;
return this.checkpoint == that.checkpoint
&& Objects.equals(this.indexerState, that.indexerState)
&& Objects.equals(this.position, that.position)
&& Objects.equals(this.checkpointProgress, that.checkpointProgress)
&& this.timestampMillis == that.timestampMillis
&& this.timeUpperBoundMillis == that.timeUpperBoundMillis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@

public class DataFrameTransformCheckpointingInfo {

public static final ParseField CURRENT_CHECKPOINT = new ParseField("current");
public static final ParseField IN_PROGRESS_CHECKPOINT = new ParseField("in_progress");
public static final ParseField LAST_CHECKPOINT = new ParseField("last", "current");
public static final ParseField NEXT_CHECKPOINT = new ParseField("next", "in_progress");
public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind");

private final DataFrameTransformCheckpointStats current;
private final DataFrameTransformCheckpointStats inProgress;
private final DataFrameTransformCheckpointStats last;
private final DataFrameTransformCheckpointStats next;
private final long operationsBehind;


private static final ConstructingObjectParser<DataFrameTransformCheckpointingInfo, Void> LENIENT_PARSER =
new ConstructingObjectParser<>(
"data_frame_transform_checkpointing_info", true, a -> {
Expand All @@ -48,25 +47,25 @@ public class DataFrameTransformCheckpointingInfo {

static {
LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> DataFrameTransformCheckpointStats.fromXContent(p), CURRENT_CHECKPOINT);
(p, c) -> DataFrameTransformCheckpointStats.fromXContent(p), LAST_CHECKPOINT);
LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> DataFrameTransformCheckpointStats.fromXContent(p), IN_PROGRESS_CHECKPOINT);
(p, c) -> DataFrameTransformCheckpointStats.fromXContent(p), NEXT_CHECKPOINT);
LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), OPERATIONS_BEHIND);
}

public DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStats current, DataFrameTransformCheckpointStats inProgress,
public DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStats last, DataFrameTransformCheckpointStats next,
long operationsBehind) {
this.current = Objects.requireNonNull(current);
this.inProgress = Objects.requireNonNull(inProgress);
this.last = Objects.requireNonNull(last);
this.next = Objects.requireNonNull(next);
this.operationsBehind = operationsBehind;
}

public DataFrameTransformCheckpointStats getCurrent() {
return current;
public DataFrameTransformCheckpointStats getLast() {
return last;
}

public DataFrameTransformCheckpointStats getInProgress() {
return inProgress;
public DataFrameTransformCheckpointStats getNext() {
return next;
}

public long getOperationsBehind() {
Expand All @@ -79,7 +78,7 @@ public static DataFrameTransformCheckpointingInfo fromXContent(XContentParser p)

@Override
public int hashCode() {
return Objects.hash(current, inProgress, operationsBehind);
return Objects.hash(last, next, operationsBehind);
}

@Override
Expand All @@ -94,8 +93,8 @@ public boolean equals(Object other) {

DataFrameTransformCheckpointingInfo that = (DataFrameTransformCheckpointingInfo) other;

return Objects.equals(this.current, that.current) &&
Objects.equals(this.inProgress, that.inProgress) &&
return Objects.equals(this.last, that.last) &&
Objects.equals(this.next, that.next) &&
this.operationsBehind == that.operationsBehind;
}

Expand Down
Loading