From e124dc1d4de4558c48cecfd540e32d7d7c2e7cb4 Mon Sep 17 00:00:00 2001 From: ChrisHegarty Date: Mon, 25 Sep 2023 20:49:40 +0100 Subject: [PATCH] Update ProjectOperator to release dropped blocks --- .../compute/operator/ProjectOperator.java | 14 ++++++- .../compute/operator/AnyOperatorTestCase.java | 2 +- .../compute/operator/OperatorTestCase.java | 1 + .../operator/ProjectOperatorTests.java | 39 +++++++++++++++++-- .../operator/TupleBlockSourceOperator.java | 18 ++++++--- 5 files changed, 63 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ProjectOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ProjectOperator.java index 4192bfd570bd4..0ecba6d10e505 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ProjectOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ProjectOperator.java @@ -9,9 +9,13 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; +import java.util.List; public class ProjectOperator extends AbstractPageMappingOperator { @@ -52,10 +56,16 @@ protected Page process(Page page) { Arrays.fill(blocks, null); int b = 0; int positionCount = page.getPositionCount(); - for (int i = bs.nextSetBit(0); i >= 0 && i < page.getBlockCount(); i = bs.nextSetBit(i + 1)) { + List blocksToRelease = new ArrayList<>(); + for (int i = 0; i >= 0 && i < page.getBlockCount(); i++) { var block = page.getBlock(i); - blocks[b++] = block; + if (bs.get(i)) { + blocks[b++] = block; + } else { + blocksToRelease.add(block); + } } + Releasables.close(blocksToRelease); return new Page(positionCount, blocks); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AnyOperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AnyOperatorTestCase.java index edbc59f9497fc..8f995d9a31bc3 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AnyOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AnyOperatorTestCase.java @@ -95,7 +95,7 @@ protected final BigArrays nonBreakingBigArrays() { /** * A {@link DriverContext} with a nonBreakingBigArrays. */ - protected final DriverContext driverContext() { + protected DriverContext driverContext() { return new DriverContext(nonBreakingBigArrays(), BlockFactory.getNonBreakingInstance()); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java index 6a2ace060e1e6..3cbab148e3073 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java @@ -143,6 +143,7 @@ private void assertSimple(DriverContext context, int size) { BigArrays bigArrays = context.bigArrays().withCircuitBreaking(); List results = drive(simple(bigArrays).get(context), input.iterator()); assertSimpleOutput(input, results); + results.forEach(Page::releaseBlocks); assertThat(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L)); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ProjectOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ProjectOperatorTests.java index 691c6f6cdbf56..b0b30e892e26e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ProjectOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ProjectOperatorTests.java @@ -7,22 +7,47 @@ package org.elasticsearch.compute.operator; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.data.Block; -import org.elasticsearch.compute.data.ConstantIntVector; +import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; import org.elasticsearch.core.Tuple; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.junit.After; +import org.junit.Before; import java.util.BitSet; import java.util.List; import java.util.stream.LongStream; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class ProjectOperatorTests extends OperatorTestCase { + + final CircuitBreaker breaker = new MockBigArrays.LimitedBreaker("esql-test-breaker", ByteSizeValue.ofGb(1)); + final BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, mockBreakerService(breaker)); + final BlockFactory blockFactory = BlockFactory.getInstance(breaker, bigArrays); + + @Before + @After + public void assertBreakerIsZero() { + assertThat(breaker.getUsed(), is(0L)); + } + + @Override + protected DriverContext driverContext() { + return new DriverContext(blockFactory.bigArrays(), blockFactory); + } + public void testProjectionOnEmptyPage() { var page = new Page(0); var projection = new ProjectOperator(randomMask(randomIntBetween(2, 10))); @@ -34,7 +59,7 @@ public void testProjection() { var size = randomIntBetween(2, 5); var blocks = new Block[size]; for (int i = 0; i < blocks.length; i++) { - blocks[i] = new ConstantIntVector(i, size).asBlock(); + blocks[i] = blockFactory.newConstantIntBlockWith(i, size); } var page = new Page(size, blocks); @@ -52,6 +77,7 @@ public void testProjection() { assertTrue(mask.get(shouldBeSetInMask)); lastSetIndex = mask.nextSetBit(lastSetIndex + 1); assertEquals(shouldBeSetInMask, lastSetIndex); + block.close(); } } @@ -65,7 +91,7 @@ private BitSet randomMask(int size) { @Override protected SourceOperator simpleInput(int end) { - return new TupleBlockSourceOperator(LongStream.range(0, end).mapToObj(l -> Tuple.tuple(l, end - l))); + return new TupleBlockSourceOperator(LongStream.range(0, end).mapToObj(l -> Tuple.tuple(l, end - l)), blockFactory); } @Override @@ -106,4 +132,11 @@ protected ByteSizeValue smallEnoughToCircuitBreak() { assumeTrue("doesn't use big arrays so can't break", false); return null; } + + // A breaker service that always returns the given breaker for getBreaker(CircuitBreaker.REQUEST) + static CircuitBreakerService mockBreakerService(CircuitBreaker breaker) { + CircuitBreakerService breakerService = mock(CircuitBreakerService.class); + when(breakerService.getBreaker(CircuitBreaker.REQUEST)).thenReturn(breaker); + return breakerService; + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TupleBlockSourceOperator.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TupleBlockSourceOperator.java index 0bcaac0e5b646..1baa522c23930 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TupleBlockSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TupleBlockSourceOperator.java @@ -7,7 +7,7 @@ package org.elasticsearch.compute.operator; -import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.Page; import org.elasticsearch.core.Tuple; @@ -24,13 +24,20 @@ public class TupleBlockSourceOperator extends AbstractBlockSourceOperator { private final List> values; + final BlockFactory blockFactory; + public TupleBlockSourceOperator(Stream> values) { - this(values, DEFAULT_MAX_PAGE_POSITIONS); + this(values, DEFAULT_MAX_PAGE_POSITIONS, BlockFactory.getNonBreakingInstance()); + } + + public TupleBlockSourceOperator(Stream> values, BlockFactory blockFactory) { + this(values, DEFAULT_MAX_PAGE_POSITIONS, blockFactory); } - public TupleBlockSourceOperator(Stream> values, int maxPagePositions) { + public TupleBlockSourceOperator(Stream> values, int maxPagePositions, BlockFactory blockFactory) { super(maxPagePositions); this.values = values.toList(); + this.blockFactory = blockFactory; } public TupleBlockSourceOperator(List> values) { @@ -40,12 +47,13 @@ public TupleBlockSourceOperator(List> values) { public TupleBlockSourceOperator(List> values, int maxPagePositions) { super(maxPagePositions); this.values = values; + blockFactory = BlockFactory.getNonBreakingInstance(); } @Override protected Page createPage(int positionOffset, int length) { - var blockBuilder1 = LongBlock.newBlockBuilder(length); - var blockBuilder2 = LongBlock.newBlockBuilder(length); + var blockBuilder1 = blockFactory.newLongBlockBuilder(length); + var blockBuilder2 = blockFactory.newLongBlockBuilder(length); for (int i = 0; i < length; i++) { Tuple item = values.get(positionOffset + i); if (item.v1() == null) {