From d0e81e148533e40a57da5c8a6320cbe25ab5268d Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 14 May 2024 14:57:43 +0100 Subject: [PATCH] Fix timeouts in Datastream Lifecycle module Relates #107984 --- .../lifecycle/CrudDataStreamLifecycleIT.java | 5 +++ .../DataStreamLifecycleServiceIT.java | 6 +++- .../ExplainDataStreamLifecycleIT.java | 6 +++- ...DeleteDataStreamGlobalRetentionAction.java | 5 +-- .../DeleteDataStreamLifecycleAction.java | 5 +-- .../GetDataStreamGlobalRetentionAction.java | 4 --- .../GetDataStreamLifecycleStatsAction.java | 4 +-- .../PutDataStreamGlobalRetentionAction.java | 32 ++----------------- .../RestDataStreamLifecycleStatsAction.java | 3 +- .../RestDeleteDataStreamLifecycleAction.java | 6 +++- 10 files changed, 31 insertions(+), 45 deletions(-) diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudDataStreamLifecycleIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudDataStreamLifecycleIT.java index d43dad87a6067..7712be94b4326 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudDataStreamLifecycleIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudDataStreamLifecycleIT.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.datastreams.CreateDataStreamAction; import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction; import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction; +import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.cluster.metadata.DataStreamLifecycle; import org.elasticsearch.core.TimeValue; import org.elasticsearch.datastreams.DataStreamsPlugin; @@ -229,6 +230,8 @@ public void testDeleteLifecycle() throws Exception { // Remove lifecycle from concrete data stream { DeleteDataStreamLifecycleAction.Request deleteDataLifecycleRequest = new DeleteDataStreamLifecycleAction.Request( + TimeValue.THIRTY_SECONDS, + AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, new String[] { "with-lifecycle-1" } ); assertThat( @@ -254,6 +257,8 @@ public void testDeleteLifecycle() throws Exception { // Remove lifecycle from all data streams { DeleteDataStreamLifecycleAction.Request deleteDataLifecycleRequest = new DeleteDataStreamLifecycleAction.Request( + TimeValue.THIRTY_SECONDS, + AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, new String[] { "*" } ); assertThat( diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java index 7252d31d838c5..97c6c1ddff977 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java @@ -203,6 +203,7 @@ public void testSystemDataStreamRetention() throws Exception { client().execute( PutDataStreamGlobalRetentionAction.INSTANCE, new PutDataStreamGlobalRetentionAction.Request( + TimeValue.THIRTY_SECONDS, TimeValue.timeValueSeconds(globalRetentionSeconds), TimeValue.timeValueSeconds(globalRetentionSeconds) ) @@ -290,7 +291,10 @@ public void testSystemDataStreamRetention() throws Exception { client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(SYSTEM_DATA_STREAM_NAME)).actionGet(); } finally { - client().execute(DeleteDataStreamGlobalRetentionAction.INSTANCE, new DeleteDataStreamGlobalRetentionAction.Request()); + client().execute( + DeleteDataStreamGlobalRetentionAction.INSTANCE, + new DeleteDataStreamGlobalRetentionAction.Request(TimeValue.THIRTY_SECONDS) + ); } } finally { dataStreamLifecycleServices.forEach(dataStreamLifecycleService -> dataStreamLifecycleService.setNowSupplier(clock::millis)); diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/ExplainDataStreamLifecycleIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/ExplainDataStreamLifecycleIT.java index 2723637b2959b..35ee41fca18e8 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/ExplainDataStreamLifecycleIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/ExplainDataStreamLifecycleIT.java @@ -213,6 +213,7 @@ public void testSystemExplainLifecycle() throws Exception { client().execute( PutDataStreamGlobalRetentionAction.INSTANCE, new PutDataStreamGlobalRetentionAction.Request( + TimeValue.THIRTY_SECONDS, TimeValue.timeValueSeconds(globalRetentionSeconds), TimeValue.timeValueSeconds(globalRetentionSeconds) ) @@ -260,7 +261,10 @@ public void testSystemExplainLifecycle() throws Exception { ); } } finally { - client().execute(DeleteDataStreamGlobalRetentionAction.INSTANCE, new DeleteDataStreamGlobalRetentionAction.Request()); + client().execute( + DeleteDataStreamGlobalRetentionAction.INSTANCE, + new DeleteDataStreamGlobalRetentionAction.Request(TimeValue.THIRTY_SECONDS) + ); } } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/DeleteDataStreamGlobalRetentionAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/DeleteDataStreamGlobalRetentionAction.java index e3cdd6a8c14d9..92cb855b7cb4e 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/DeleteDataStreamGlobalRetentionAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/DeleteDataStreamGlobalRetentionAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.datastreams.lifecycle.UpdateDataStreamGlobalRetentionService; import org.elasticsearch.features.FeatureService; import org.elasticsearch.tasks.Task; @@ -64,8 +65,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(dryRun); } - public Request() { - super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT); + public Request(TimeValue masterNodeTimeout) { + super(masterNodeTimeout); } public boolean dryRun() { diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/DeleteDataStreamLifecycleAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/DeleteDataStreamLifecycleAction.java index 3bd100a106dd6..70f822ddee72a 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/DeleteDataStreamLifecycleAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/DeleteDataStreamLifecycleAction.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.TimeValue; import java.io.IOException; import java.util.Arrays; @@ -47,8 +48,8 @@ public void writeTo(StreamOutput out) throws IOException { indicesOptions.writeIndicesOptions(out); } - public Request(String[] names) { - super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT); + public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String[] names) { + super(masterNodeTimeout, ackTimeout); this.names = names; } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/GetDataStreamGlobalRetentionAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/GetDataStreamGlobalRetentionAction.java index 5816823ed710a..1d1064dd42b1a 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/GetDataStreamGlobalRetentionAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/GetDataStreamGlobalRetentionAction.java @@ -47,10 +47,6 @@ private GetDataStreamGlobalRetentionAction() {/* no instances */} public static final class Request extends MasterNodeReadRequest { - public Request() { - super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT); - } - public Request(StreamInput in) throws IOException { super(in); } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/GetDataStreamLifecycleStatsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/GetDataStreamLifecycleStatsAction.java index cc61c7fe664be..6e930defd4e0b 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/GetDataStreamLifecycleStatsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/GetDataStreamLifecycleStatsAction.java @@ -43,8 +43,8 @@ public Request(StreamInput in) throws IOException { super(in); } - public Request() { - super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT); + public Request(TimeValue masterNodeTimeout) { + super(masterNodeTimeout); } @Override diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/PutDataStreamGlobalRetentionAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/PutDataStreamGlobalRetentionAction.java index 65ca34a99da23..cd9156ad8b2c8 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/PutDataStreamGlobalRetentionAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/PutDataStreamGlobalRetentionAction.java @@ -32,9 +32,6 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xcontent.ConstructingObjectParser; -import org.elasticsearch.xcontent.ObjectParser; -import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; import java.util.List; @@ -53,34 +50,9 @@ private PutDataStreamGlobalRetentionAction() {/* no instances */} public static final class Request extends MasterNodeRequest { - public static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>( - "put_data_stream_global_retention_request", - args -> new PutDataStreamGlobalRetentionAction.Request((TimeValue) args[0], (TimeValue) args[1]) - ); - - static { - PARSER.declareField( - ConstructingObjectParser.optionalConstructorArg(), - (p, c) -> TimeValue.parseTimeValue(p.textOrNull(), DataStreamGlobalRetention.DEFAULT_RETENTION_FIELD.getPreferredName()), - DataStreamGlobalRetention.DEFAULT_RETENTION_FIELD, - ObjectParser.ValueType.STRING_OR_NULL - ); - PARSER.declareField( - ConstructingObjectParser.optionalConstructorArg(), - (p, c) -> TimeValue.parseTimeValue(p.textOrNull(), DataStreamGlobalRetention.MAX_RETENTION_FIELD.getPreferredName()), - DataStreamGlobalRetention.MAX_RETENTION_FIELD, - ObjectParser.ValueType.STRING_OR_NULL - ); - } - private final DataStreamGlobalRetention globalRetention; private boolean dryRun = false; - public static PutDataStreamGlobalRetentionAction.Request parseRequest(XContentParser parser) { - return PARSER.apply(parser, null); - } - public Request(StreamInput in) throws IOException { super(in); globalRetention = DataStreamGlobalRetention.read(in); @@ -107,8 +79,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(dryRun); } - public Request(@Nullable TimeValue defaultRetention, @Nullable TimeValue maxRetention) { - super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT); + public Request(TimeValue masterNodeTimeout, @Nullable TimeValue defaultRetention, @Nullable TimeValue maxRetention) { + super(masterNodeTimeout); this.globalRetention = new DataStreamGlobalRetention(defaultRetention, maxRetention); } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestDataStreamLifecycleStatsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestDataStreamLifecycleStatsAction.java index a10a955b33975..a3959ae818218 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestDataStreamLifecycleStatsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestDataStreamLifecycleStatsAction.java @@ -36,8 +36,7 @@ public List routes() { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) { - GetDataStreamLifecycleStatsAction.Request request = new GetDataStreamLifecycleStatsAction.Request(); - request.masterNodeTimeout(getMasterNodeTimeout(restRequest)); + final var request = new GetDataStreamLifecycleStatsAction.Request(getMasterNodeTimeout(restRequest)); return channel -> client.execute( GetDataStreamLifecycleStatsAction.INSTANCE, request, diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestDeleteDataStreamLifecycleAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestDeleteDataStreamLifecycleAction.java index b624892ac6bba..a8a64eaf5cfa3 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestDeleteDataStreamLifecycleAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestDeleteDataStreamLifecycleAction.java @@ -8,6 +8,7 @@ package org.elasticsearch.datastreams.lifecycle.rest; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.Strings; import org.elasticsearch.datastreams.lifecycle.action.DeleteDataStreamLifecycleAction; @@ -20,6 +21,7 @@ import java.util.List; import static org.elasticsearch.rest.RestRequest.Method.DELETE; +import static org.elasticsearch.rest.RestUtils.getMasterNodeTimeout; @ServerlessScope(Scope.INTERNAL) public class RestDeleteDataStreamLifecycleAction extends BaseRestHandler { @@ -36,7 +38,9 @@ public List routes() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { - DeleteDataStreamLifecycleAction.Request deleteDataLifecycleRequest = new DeleteDataStreamLifecycleAction.Request( + final var deleteDataLifecycleRequest = new DeleteDataStreamLifecycleAction.Request( + getMasterNodeTimeout(request), + request.paramAsTime("timeout", AcknowledgedRequest.DEFAULT_ACK_TIMEOUT), Strings.splitStringByCommaToArray(request.param("name")) ); deleteDataLifecycleRequest.indicesOptions(IndicesOptions.fromRequest(request, deleteDataLifecycleRequest.indicesOptions()));