From d5b80bfc8f0d040d3a101385ab38dc62941efbf2 Mon Sep 17 00:00:00 2001
From: Benjamin Trent <ben.w.trent@gmail.com>
Date: Tue, 15 Oct 2019 06:46:58 -0400
Subject: [PATCH] [ML][Transforms] fix bwc serialization with 7.3 (#48021)

---
 .../transform/transforms/TransformStats.java  |  6 ++--
 .../transforms/TransformStatsTests.java       | 30 +++++++++++++++++++
 2 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java
index c719cf7723d3f..4253cfdca068b 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java
@@ -123,7 +123,7 @@ public TransformStats(StreamInput in) throws IOException {
             TransformState transformState = new TransformState(in);
             this.state = State.fromComponents(transformState.getTaskState(), transformState.getIndexerState());
             this.reason = transformState.getReason();
-            this.node = null;
+            this.node = transformState.getNode();
             this.indexerStats = new TransformIndexerStats(in);
             this.checkpointingInfo = new TransformCheckpointingInfo(in);
         }
@@ -171,8 +171,8 @@ public void writeTo(StreamOutput out) throws IOException {
                 checkpointingInfo.getNext().getPosition(),
                 checkpointingInfo.getLast().getCheckpoint(),
                 reason,
-                checkpointingInfo.getNext().getCheckpointProgress()).writeTo(out);
-            out.writeBoolean(false);
+                checkpointingInfo.getNext().getCheckpointProgress(),
+                node).writeTo(out);
             indexerStats.writeTo(out);
             checkpointingInfo.writeTo(out);
         }
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java
index 093e05f4f9002..387e9d115c483 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java
@@ -6,6 +6,9 @@
 
 package org.elasticsearch.xpack.core.transform.transforms;
 
+import org.elasticsearch.Version;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.test.AbstractSerializingTestCase;
@@ -13,6 +16,9 @@
 import java.io.IOException;
 import java.util.function.Predicate;
 
+import static org.elasticsearch.xpack.core.transform.transforms.TransformStats.State.STARTED;
+import static org.hamcrest.Matchers.equalTo;
+
 public class TransformStatsTests extends AbstractSerializingTestCase<TransformStats> {
 
     public static TransformStats randomDataFrameTransformStats() {
@@ -53,4 +59,28 @@ protected String[] getShuffleFieldsExceptions() {
     protected Predicate<String> getRandomFieldsExcludeFilter() {
         return field -> !field.isEmpty();
     }
+
+    public void testBwcWith73() throws IOException {
+        for(int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
+            TransformStats stats = new TransformStats("bwc-id",
+                STARTED,
+                randomBoolean() ? null : randomAlphaOfLength(100),
+                randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(),
+                new TransformIndexerStats(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
+                new TransformCheckpointingInfo(
+                    new TransformCheckpointStats(0, null, null, 10, 100),
+                    new TransformCheckpointStats(0, null, null, 100, 1000),
+                    // changesLastDetectedAt aren't serialized back
+                    100, null));
+            try (BytesStreamOutput output = new BytesStreamOutput()) {
+                output.setVersion(Version.V_7_3_0);
+                stats.writeTo(output);
+                try (StreamInput in = output.bytes().streamInput()) {
+                    in.setVersion(Version.V_7_3_0);
+                    TransformStats statsFromOld = new TransformStats(in);
+                    assertThat(statsFromOld, equalTo(stats));
+                }
+            }
+        }
+    }
 }