diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java index a8afce1a3b223..9af08346256f7 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java @@ -25,7 +25,9 @@ import org.elasticsearch.compute.data.BlockStreamInput; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportChannel; @@ -178,13 +180,15 @@ public void messageReceived(OpenExchangeRequest request, TransportChannel channe private class ExchangeTransportAction implements TransportRequestHandler { @Override - public void messageReceived(ExchangeRequest request, TransportChannel channel, Task task) { + public void messageReceived(ExchangeRequest request, TransportChannel channel, Task transportTask) { final String exchangeId = request.exchangeId(); ActionListener listener = new ChannelActionListener<>(channel); final ExchangeSinkHandler sinkHandler = sinks.get(exchangeId); if (sinkHandler == null) { listener.onResponse(new ExchangeResponse(blockFactory, null, true)); } else { + CancellableTask task = (CancellableTask) transportTask; + task.addListener(() -> sinkHandler.onFailure(new TaskCancelledException(task.getReasonCancelled()))); sinkHandler.fetchPageAsync(request.sourcesFinished(), listener); } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java index 85eb0c02625ad..059ed672e56c6 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.esql.action; -import org.apache.lucene.tests.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.DocWriteResponse; @@ -35,7 +34,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/105543") @TestLogging(value = "org.elasticsearch.xpack.esql:TRACE", reason = "debug") public class EsqlActionBreakerIT extends EsqlActionIT { @@ -72,7 +70,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.getKey(), HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.getDefault(Settings.EMPTY) ) - .put(ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING, TimeValue.timeValueMillis(between(500, 2000))) + .put(ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING, TimeValue.timeValueSeconds(between(5, 10))) .put(BlockFactory.LOCAL_BREAKER_OVER_RESERVED_SIZE_SETTING, ByteSizeValue.ofBytes(between(0, 256))) .put(BlockFactory.LOCAL_BREAKER_OVER_RESERVED_MAX_SIZE_SETTING, ByteSizeValue.ofBytes(between(0, 1024))) // allow reading pages from network can trip the circuit breaker