[ML-DataFrame] Remove ID field from data frame indexer stats (#44848)
This is a follow-up to #44350. The indexer stats used to
be persisted standalone, but now are only persisted as
part of a state-and-stats document. During the review
of #44350 it was decided that we'll stick with this
design, so there will never be a need for an indexer
stats object to store its transform ID, as it is stored
on the enclosing document. This PR therefore removes the
ID field from the indexer stats.

Backport of #44768
droberts195 authored Jul 25, 2019
1 parent af937b1 commit b2e969f
Showing 13 changed files with 46 additions and 98 deletions.
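
For context, the key design point in the commit message is that the transform ID now lives only on the enclosing state-and-stats document (DataFrameTransformStoredDoc), never on the stats object itself. The following Java sketch is illustrative only, with simplified stand-in classes rather than the real Elasticsearch ones, and shows the intended shape: an ID-free stats value nested inside a stored document that carries the ID.

// Illustrative sketch: simplified stand-ins, not the actual Elasticsearch classes.
public final class StateAndStatsSketch {

    // Roughly analogous to DataFrameIndexerTransformStats after this change: no transform ID field.
    static final class IndexerStats {
        final long pagesProcessed;
        final long documentsProcessed;

        IndexerStats(long pagesProcessed, long documentsProcessed) {
            this.pagesProcessed = pagesProcessed;
            this.documentsProcessed = documentsProcessed;
        }
    }

    // Roughly analogous to DataFrameTransformStoredDoc: the one place the transform ID is stored.
    static final class StoredDoc {
        final String transformId;
        final IndexerStats stats;

        StoredDoc(String transformId, IndexerStats stats) {
            this.transformId = transformId;
            this.stats = stats;
        }
    }

    public static void main(String[] args) {
        StoredDoc doc = new StoredDoc("my-transform", new IndexerStats(12, 3400));
        // The ID is read from the enclosing document, never from the stats object.
        System.out.println(doc.transformId + ": " + doc.stats.documentsProcessed + " docs");
    }
}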
@@ -31,7 +31,7 @@ public class DataFrameIndexerTransformStatsTests extends AbstractHlrcXContentTes

public static DataFrameIndexerTransformStats fromHlrc(
org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats instance) {
return DataFrameIndexerTransformStats.withDefaultTransformId(instance.getNumPages(), instance.getNumDocuments(),
return new DataFrameIndexerTransformStats(instance.getNumPages(), instance.getNumDocuments(),
instance.getOutputDocuments(), instance.getNumInvocations(), instance.getIndexTime(), instance.getSearchTime(),
instance.getIndexTotal(), instance.getSearchTotal(), instance.getIndexFailures(), instance.getSearchFailures());
}
@@ -48,16 +48,16 @@ public DataFrameIndexerTransformStats convertHlrcToInternal(
return fromHlrc(instance);
}

public static DataFrameIndexerTransformStats randomStats(String transformId) {
return new DataFrameIndexerTransformStats(transformId, randomLongBetween(10L, 10000L),
public static DataFrameIndexerTransformStats randomStats() {
return new DataFrameIndexerTransformStats(randomLongBetween(10L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L));
}

@Override
protected DataFrameIndexerTransformStats createTestInstance() {
return randomStats(DataFrameIndexerTransformStats.DEFAULT_TRANSFORM_ID);
return randomStats();
}

@Override
@@ -79,8 +79,7 @@ public static DataFrameTransformStats randomDataFrameTransformStats() {
randomFrom(DataFrameTransformTaskState.values()),
randomBoolean() ? null : randomAlphaOfLength(100),
randomBoolean() ? null : randomNodeAttributes(),
// TODO: remove this ID field from the server side as it's no longer needed
randomStats("_all"),
randomStats(),
DataFrameTransformCheckpointingInfoTests.randomDataFrameTransformCheckpointingInfo());
}

@@ -132,8 +131,8 @@ public static NodeAttributes randomNodeAttributes() {
attributes);
}

public static DataFrameIndexerTransformStats randomStats(String transformId) {
return new DataFrameIndexerTransformStats(transformId, randomLongBetween(10L, 10000L),
public static DataFrameIndexerTransformStats randomStats() {
return new DataFrameIndexerTransformStats(randomLongBetween(10L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L));
@@ -6,24 +6,23 @@

package org.elasticsearch.xpack.core.dataframe.transforms;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.indexing.IndexerJobStats;

import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class DataFrameIndexerTransformStats extends IndexerJobStats {
public static final String DEFAULT_TRANSFORM_ID = "_all";

private static final String DEFAULT_TRANSFORM_ID = "_all"; // TODO remove when no longer needed for wire BWC

public static final String NAME = "data_frame_indexer_transform_stats";
public static ParseField NUM_PAGES = new ParseField("pages_processed");
@@ -39,12 +38,11 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats {

private static final ConstructingObjectParser<DataFrameIndexerTransformStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
NAME, true,
args -> new DataFrameIndexerTransformStats(args[0] != null ? (String) args[0] : DEFAULT_TRANSFORM_ID,
(long) args[1], (long) args[2], (long) args[3], (long) args[4], (long) args[5], (long) args[6], (long) args[7],
(long) args[8], (long) args[9], (long) args[10]));
args -> new DataFrameIndexerTransformStats(
(long) args[0], (long) args[1], (long) args[2], (long) args[3], (long) args[4], (long) args[5], (long) args[6],
(long) args[7], (long) args[8], (long) args[9]));

static {
LENIENT_PARSER.declareString(optionalConstructorArg(), DataFrameField.ID);
LENIENT_PARSER.declareLong(constructorArg(), NUM_PAGES);
LENIENT_PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
LENIENT_PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
@@ -57,60 +55,38 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats {
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
}

private final String transformId;

/**
* Certain situations call for a default transform ID, e.g. when merging many different transforms for statistics gather.
*
* The returned stats object cannot be stored in the index as the transformId does not refer to a real transform configuration
*
* @return new DataFrameIndexerTransformStats with empty stats and a default transform ID
* Create with all stats set to zero
*/
public static DataFrameIndexerTransformStats withDefaultTransformId() {
return new DataFrameIndexerTransformStats(DEFAULT_TRANSFORM_ID);
}

public static DataFrameIndexerTransformStats withDefaultTransformId(long numPages, long numInputDocuments, long numOutputDocuments,
long numInvocations, long indexTime, long searchTime,
long indexTotal, long searchTotal, long indexFailures,
long searchFailures) {
return new DataFrameIndexerTransformStats(DEFAULT_TRANSFORM_ID, numPages, numInputDocuments,
numOutputDocuments, numInvocations, indexTime, searchTime, indexTotal, searchTotal,
indexFailures, searchFailures);
}

public DataFrameIndexerTransformStats(String transformId) {
public DataFrameIndexerTransformStats() {
super();
this.transformId = Objects.requireNonNull(transformId, "parameter transformId must not be null");
}

public DataFrameIndexerTransformStats(String transformId, long numPages, long numInputDocuments, long numOutputDocuments,
public DataFrameIndexerTransformStats(long numPages, long numInputDocuments, long numOutputDocuments,
long numInvocations, long indexTime, long searchTime, long indexTotal, long searchTotal,
long indexFailures, long searchFailures) {
super(numPages, numInputDocuments, numOutputDocuments, numInvocations, indexTime, searchTime, indexTotal, searchTotal,
indexFailures, searchFailures);
this.transformId = Objects.requireNonNull(transformId, "parameter transformId must not be null");
}

public DataFrameIndexerTransformStats(DataFrameIndexerTransformStats other) {
this(other.transformId, other.numPages, other.numInputDocuments, other.numOuputDocuments, other.numInvocations,
this(other.numPages, other.numInputDocuments, other.numOuputDocuments, other.numInvocations,
other.indexTime, other.searchTime, other.indexTotal, other.searchTotal, other.indexFailures, other.searchFailures);
}

public DataFrameIndexerTransformStats(StreamInput in) throws IOException {
super(in);
transformId = in.readString();
if (in.getVersion().before(Version.V_7_4_0)) {
in.readString(); // was transformId
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(transformId);
}

@Nullable
public String getTransformId() {
return transformId;
if (out.getVersion().before(Version.V_7_4_0)) {
out.writeString(DEFAULT_TRANSFORM_ID);
}
}

@Override
@@ -126,21 +102,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(SEARCH_TIME_IN_MS.getPreferredName(), searchTime);
builder.field(SEARCH_TOTAL.getPreferredName(), searchTotal);
builder.field(SEARCH_FAILURES.getPreferredName(), searchFailures);
if (params.paramAsBoolean(DataFrameField.FOR_INTERNAL_STORAGE, false)) {
// If we are storing something, it should have a valid transform ID.
if (transformId.equals(DEFAULT_TRANSFORM_ID)) {
throw new IllegalArgumentException("when storing transform statistics, a valid transform id must be provided");
}
builder.field(DataFrameField.ID.getPreferredName(), transformId);
}
builder.endObject();
return builder;
}

public DataFrameIndexerTransformStats merge(DataFrameIndexerTransformStats other) {
// We should probably not merge two sets of stats unless one is an accumulation object (i.e. with the default transform id)
// or the stats are referencing the same transform
assert transformId.equals(DEFAULT_TRANSFORM_ID) || this.transformId.equals(other.transformId);
numPages += other.numPages;
numInputDocuments += other.numInputDocuments;
numOuputDocuments += other.numOuputDocuments;
@@ -167,8 +133,7 @@ public boolean equals(Object other) {

DataFrameIndexerTransformStats that = (DataFrameIndexerTransformStats) other;

return Objects.equals(this.transformId, that.transformId)
&& Objects.equals(this.numPages, that.numPages)
return Objects.equals(this.numPages, that.numPages)
&& Objects.equals(this.numInputDocuments, that.numInputDocuments)
&& Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
&& Objects.equals(this.numInvocations, that.numInvocations)
@@ -182,7 +147,7 @@

@Override
public int hashCode() {
return Objects.hash(transformId, numPages, numInputDocuments, numOuputDocuments, numInvocations,
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
indexTime, searchTime, indexFailures, searchFailures, indexTotal, searchTotal);
}

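
The version checks added to the stream constructor and writeTo above follow a standard wire backwards-compatibility pattern: when reading from a pre-7.4.0 node that still sends the transform ID, consume and discard it; when writing to such a node, append a harmless placeholder so the older parser stays in sync. A minimal, hypothetical sketch of that pattern follows (the stream and version types here are simplified stand-ins, not the real Elasticsearch StreamInput/StreamOutput API):

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch of the wire-BWC pattern; not the real Elasticsearch stream classes.
public final class WireBwcSketch {

    static final int V_7_4_0 = 70400;            // first version without the transform ID on the wire
    static final String PLACEHOLDER_ID = "_all"; // only ever sent to older nodes

    // Reading: the stats fields come first, then the legacy ID is read and thrown away.
    static long readStats(DataInputStream in, int remoteVersion) throws IOException {
        long pagesProcessed = in.readLong();     // stand-in for the real stats fields
        if (remoteVersion < V_7_4_0) {
            in.readUTF();                        // was transformId
        }
        return pagesProcessed;
    }

    // Writing: append the placeholder ID so an older node can still parse the message.
    static void writeStats(DataOutputStream out, int remoteVersion, long pagesProcessed) throws IOException {
        out.writeLong(pagesProcessed);
        if (remoteVersion < V_7_4_0) {
            out.writeUTF(PLACEHOLDER_ID);
        }
    }
}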
@@ -75,7 +75,7 @@ public static DataFrameTransformStats fromXContent(XContentParser parser) throws
}

public static DataFrameTransformStats initialStats(String id) {
return stoppedStats(id, new DataFrameIndexerTransformStats(id));
return stoppedStats(id, new DataFrameIndexerTransformStats());
}

public static DataFrameTransformStats stoppedStats(String id, DataFrameIndexerTransformStats indexerTransformStats) {
@@ -7,19 +7,13 @@
package org.elasticsearch.xpack.core.dataframe.transforms;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;

import java.io.IOException;
import java.util.Collections;

public class DataFrameIndexerTransformStatsTests extends AbstractSerializingTestCase<DataFrameIndexerTransformStats> {

protected static ToXContent.Params TO_XCONTENT_PARAMS = new ToXContent.MapParams(
Collections.singletonMap(DataFrameField.FOR_INTERNAL_STORAGE, "true"));

@Override
protected DataFrameIndexerTransformStats createTestInstance() {
return randomStats();
@@ -36,36 +30,26 @@ protected DataFrameIndexerTransformStats doParseInstance(XContentParser parser)
}

public static DataFrameIndexerTransformStats randomStats() {
return randomStats(randomAlphaOfLength(10));
}

public static DataFrameIndexerTransformStats randomStats(String transformId) {
return new DataFrameIndexerTransformStats(transformId, randomLongBetween(10L, 10000L),
return new DataFrameIndexerTransformStats(randomLongBetween(10L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L));
}

@Override
protected ToXContent.Params getToXContentParams() {
return TO_XCONTENT_PARAMS;
}

public void testMerge() throws IOException {
String transformId = randomAlphaOfLength(10);
DataFrameIndexerTransformStats emptyStats = new DataFrameIndexerTransformStats(transformId);
DataFrameIndexerTransformStats randomStats = randomStats(transformId);
DataFrameIndexerTransformStats emptyStats = new DataFrameIndexerTransformStats();
DataFrameIndexerTransformStats randomStats = randomStats();

assertEquals(randomStats, emptyStats.merge(randomStats));
assertEquals(randomStats, randomStats.merge(emptyStats));

DataFrameIndexerTransformStats randomStatsClone = copyInstance(randomStats);

DataFrameIndexerTransformStats trippleRandomStats = new DataFrameIndexerTransformStats(transformId, 3 * randomStats.getNumPages(),
DataFrameIndexerTransformStats tripleRandomStats = new DataFrameIndexerTransformStats(3 * randomStats.getNumPages(),
3 * randomStats.getNumDocuments(), 3 * randomStats.getOutputDocuments(), 3 * randomStats.getNumInvocations(),
3 * randomStats.getIndexTime(), 3 * randomStats.getSearchTime(), 3 * randomStats.getIndexTotal(),
3 * randomStats.getSearchTotal(), 3 * randomStats.getIndexFailures(), 3 * randomStats.getSearchFailures());

assertEquals(trippleRandomStats, randomStats.merge(randomStatsClone).merge(randomStatsClone));
assertEquals(tripleRandomStats, randomStats.merge(randomStatsClone).merge(randomStatsClone));
}
}
@@ -20,7 +20,7 @@ public static DataFrameTransformStats randomDataFrameTransformStats() {
randomFrom(DataFrameTransformTaskState.values()),
randomBoolean() ? null : randomAlphaOfLength(100),
randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(),
DataFrameIndexerTransformStatsTests.randomStats(DataFrameIndexerTransformStats.DEFAULT_TRANSFORM_ID),
DataFrameIndexerTransformStatsTests.randomStats(),
DataFrameTransformCheckpointingInfoTests.randomDataFrameTransformCheckpointingInfo());
}

@@ -22,7 +22,7 @@ public class DataFrameTransformStoredDocTests extends AbstractSerializingDataFra
public static DataFrameTransformStoredDoc randomDataFrameTransformStoredDoc(String id) {
return new DataFrameTransformStoredDoc(id,
DataFrameTransformStateTests.randomDataFrameTransformState(),
DataFrameIndexerTransformStatsTests.randomStats(id));
DataFrameIndexerTransformStatsTests.randomStats());
}

public static DataFrameTransformStoredDoc randomDataFrameTransformStoredDoc() {
@@ -105,7 +105,7 @@ public void usage(ActionListener<XPackFeatureSet.Usage> listener) {
listener.onResponse(new DataFrameFeatureSetUsage(available(),
enabled(),
Collections.emptyMap(),
DataFrameIndexerTransformStats.withDefaultTransformId()));
new DataFrameIndexerTransformStats()));
return;
}

@@ -139,7 +139,7 @@ public void usage(ActionListener<XPackFeatureSet.Usage> listener) {
listener.onResponse(new DataFrameFeatureSetUsage(available(),
enabled(),
transformsCountByState,
DataFrameIndexerTransformStats.withDefaultTransformId()));
new DataFrameIndexerTransformStats()));
return;
}
transformsCountByState.merge(DataFrameTransformTaskState.STOPPED.value(), totalTransforms - taskCount, Long::sum);
@@ -179,7 +179,7 @@ static DataFrameIndexerTransformStats parseSearchAggs(SearchResponse searchRespo
statisticsList.add(0L);
}
}
return DataFrameIndexerTransformStats.withDefaultTransformId(statisticsList.get(0), // numPages
return new DataFrameIndexerTransformStats(statisticsList.get(0), // numPages
statisticsList.get(1), // numInputDocuments
statisticsList.get(2), // numOutputDocuments
statisticsList.get(3), // numInvocations
@@ -216,7 +216,7 @@ static void getStatisticSummations(Client client, ActionListener<DataFrameIndexe
},
failure -> {
if (failure instanceof ResourceNotFoundException) {
statsListener.onResponse(DataFrameIndexerTransformStats.withDefaultTransformId());
statsListener.onResponse(new DataFrameIndexerTransformStats());
} else {
statsListener.onFailure(failure);
}
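
In the usage-collection code above, aggregate statistics no longer need an "_all" placeholder ID: an empty stats object is created and per-transform stats are folded into it with merge(). A rough illustration of that accumulation pattern, again with a simplified stand-in class rather than the real one:

import java.util.Arrays;
import java.util.List;

// Simplified stand-in showing how ID-free stats objects are summed into a single aggregate.
public final class StatsAccumulationSketch {

    static final class IndexerStats {
        long pagesProcessed;
        long documentsProcessed;

        IndexerStats() {}                        // all stats start at zero; no transform ID required

        IndexerStats(long pages, long docs) {
            this.pagesProcessed = pages;
            this.documentsProcessed = docs;
        }

        IndexerStats merge(IndexerStats other) {
            pagesProcessed += other.pagesProcessed;
            documentsProcessed += other.documentsProcessed;
            return this;
        }
    }

    static IndexerStats sum(List<IndexerStats> perTransformStats) {
        IndexerStats total = new IndexerStats();
        for (IndexerStats stats : perTransformStats) {
            total.merge(stats);
        }
        return total;
    }

    public static void main(String[] args) {
        IndexerStats total = sum(Arrays.asList(new IndexerStats(2, 100), new IndexerStats(3, 250)));
        System.out.println(total.pagesProcessed + " pages, " + total.documentsProcessed + " docs");
    }
}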
@@ -144,7 +144,7 @@ private void getPreview(Pivot pivot,
r -> {
try {
final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
DataFrameIndexerTransformStats stats = DataFrameIndexerTransformStats.withDefaultTransformId();
DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats();
// remove all internal fields

if (pipeline == null) {
@@ -166,7 +166,7 @@ public DataFrameTransformState getState() {

public DataFrameIndexerTransformStats getStats() {
if (getIndexer() == null) {
return new DataFrameIndexerTransformStats(getTransformId());
return new DataFrameIndexerTransformStats();
} else {
return getIndexer().getStats();
}
@@ -425,7 +425,7 @@ static class ClientDataFrameIndexerBuilder {

ClientDataFrameIndexerBuilder(String transformId) {
this.transformId = transformId;
this.initialStats = new DataFrameIndexerTransformStats(transformId);
this.initialStats = new DataFrameIndexerTransformStats();
}

ClientDataFrameIndexer build(DataFrameTransformTask parentTask) {
@@ -551,7 +551,7 @@ static class ClientDataFrameIndexer extends DataFrameIndexer {
fieldMappings,
ExceptionsHelper.requireNonNull(initialState, "initialState"),
initialPosition,
initialStats == null ? new DataFrameIndexerTransformStats(transformId) : initialStats,
initialStats == null ? new DataFrameIndexerTransformStats() : initialStats,
transformProgress,
lastCheckpoint,
nextCheckpoint);