From 5270a2b29533702671f0d0f1995223c1c684a653 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 24 Apr 2024 18:35:02 +0100 Subject: [PATCH] Fix serialization of put-shutdown request Closes #107857 --- .../org/elasticsearch/TransportVersions.java | 1 + .../xpack/shutdown/PutShutdownNodeAction.java | 15 ++ .../shutdown/PutShutdownRequestTests.java | 196 ++++++++++++++++++ 3 files changed, 212 insertions(+) create mode 100644 x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/PutShutdownRequestTests.java diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 2bdb3368e1b5c..3a545ada442fe 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -183,6 +183,7 @@ static TransportVersion def(int id) { public static final TransportVersion INDEX_SEGMENTS_VECTOR_FORMATS = def(8_642_00_0); public static final TransportVersion ADD_RESOURCE_ALREADY_UPLOADED_EXCEPTION = def(8_643_00_0); public static final TransportVersion ESQL_MV_ORDERING_SORTED_ASCENDING = def(8_644_00_0); + public static final TransportVersion PUT_SHUTDOWN_REQUEST_TIMEOUTS_FIX = def(8_645_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java index d05b60cd947f5..485e02e7dfa3c 100644 --- a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java +++ b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.shutdown; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.master.AcknowledgedRequest; @@ -17,11 +18,14 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.UpdateForV9; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; +import java.util.Objects; import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.GRACE_PERIOD_ADDED_VERSION; import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.REPLACE_SHUTDOWN_TYPE_ADDED_VERSION; @@ -95,7 +99,14 @@ public Request( this.gracePeriod = gracePeriod; } + @UpdateForV9 // TODO call super(in) instead of explicitly reading superclass contents once bwc no longer needed public Request(StreamInput in) throws IOException { + if (in.getTransportVersion().onOrAfter(TransportVersions.PUT_SHUTDOWN_REQUEST_TIMEOUTS_FIX)) { + // effectively super(in): + setParentTask(TaskId.readFromStream(in)); + masterNodeTimeout(in.readTimeValue()); + ackTimeout(in.readTimeValue()); + } this.nodeId = in.readString(); this.type = in.readEnum(SingleNodeShutdownMetadata.Type.class); this.reason = in.readString(); @@ -114,6 +125,9 @@ public Request(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { + if (out.getTransportVersion().onOrAfter(TransportVersions.PUT_SHUTDOWN_REQUEST_TIMEOUTS_FIX)) { + super.writeTo(out); + } out.writeString(nodeId); if (out.getTransportVersion().before(REPLACE_SHUTDOWN_TYPE_ADDED_VERSION) && this.type == SingleNodeShutdownMetadata.Type.REPLACE) { @@ -207,5 +221,6 @@ public ActionRequestValidationException validate() { return null; } } + } } diff --git a/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/PutShutdownRequestTests.java b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/PutShutdownRequestTests.java new file mode 100644 index 0000000000000..516431f40c170 --- /dev/null +++ b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/PutShutdownRequestTests.java @@ -0,0 +1,196 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.shutdown; + +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +public class PutShutdownRequestTests extends AbstractWireSerializingTestCase { + + /** + * Wraps a {@link org.elasticsearch.xpack.shutdown.PutShutdownNodeAction.Request} to add proper equality checks + */ + record RequestWrapper( + String nodeId, + SingleNodeShutdownMetadata.Type type, + String reason, + TimeValue allocationDelay, + String targetNodeName, + TimeValue gracePeriod, + TaskId parentTask, + TimeValue masterNodeTimeout, + TimeValue ackTimeout + ) implements Writeable { + @Override + public void writeTo(StreamOutput out) throws IOException { + final var request = new PutShutdownNodeAction.Request(nodeId, type, reason, allocationDelay, targetNodeName, gracePeriod); + request.setParentTask(parentTask); + request.ackTimeout(ackTimeout); + request.masterNodeTimeout(masterNodeTimeout); + request.writeTo(out); + } + } + + @Override + protected Writeable.Reader instanceReader() { + return in -> { + final var request = new PutShutdownNodeAction.Request(in); + return new RequestWrapper( + request.getNodeId(), + request.getType(), + request.getReason(), + request.getAllocationDelay(), + request.getTargetNodeName(), + request.getGracePeriod(), + request.getParentTask(), + request.masterNodeTimeout(), + request.ackTimeout() + ); + }; + } + + @Override + protected RequestWrapper createTestInstance() { + return new RequestWrapper( + randomIdentifier(), + randomFrom(SingleNodeShutdownMetadata.Type.values()), + randomIdentifier(), + randomOptionalTimeValue(), + randomOptionalIdentifier(), + randomOptionalTimeValue(), + randomTaskId(), + randomTimeValue(), + randomTimeValue() + ); + } + + private static String randomOptionalIdentifier() { + return randomBoolean() ? null : randomIdentifier(); + } + + private static TimeValue randomOptionalTimeValue() { + return randomBoolean() ? null : randomTimeValue(); + } + + private static TaskId randomTaskId() { + return randomBoolean() ? TaskId.EMPTY_TASK_ID : new TaskId(randomIdentifier(), randomNonNegativeLong()); + } + + @Override + protected RequestWrapper mutateInstance(RequestWrapper instance) { + return switch (between(1, 9)) { + case 1 -> new RequestWrapper( + randomValueOtherThan(instance.nodeId, ESTestCase::randomIdentifier), + instance.type, + instance.reason, + instance.allocationDelay, + instance.targetNodeName, + instance.gracePeriod, + instance.parentTask, + instance.ackTimeout, + instance.masterNodeTimeout + ); + case 2 -> new RequestWrapper( + instance.nodeId, + randomValueOtherThan(instance.type, () -> randomFrom(SingleNodeShutdownMetadata.Type.values())), + instance.reason, + instance.allocationDelay, + instance.targetNodeName, + instance.gracePeriod, + instance.parentTask, + instance.ackTimeout, + instance.masterNodeTimeout + ); + case 3 -> new RequestWrapper( + instance.nodeId, + instance.type, + randomValueOtherThan(instance.reason, ESTestCase::randomIdentifier), + instance.allocationDelay, + instance.targetNodeName, + instance.gracePeriod, + instance.parentTask, + instance.ackTimeout, + instance.masterNodeTimeout + ); + case 4 -> new RequestWrapper( + instance.nodeId, + instance.type, + instance.reason, + randomValueOtherThan(instance.allocationDelay, PutShutdownRequestTests::randomOptionalTimeValue), + instance.targetNodeName, + instance.gracePeriod, + instance.parentTask, + instance.ackTimeout, + instance.masterNodeTimeout + ); + case 5 -> new RequestWrapper( + instance.nodeId, + instance.type, + instance.reason, + instance.allocationDelay, + randomValueOtherThan(instance.targetNodeName, PutShutdownRequestTests::randomOptionalIdentifier), + instance.gracePeriod, + instance.parentTask, + instance.ackTimeout, + instance.masterNodeTimeout + ); + case 6 -> new RequestWrapper( + instance.nodeId, + instance.type, + instance.reason, + instance.allocationDelay, + instance.targetNodeName, + randomValueOtherThan(instance.gracePeriod, PutShutdownRequestTests::randomOptionalTimeValue), + instance.parentTask, + instance.ackTimeout, + instance.masterNodeTimeout + ); + case 7 -> new RequestWrapper( + instance.nodeId, + instance.type, + instance.reason, + instance.allocationDelay, + instance.targetNodeName, + instance.gracePeriod, + randomValueOtherThan(instance.parentTask, PutShutdownRequestTests::randomTaskId), + instance.ackTimeout, + instance.masterNodeTimeout + ); + case 8 -> new RequestWrapper( + instance.nodeId, + instance.type, + instance.reason, + instance.allocationDelay, + instance.targetNodeName, + instance.gracePeriod, + instance.parentTask, + randomValueOtherThan(instance.ackTimeout, ESTestCase::randomTimeValue), + instance.masterNodeTimeout + ); + case 9 -> new RequestWrapper( + instance.nodeId, + instance.type, + instance.reason, + instance.allocationDelay, + instance.targetNodeName, + instance.gracePeriod, + instance.parentTask, + instance.ackTimeout, + randomValueOtherThan(instance.masterNodeTimeout, ESTestCase::randomTimeValue) + ); + default -> throw new AssertionError("impossible"); + }; + } +}