From e4868b4c521d163306a8ff72fbdb8c73d3b4dd26 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 10 Oct 2023 09:59:32 -0700 Subject: [PATCH 1/5] Enable limit tests --- .../java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java index f10ca17d741d8..a4c13af3d99ad 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java @@ -861,7 +861,6 @@ public void testFromStatsLimit() { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99826") public void testFromLimit() { try (EsqlQueryResponse results = run("from test | keep data | limit 2")) { logger.info(results); From 5f5fee63b7cfa1c82b28100047bcf488b32378c7 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 10 Oct 2023 10:06:27 -0700 Subject: [PATCH 2/5] Delay adding pages --- .../compute/operator/exchange/ExchangeBuffer.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java index 930ced04636f8..fda3ad4a9e770 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java @@ -9,10 +9,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.common.Randomness; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.Operator; import java.util.Queue; +import java.util.Random; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -44,6 +46,11 @@ void addPage(Page page) { if (noMoreInputs) { page.releaseBlocks(); } else { + try { + Thread.sleep(Randomness.get().nextInt(100)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } queue.add(page); if (queueSize.incrementAndGet() == 1) { notifyNotEmpty(); From 89a72ef04491c6a853207cf6681009e01b5fa6c8 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 10 Oct 2023 10:29:34 -0700 Subject: [PATCH 3/5] Discard pages --- .../operator/exchange/ExchangeBuffer.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java index fda3ad4a9e770..a5353f57a930c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java @@ -9,12 +9,10 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.SubscribableListener; -import org.elasticsearch.common.Randomness; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.Operator; import java.util.Queue; -import java.util.Random; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -46,15 +44,13 @@ void addPage(Page page) { if (noMoreInputs) { page.releaseBlocks(); } else { - try { - Thread.sleep(Randomness.get().nextInt(100)); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } queue.add(page); if (queueSize.incrementAndGet() == 1) { notifyNotEmpty(); } + if (noMoreInputs) { + discardPages(); + } } } @@ -122,13 +118,17 @@ SubscribableListener waitForReading() { } } + private void discardPages() { + Page p; + while ((p = pollPage()) != null) { + p.releaseBlocks(); + } + } + void finish(boolean drainingPages) { noMoreInputs = true; if (drainingPages) { - Page p; - while ((p = pollPage()) != null) { - p.releaseBlocks(); - } + discardPages(); } notifyNotEmpty(); if (drainingPages || queueSize.get() == 0) { From 12f8bf6130e1b99b48d6058a6e87414c40acded7 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 10 Oct 2023 13:09:03 -0700 Subject: [PATCH 4/5] Discard pages --- .../compute/operator/exchange/ExchangeBuffer.java | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java index a5353f57a930c..df6c09ea1ff97 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java @@ -41,16 +41,12 @@ final class ExchangeBuffer { } void addPage(Page page) { + queue.add(page); + if (queueSize.incrementAndGet() == 1) { + notifyNotEmpty(); + } if (noMoreInputs) { - page.releaseBlocks(); - } else { - queue.add(page); - if (queueSize.incrementAndGet() == 1) { - notifyNotEmpty(); - } - if (noMoreInputs) { - discardPages(); - } + discardPages(); } } From 549c8f27e763086625f1100e053b8788f4def541 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 10 Oct 2023 21:46:06 -0700 Subject: [PATCH 5/5] Add test --- .../exchange/ExchangeBufferTests.java | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java new file mode 100644 index 0000000000000..4c975c6c07834 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator.exchange; + +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.compute.data.BasicBlockTests; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.MockBlockFactory; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.test.ESTestCase; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.Matchers.equalTo; + +public class ExchangeBufferTests extends ESTestCase { + + public void testDrainPages() throws Exception { + ExchangeBuffer buffer = new ExchangeBuffer(randomIntBetween(10, 1000)); + var blockFactory = blockFactory(); + CountDownLatch latch = new CountDownLatch(1); + Thread[] producers = new Thread[between(1, 4)]; + AtomicBoolean stopped = new AtomicBoolean(); + AtomicInteger addedPages = new AtomicInteger(); + for (int t = 0; t < producers.length; t++) { + producers[t] = new Thread(() -> { + try { + latch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + while (stopped.get() == false && addedPages.incrementAndGet() < 10_000) { + buffer.addPage(randomPage(blockFactory)); + } + }); + producers[t].start(); + } + latch.countDown(); + try { + int minPage = between(10, 100); + int receivedPage = 0; + while (receivedPage < minPage) { + Page p = buffer.pollPage(); + if (p != null) { + p.releaseBlocks(); + ++receivedPage; + } + } + } finally { + buffer.finish(true); + stopped.set(true); + } + for (Thread t : producers) { + t.join(); + } + assertThat(buffer.size(), equalTo(0)); + blockFactory.ensureAllBlocksAreReleased(); + } + + private static MockBlockFactory blockFactory() { + BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking(); + CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST); + return new MockBlockFactory(breaker, bigArrays); + } + + private static Page randomPage(BlockFactory blockFactory) { + Block block = BasicBlockTests.randomBlock( + blockFactory, + randomFrom(ElementType.LONG, ElementType.BYTES_REF, ElementType.BOOLEAN), + randomIntBetween(1, 100), + randomBoolean(), + 0, + between(1, 2), + 0, + between(1, 2) + ).block(); + return new Page(block); + } +}