diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java index 930ced04636f8..df6c09ea1ff97 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java @@ -41,13 +41,12 @@ final class ExchangeBuffer { } void addPage(Page page) { + queue.add(page); + if (queueSize.incrementAndGet() == 1) { + notifyNotEmpty(); + } if (noMoreInputs) { - page.releaseBlocks(); - } else { - queue.add(page); - if (queueSize.incrementAndGet() == 1) { - notifyNotEmpty(); - } + discardPages(); } } @@ -115,13 +114,17 @@ SubscribableListener waitForReading() { } } + private void discardPages() { + Page p; + while ((p = pollPage()) != null) { + p.releaseBlocks(); + } + } + void finish(boolean drainingPages) { noMoreInputs = true; if (drainingPages) { - Page p; - while ((p = pollPage()) != null) { - p.releaseBlocks(); - } + discardPages(); } notifyNotEmpty(); if (drainingPages || queueSize.get() == 0) { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java new file mode 100644 index 0000000000000..4c975c6c07834 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator.exchange; + +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.compute.data.BasicBlockTests; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.MockBlockFactory; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.test.ESTestCase; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.Matchers.equalTo; + +public class ExchangeBufferTests extends ESTestCase { + + public void testDrainPages() throws Exception { + ExchangeBuffer buffer = new ExchangeBuffer(randomIntBetween(10, 1000)); + var blockFactory = blockFactory(); + CountDownLatch latch = new CountDownLatch(1); + Thread[] producers = new Thread[between(1, 4)]; + AtomicBoolean stopped = new AtomicBoolean(); + AtomicInteger addedPages = new AtomicInteger(); + for (int t = 0; t < producers.length; t++) { + producers[t] = new Thread(() -> { + try { + latch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + while (stopped.get() == false && addedPages.incrementAndGet() < 10_000) { + buffer.addPage(randomPage(blockFactory)); + } + }); + producers[t].start(); + } + latch.countDown(); + try { + int minPage = between(10, 100); + int receivedPage = 0; + while (receivedPage < minPage) { + Page p = buffer.pollPage(); + if (p != null) { + p.releaseBlocks(); + ++receivedPage; + } + } + } finally { + buffer.finish(true); + stopped.set(true); + } + for (Thread t : producers) { + t.join(); + } + assertThat(buffer.size(), equalTo(0)); + blockFactory.ensureAllBlocksAreReleased(); + } + + private static MockBlockFactory blockFactory() { + BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking(); + CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST); + return new MockBlockFactory(breaker, bigArrays); + } + + private static Page randomPage(BlockFactory blockFactory) { + Block block = BasicBlockTests.randomBlock( + blockFactory, + randomFrom(ElementType.LONG, ElementType.BYTES_REF, ElementType.BOOLEAN), + randomIntBetween(1, 100), + randomBoolean(), + 0, + between(1, 2), + 0, + between(1, 2) + ).block(); + return new Page(block); + } +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java index f10ca17d741d8..a4c13af3d99ad 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java @@ -861,7 +861,6 @@ public void testFromStatsLimit() { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99826") public void testFromLimit() { try (EsqlQueryResponse results = run("from test | keep data | limit 2")) { logger.info(results);