diff --git a/server/src/main/java/org/elasticsearch/cluster/DiffableUtils.java b/server/src/main/java/org/elasticsearch/cluster/DiffableUtils.java index 78eceeb12bcca..725da675952f2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/DiffableUtils.java +++ b/server/src/main/java/org/elasticsearch/cluster/DiffableUtils.java @@ -474,8 +474,10 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeVInt(upsertsCount); for (Map.Entry entry : upserts.entrySet()) { - keySerializer.writeKey(entry.getKey(), out); - valueSerializer.write(entry.getValue(), out); + if(valueSerializer.supportsVersion(entry.getValue(), version)) { + keySerializer.writeKey(entry.getKey(), out); + valueSerializer.write(entry.getValue(), out); + } } } } diff --git a/server/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java b/server/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java index 061defa600219..6e39eea6e51b3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java @@ -20,11 +20,14 @@ package org.elasticsearch.cluster.serialization; import org.elasticsearch.Version; +import org.elasticsearch.cluster.AbstractNamedDiffable; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterState.Custom; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -39,7 +42,9 @@ import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.test.VersionUtils; @@ -47,6 +52,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; +import java.util.List; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; @@ -214,4 +220,142 @@ public void testObjectReuseWhenApplyingClusterStateDiff() throws Exception { assertSame("template", serializedClusterState2.metaData().templates().get("test-template"), serializedClusterState3.metaData().templates().get("test-template")); } + + public static class TestCustomOne extends AbstractNamedDiffable implements Custom { + + public static final String TYPE = "test_custom_one"; + private final String strObject; + + public TestCustomOne(String strObject) { + this.strObject = strObject; + } + + public TestCustomOne(StreamInput in) throws IOException { + this.strObject = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(strObject); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.field("custom_string_object", strObject); + } + builder.endObject(); + return builder; + } + + @Override + public String getWriteableName() { + return TYPE; + } + + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(Custom.class, TYPE, in); + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + + } + + public static class TestCustomTwo extends AbstractNamedDiffable implements Custom { + + public static final String TYPE = "test_custom_two"; + private final Integer intObject; + + public TestCustomTwo(Integer intObject) { + this.intObject = intObject; + } + + public TestCustomTwo(StreamInput in) throws IOException { + this.intObject = in.readInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeInt(intObject); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.field("custom_integer_object", intObject); + } + builder.endObject(); + return builder; + } + + @Override + public String getWriteableName() { + return TYPE; + } + + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(Custom.class, TYPE, in); + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + + } + + public void testCustomSerialization() throws Exception { + ClusterState.Builder builder = ClusterState.builder(ClusterState.EMPTY_STATE) + .putCustom(TestCustomOne.TYPE, new TestCustomOne("test_custom_one")) + .putCustom(TestCustomTwo.TYPE, new TestCustomTwo(10)); + + ClusterState clusterState = builder.incrementVersion().build(); + + Diff diffs = clusterState.diff(ClusterState.EMPTY_STATE); + + // Add the new customs to named writeables + final List entries = ClusterModule.getNamedWriteables(); + entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TestCustomOne.TYPE, TestCustomOne::new)); + entries.add(new NamedWriteableRegistry.Entry(NamedDiff.class, TestCustomOne.TYPE, TestCustomOne::readDiffFrom)); + entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TestCustomTwo.TYPE, TestCustomTwo::new)); + entries.add(new NamedWriteableRegistry.Entry(NamedDiff.class, TestCustomTwo.TYPE, TestCustomTwo::readDiffFrom)); + + // serialize with current version + BytesStreamOutput outStream = new BytesStreamOutput(); + Version version = Version.CURRENT; + outStream.setVersion(version); + diffs.writeTo(outStream); + StreamInput inStream = outStream.bytes().streamInput(); + + inStream = new NamedWriteableAwareStreamInput(inStream, new NamedWriteableRegistry(entries)); + inStream.setVersion(version); + Diff serializedDiffs = ClusterState.readDiffFrom(inStream, clusterState.nodes().getLocalNode()); + ClusterState stateAfterDiffs = serializedDiffs.apply(ClusterState.EMPTY_STATE); + + // Current version - Both the customs are non null + assertThat(stateAfterDiffs.custom(TestCustomOne.TYPE), notNullValue()); + assertThat(stateAfterDiffs.custom(TestCustomTwo.TYPE), notNullValue()); + + // serialize with minimum compatibile version + outStream = new BytesStreamOutput(); + version = Version.CURRENT.minimumCompatibilityVersion(); + outStream.setVersion(version); + diffs.writeTo(outStream); + inStream = outStream.bytes().streamInput(); + + inStream = new NamedWriteableAwareStreamInput(inStream, new NamedWriteableRegistry(entries)); + inStream.setVersion(version); + serializedDiffs = ClusterState.readDiffFrom(inStream, clusterState.nodes().getLocalNode()); + stateAfterDiffs = serializedDiffs.apply(ClusterState.EMPTY_STATE); + + // Old version - TestCustomOne is null and TestCustomTwo is not null + assertThat(stateAfterDiffs.custom(TestCustomOne.TYPE), nullValue()); + assertThat(stateAfterDiffs.custom(TestCustomTwo.TYPE), notNullValue()); + } + }