From bce11c6c1c1c73a4b9a57b367cc6d379b9acac1b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 21 Mar 2024 08:24:32 -0700 Subject: [PATCH] Avoid using small inactive exchange timeout in breaker tests (#106394) The tests failed because we set an inactive exchange timeout too short, causing a timeout error instead of triggering the expected CircuitBreakingException. Closes #105681 Closes #105543 --- .../compute/operator/exchange/ExchangeService.java | 6 +++++- .../xpack/esql/action/EsqlActionBreakerIT.java | 4 +--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java index a8afce1a3b223..9af08346256f7 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java @@ -25,7 +25,9 @@ import org.elasticsearch.compute.data.BlockStreamInput; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportChannel; @@ -178,13 +180,15 @@ public void messageReceived(OpenExchangeRequest request, TransportChannel channe private class ExchangeTransportAction implements TransportRequestHandler { @Override - public void messageReceived(ExchangeRequest request, TransportChannel channel, Task task) { + public void messageReceived(ExchangeRequest request, TransportChannel channel, Task transportTask) { final String exchangeId = request.exchangeId(); ActionListener listener = new ChannelActionListener<>(channel); final ExchangeSinkHandler sinkHandler = sinks.get(exchangeId); if (sinkHandler == null) { listener.onResponse(new ExchangeResponse(blockFactory, null, true)); } else { + CancellableTask task = (CancellableTask) transportTask; + task.addListener(() -> sinkHandler.onFailure(new TaskCancelledException(task.getReasonCancelled()))); sinkHandler.fetchPageAsync(request.sourcesFinished(), listener); } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java index 85eb0c02625ad..059ed672e56c6 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.esql.action; -import org.apache.lucene.tests.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.DocWriteResponse; @@ -35,7 +34,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/105543") @TestLogging(value = "org.elasticsearch.xpack.esql:TRACE", reason = "debug") public class EsqlActionBreakerIT extends EsqlActionIT { @@ -72,7 +70,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.getKey(), HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.getDefault(Settings.EMPTY) ) - .put(ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING, TimeValue.timeValueMillis(between(500, 2000))) + .put(ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING, TimeValue.timeValueSeconds(between(5, 10))) .put(BlockFactory.LOCAL_BREAKER_OVER_RESERVED_SIZE_SETTING, ByteSizeValue.ofBytes(between(0, 256))) .put(BlockFactory.LOCAL_BREAKER_OVER_RESERVED_MAX_SIZE_SETTING, ByteSizeValue.ofBytes(between(0, 1024))) // allow reading pages from network can trip the circuit breaker