Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix timeouts in Datastream Lifecycle module #108621

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.DataStreamsPlugin;
Expand Down Expand Up @@ -229,6 +230,8 @@ public void testDeleteLifecycle() throws Exception {
// Remove lifecycle from concrete data stream
{
DeleteDataStreamLifecycleAction.Request deleteDataLifecycleRequest = new DeleteDataStreamLifecycleAction.Request(
TimeValue.THIRTY_SECONDS,
AcknowledgedRequest.DEFAULT_ACK_TIMEOUT,
new String[] { "with-lifecycle-1" }
);
assertThat(
Expand All @@ -254,6 +257,8 @@ public void testDeleteLifecycle() throws Exception {
// Remove lifecycle from all data streams
{
DeleteDataStreamLifecycleAction.Request deleteDataLifecycleRequest = new DeleteDataStreamLifecycleAction.Request(
TimeValue.THIRTY_SECONDS,
AcknowledgedRequest.DEFAULT_ACK_TIMEOUT,
new String[] { "*" }
);
assertThat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ public void testSystemDataStreamRetention() throws Exception {
client().execute(
PutDataStreamGlobalRetentionAction.INSTANCE,
new PutDataStreamGlobalRetentionAction.Request(
TimeValue.THIRTY_SECONDS,
TimeValue.timeValueSeconds(globalRetentionSeconds),
TimeValue.timeValueSeconds(globalRetentionSeconds)
)
Expand Down Expand Up @@ -290,7 +291,10 @@ public void testSystemDataStreamRetention() throws Exception {

client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(SYSTEM_DATA_STREAM_NAME)).actionGet();
} finally {
client().execute(DeleteDataStreamGlobalRetentionAction.INSTANCE, new DeleteDataStreamGlobalRetentionAction.Request());
client().execute(
DeleteDataStreamGlobalRetentionAction.INSTANCE,
new DeleteDataStreamGlobalRetentionAction.Request(TimeValue.THIRTY_SECONDS)
);
}
} finally {
dataStreamLifecycleServices.forEach(dataStreamLifecycleService -> dataStreamLifecycleService.setNowSupplier(clock::millis));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ public void testSystemExplainLifecycle() throws Exception {
client().execute(
PutDataStreamGlobalRetentionAction.INSTANCE,
new PutDataStreamGlobalRetentionAction.Request(
TimeValue.THIRTY_SECONDS,
TimeValue.timeValueSeconds(globalRetentionSeconds),
TimeValue.timeValueSeconds(globalRetentionSeconds)
)
Expand Down Expand Up @@ -260,7 +261,10 @@ public void testSystemExplainLifecycle() throws Exception {
);
}
} finally {
client().execute(DeleteDataStreamGlobalRetentionAction.INSTANCE, new DeleteDataStreamGlobalRetentionAction.Request());
client().execute(
DeleteDataStreamGlobalRetentionAction.INSTANCE,
new DeleteDataStreamGlobalRetentionAction.Request(TimeValue.THIRTY_SECONDS)
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.lifecycle.UpdateDataStreamGlobalRetentionService;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -64,8 +65,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(dryRun);
}

public Request() {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
public Request(TimeValue masterNodeTimeout) {
super(masterNodeTimeout);
}

public boolean dryRun() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -47,8 +48,8 @@ public void writeTo(StreamOutput out) throws IOException {
indicesOptions.writeIndicesOptions(out);
}

public Request(String[] names) {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT);
public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String[] names) {
super(masterNodeTimeout, ackTimeout);
this.names = names;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ private GetDataStreamGlobalRetentionAction() {/* no instances */}

public static final class Request extends MasterNodeReadRequest<Request> {

public Request() {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
}

public Request(StreamInput in) throws IOException {
super(in);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public Request(StreamInput in) throws IOException {
super(in);
}

public Request() {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
public Request(TimeValue masterNodeTimeout) {
super(masterNodeTimeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.List;
Expand All @@ -53,34 +50,9 @@ private PutDataStreamGlobalRetentionAction() {/* no instances */}

public static final class Request extends MasterNodeRequest<Request> {

public static final ConstructingObjectParser<PutDataStreamGlobalRetentionAction.Request, Void> PARSER =
new ConstructingObjectParser<>(
"put_data_stream_global_retention_request",
args -> new PutDataStreamGlobalRetentionAction.Request((TimeValue) args[0], (TimeValue) args[1])
);

static {
PARSER.declareField(
ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.textOrNull(), DataStreamGlobalRetention.DEFAULT_RETENTION_FIELD.getPreferredName()),
DataStreamGlobalRetention.DEFAULT_RETENTION_FIELD,
ObjectParser.ValueType.STRING_OR_NULL
);
PARSER.declareField(
ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.textOrNull(), DataStreamGlobalRetention.MAX_RETENTION_FIELD.getPreferredName()),
DataStreamGlobalRetention.MAX_RETENTION_FIELD,
ObjectParser.ValueType.STRING_OR_NULL
);
}

private final DataStreamGlobalRetention globalRetention;
private boolean dryRun = false;

public static PutDataStreamGlobalRetentionAction.Request parseRequest(XContentParser parser) {
return PARSER.apply(parser, null);
}

public Request(StreamInput in) throws IOException {
super(in);
globalRetention = DataStreamGlobalRetention.read(in);
Expand All @@ -107,8 +79,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(dryRun);
}

public Request(@Nullable TimeValue defaultRetention, @Nullable TimeValue maxRetention) {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
public Request(TimeValue masterNodeTimeout, @Nullable TimeValue defaultRetention, @Nullable TimeValue maxRetention) {
super(masterNodeTimeout);
this.globalRetention = new DataStreamGlobalRetention(defaultRetention, maxRetention);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ public List<Route> routes() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
GetDataStreamLifecycleStatsAction.Request request = new GetDataStreamLifecycleStatsAction.Request();
request.masterNodeTimeout(getMasterNodeTimeout(restRequest));
final var request = new GetDataStreamLifecycleStatsAction.Request(getMasterNodeTimeout(restRequest));
return channel -> client.execute(
GetDataStreamLifecycleStatsAction.INSTANCE,
request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.datastreams.lifecycle.rest;

import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.datastreams.lifecycle.action.DeleteDataStreamLifecycleAction;
Expand All @@ -20,6 +21,7 @@
import java.util.List;

import static org.elasticsearch.rest.RestRequest.Method.DELETE;
import static org.elasticsearch.rest.RestUtils.getMasterNodeTimeout;

@ServerlessScope(Scope.INTERNAL)
public class RestDeleteDataStreamLifecycleAction extends BaseRestHandler {
Expand All @@ -36,7 +38,9 @@ public List<Route> routes() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
DeleteDataStreamLifecycleAction.Request deleteDataLifecycleRequest = new DeleteDataStreamLifecycleAction.Request(
final var deleteDataLifecycleRequest = new DeleteDataStreamLifecycleAction.Request(
getMasterNodeTimeout(request),
request.paramAsTime("timeout", AcknowledgedRequest.DEFAULT_ACK_TIMEOUT),
Strings.splitStringByCommaToArray(request.param("name"))
);
deleteDataLifecycleRequest.indicesOptions(IndicesOptions.fromRequest(request, deleteDataLifecycleRequest.indicesOptions()));
Expand Down