Skip to content

Commit

Permalink
Avoid using small inactive exchange timeout in breaker tests (elastic…
Browse files Browse the repository at this point in the history
…#106394)

The tests failed because we set an inactive exchange timeout too short, 
causing a timeout error instead of triggering the expected
CircuitBreakingException.

Closes elastic#105681
Closes elastic#105543
  • Loading branch information
dnhatn authored Mar 21, 2024
1 parent 3489906 commit bce11c6
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import org.elasticsearch.compute.data.BlockStreamInput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
Expand Down Expand Up @@ -178,13 +180,15 @@ public void messageReceived(OpenExchangeRequest request, TransportChannel channe

private class ExchangeTransportAction implements TransportRequestHandler<ExchangeRequest> {
@Override
public void messageReceived(ExchangeRequest request, TransportChannel channel, Task task) {
public void messageReceived(ExchangeRequest request, TransportChannel channel, Task transportTask) {
final String exchangeId = request.exchangeId();
ActionListener<ExchangeResponse> listener = new ChannelActionListener<>(channel);
final ExchangeSinkHandler sinkHandler = sinks.get(exchangeId);
if (sinkHandler == null) {
listener.onResponse(new ExchangeResponse(blockFactory, null, true));
} else {
CancellableTask task = (CancellableTask) transportTask;
task.addListener(() -> sinkHandler.onFailure(new TaskCancelledException(task.getReasonCancelled())));
sinkHandler.fetchPageAsync(request.sourcesFinished(), listener);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.esql.action;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.DocWriteResponse;
Expand Down Expand Up @@ -35,7 +34,6 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/105543")
@TestLogging(value = "org.elasticsearch.xpack.esql:TRACE", reason = "debug")
public class EsqlActionBreakerIT extends EsqlActionIT {

Expand Down Expand Up @@ -72,7 +70,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.getKey(),
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.getDefault(Settings.EMPTY)
)
.put(ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING, TimeValue.timeValueMillis(between(500, 2000)))
.put(ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING, TimeValue.timeValueSeconds(between(5, 10)))
.put(BlockFactory.LOCAL_BREAKER_OVER_RESERVED_SIZE_SETTING, ByteSizeValue.ofBytes(between(0, 256)))
.put(BlockFactory.LOCAL_BREAKER_OVER_RESERVED_MAX_SIZE_SETTING, ByteSizeValue.ofBytes(between(0, 1024)))
// allow reading pages from network can trip the circuit breaker
Expand Down

0 comments on commit bce11c6

Please sign in to comment.