Skip to content

Commit

Permalink
[ML][Transforms] fix bwc serialization with 7.3 (#48021) (#48048)
Browse files Browse the repository at this point in the history
  • Loading branch information
benwtrent authored Oct 15, 2019
1 parent 83321b0 commit 361e7ad
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public TransformStats(StreamInput in) throws IOException {
TransformState transformState = new TransformState(in);
this.state = State.fromComponents(transformState.getTaskState(), transformState.getIndexerState());
this.reason = transformState.getReason();
this.node = null;
this.node = transformState.getNode();
this.indexerStats = new TransformIndexerStats(in);
this.checkpointingInfo = new TransformCheckpointingInfo(in);
}
Expand Down Expand Up @@ -171,8 +171,8 @@ public void writeTo(StreamOutput out) throws IOException {
checkpointingInfo.getNext().getPosition(),
checkpointingInfo.getLast().getCheckpoint(),
reason,
checkpointingInfo.getNext().getCheckpointProgress()).writeTo(out);
out.writeBoolean(false);
checkpointingInfo.getNext().getCheckpointProgress(),
node).writeTo(out);
indexerStats.writeTo(out);
checkpointingInfo.writeTo(out);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@

package org.elasticsearch.xpack.core.transform.transforms;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;

import java.io.IOException;
import java.util.function.Predicate;

import static org.elasticsearch.xpack.core.transform.transforms.TransformStats.State.STARTED;
import static org.hamcrest.Matchers.equalTo;

public class TransformStatsTests extends AbstractSerializingTestCase<TransformStats> {

public static TransformStats randomDataFrameTransformStats() {
Expand Down Expand Up @@ -53,4 +59,28 @@ protected String[] getShuffleFieldsExceptions() {
protected Predicate<String> getRandomFieldsExcludeFilter() {
return field -> !field.isEmpty();
}

public void testBwcWith73() throws IOException {
for(int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
TransformStats stats = new TransformStats("bwc-id",
STARTED,
randomBoolean() ? null : randomAlphaOfLength(100),
randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(),
new TransformIndexerStats(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
new TransformCheckpointingInfo(
new TransformCheckpointStats(0, null, null, 10, 100),
new TransformCheckpointStats(0, null, null, 100, 1000),
// changesLastDetectedAt aren't serialized back
100, null));
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setVersion(Version.V_7_3_0);
stats.writeTo(output);
try (StreamInput in = output.bytes().streamInput()) {
in.setVersion(Version.V_7_3_0);
TransformStats statsFromOld = new TransformStats(in);
assertThat(statsFromOld, equalTo(stats));
}
}
}
}
}

0 comments on commit 361e7ad

Please sign in to comment.