Skip to content

Commit

Permalink
Fix trappy timeouts in downsample action (#112734)
Browse files Browse the repository at this point in the history
Relates #107984
  • Loading branch information
DaveCTurner authored Sep 11, 2024
1 parent b6b0d3d commit 2db52ae
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,13 @@ private Set<Index> waitForInProgressOrTriggerDownsampling(
* Issues a request downsample the source index to the downsample index for the specified round.
*/
private void downsampleIndexOnce(DataStreamLifecycle.Downsampling.Round round, String sourceIndex, String downsampleIndexName) {
DownsampleAction.Request request = new DownsampleAction.Request(sourceIndex, downsampleIndexName, null, round.config());
DownsampleAction.Request request = new DownsampleAction.Request(
TimeValue.THIRTY_SECONDS /* TODO should this be longer/configurable? */,
sourceIndex,
downsampleIndexName,
null,
round.config()
);
transportActionsDeduplicator.executeOnce(
request,
new ErrorRecordingActionListener(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,21 @@ public static class Request extends MasterNodeRequest<Request> implements Indice
private DownsampleConfig downsampleConfig;

public Request(
TimeValue masterNodeTimeout,
final String sourceIndex,
final String targetIndex,
final TimeValue waitTimeout,
final DownsampleConfig downsampleConfig
) {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
super(masterNodeTimeout);
this.sourceIndex = sourceIndex;
this.targetIndex = targetIndex;
this.waitTimeout = waitTimeout == null ? DEFAULT_WAIT_TIMEOUT : waitTimeout;
this.downsampleConfig = downsampleConfig;
}

public Request() {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
public Request(TimeValue masterNodeTimeout) {
super(masterNodeTimeout);
}

public Request(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,13 @@ public void performAction(

void performDownsampleIndex(String indexName, String downsampleIndexName, ActionListener<Void> listener) {
DownsampleConfig config = new DownsampleConfig(fixedInterval);
DownsampleAction.Request request = new DownsampleAction.Request(indexName, downsampleIndexName, waitTimeout, config)
.masterNodeTimeout(TimeValue.MAX_VALUE);
DownsampleAction.Request request = new DownsampleAction.Request(
TimeValue.MAX_VALUE,
indexName,
downsampleIndexName,
waitTimeout,
config
);
// Currently, DownsampleAction always acknowledges action was complete when no exceptions are thrown.
getClient().execute(DownsampleAction.INSTANCE, request, listener.delegateFailureAndWrap((l, response) -> l.onResponse(null)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ public void testNoDisruption() {
// GIVEN

final DownsampleAction.Request downsampleRequest = new DownsampleAction.Request(
TEST_REQUEST_TIMEOUT,
SOURCE_INDEX_NAME,
TARGET_INDEX_NAME,
WAIT_TIMEOUT,
Expand Down Expand Up @@ -294,6 +295,7 @@ public void testDownsampleActionExceptionDisruption() {
// GIVEN
final MockTransportService coordinator = MockTransportService.getInstance(testCluster.coordinator);
final DownsampleAction.Request downsampleRequest = new DownsampleAction.Request(
TEST_REQUEST_TIMEOUT,
SOURCE_INDEX_NAME,
TARGET_INDEX_NAME,
WAIT_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestToXContentListener;
Expand Down Expand Up @@ -40,6 +41,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
config = DownsampleConfig.fromXContent(parser);
}
DownsampleAction.Request request = new DownsampleAction.Request(
RestUtils.getMasterNodeTimeout(restRequest),
sourceIndex,
targetIndex,
TimeValue.parseTimeValue(timeout, null, "wait_timeout"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ public void onFailure(Exception e) {
};
client().execute(
DownsampleAction.INSTANCE,
new DownsampleAction.Request(sourceIndex, downsampleIndex, TIMEOUT, config),
new DownsampleAction.Request(TEST_REQUEST_TIMEOUT, sourceIndex, downsampleIndex, TIMEOUT, config),
downsampleListener
);
assertBusy(() -> {
Expand All @@ -607,7 +607,10 @@ public void onFailure(Exception e) {

assertBusy(() -> {
try {
client().execute(DownsampleAction.INSTANCE, new DownsampleAction.Request(sourceIndex, downsampleIndex, TIMEOUT, config));
client().execute(
DownsampleAction.INSTANCE,
new DownsampleAction.Request(TEST_REQUEST_TIMEOUT, sourceIndex, downsampleIndex, TIMEOUT, config)
);
} catch (ElasticsearchException e) {
fail("transient failure due to overlapping downsample operations");
}
Expand Down Expand Up @@ -1145,7 +1148,10 @@ private void prepareSourceIndex(final String sourceIndex, boolean blockWrite) {

private void downsample(String sourceIndex, String downsampleIndex, DownsampleConfig config) {
assertAcked(
client().execute(DownsampleAction.INSTANCE, new DownsampleAction.Request(sourceIndex, downsampleIndex, TIMEOUT, config))
client().execute(
DownsampleAction.INSTANCE,
new DownsampleAction.Request(TEST_REQUEST_TIMEOUT, sourceIndex, downsampleIndex, TIMEOUT, config)
)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public void testDataStreamDownsample() throws ExecutionException, InterruptedExc
// WHEN (simulate downsampling as done by an ILM action)
final String downsampleTargetIndex = DataStream.BACKING_INDEX_PREFIX + dataStreamName + "-downsample-1h";
final DownsampleAction.Request downsampleRequest = new DownsampleAction.Request(
TEST_REQUEST_TIMEOUT,
rolloverResponse.getOldIndex(),
downsampleTargetIndex,
TIMEOUT,
Expand Down

0 comments on commit 2db52ae

Please sign in to comment.