Skip to content

Commit

Permalink
Update ProjectOperator to release dropped blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisHegarty committed Sep 25, 2023
1 parent 8a05425 commit e124dc1
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@

import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

public class ProjectOperator extends AbstractPageMappingOperator {

Expand Down Expand Up @@ -52,10 +56,16 @@ protected Page process(Page page) {
Arrays.fill(blocks, null);
int b = 0;
int positionCount = page.getPositionCount();
for (int i = bs.nextSetBit(0); i >= 0 && i < page.getBlockCount(); i = bs.nextSetBit(i + 1)) {
List<Releasable> blocksToRelease = new ArrayList<>();
for (int i = 0; i >= 0 && i < page.getBlockCount(); i++) {
var block = page.getBlock(i);
blocks[b++] = block;
if (bs.get(i)) {
blocks[b++] = block;
} else {
blocksToRelease.add(block);
}
}
Releasables.close(blocksToRelease);
return new Page(positionCount, blocks);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ protected final BigArrays nonBreakingBigArrays() {
/**
* A {@link DriverContext} with a nonBreakingBigArrays.
*/
protected final DriverContext driverContext() {
protected DriverContext driverContext() {
return new DriverContext(nonBreakingBigArrays(), BlockFactory.getNonBreakingInstance());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ private void assertSimple(DriverContext context, int size) {
BigArrays bigArrays = context.bigArrays().withCircuitBreaking();
List<Page> results = drive(simple(bigArrays).get(context), input.iterator());
assertSimpleOutput(input, results);
results.forEach(Page::releaseBlocks);
assertThat(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,47 @@

package org.elasticsearch.compute.operator;

import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.ConstantIntVector;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.junit.After;
import org.junit.Before;

import java.util.BitSet;
import java.util.List;
import java.util.stream.LongStream;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ProjectOperatorTests extends OperatorTestCase {

final CircuitBreaker breaker = new MockBigArrays.LimitedBreaker("esql-test-breaker", ByteSizeValue.ofGb(1));
final BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, mockBreakerService(breaker));
final BlockFactory blockFactory = BlockFactory.getInstance(breaker, bigArrays);

@Before
@After
public void assertBreakerIsZero() {
assertThat(breaker.getUsed(), is(0L));
}

@Override
protected DriverContext driverContext() {
return new DriverContext(blockFactory.bigArrays(), blockFactory);
}

public void testProjectionOnEmptyPage() {
var page = new Page(0);
var projection = new ProjectOperator(randomMask(randomIntBetween(2, 10)));
Expand All @@ -34,7 +59,7 @@ public void testProjection() {
var size = randomIntBetween(2, 5);
var blocks = new Block[size];
for (int i = 0; i < blocks.length; i++) {
blocks[i] = new ConstantIntVector(i, size).asBlock();
blocks[i] = blockFactory.newConstantIntBlockWith(i, size);
}

var page = new Page(size, blocks);
Expand All @@ -52,6 +77,7 @@ public void testProjection() {
assertTrue(mask.get(shouldBeSetInMask));
lastSetIndex = mask.nextSetBit(lastSetIndex + 1);
assertEquals(shouldBeSetInMask, lastSetIndex);
block.close();
}
}

Expand All @@ -65,7 +91,7 @@ private BitSet randomMask(int size) {

@Override
protected SourceOperator simpleInput(int end) {
return new TupleBlockSourceOperator(LongStream.range(0, end).mapToObj(l -> Tuple.tuple(l, end - l)));
return new TupleBlockSourceOperator(LongStream.range(0, end).mapToObj(l -> Tuple.tuple(l, end - l)), blockFactory);
}

@Override
Expand Down Expand Up @@ -106,4 +132,11 @@ protected ByteSizeValue smallEnoughToCircuitBreak() {
assumeTrue("doesn't use big arrays so can't break", false);
return null;
}

// A breaker service that always returns the given breaker for getBreaker(CircuitBreaker.REQUEST)
static CircuitBreakerService mockBreakerService(CircuitBreaker breaker) {
CircuitBreakerService breakerService = mock(CircuitBreakerService.class);
when(breakerService.getBreaker(CircuitBreaker.REQUEST)).thenReturn(breaker);
return breakerService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

package org.elasticsearch.compute.operator;

import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Tuple;

Expand All @@ -24,13 +24,20 @@ public class TupleBlockSourceOperator extends AbstractBlockSourceOperator {

private final List<Tuple<Long, Long>> values;

final BlockFactory blockFactory;

public TupleBlockSourceOperator(Stream<Tuple<Long, Long>> values) {
this(values, DEFAULT_MAX_PAGE_POSITIONS);
this(values, DEFAULT_MAX_PAGE_POSITIONS, BlockFactory.getNonBreakingInstance());
}

public TupleBlockSourceOperator(Stream<Tuple<Long, Long>> values, BlockFactory blockFactory) {
this(values, DEFAULT_MAX_PAGE_POSITIONS, blockFactory);
}

public TupleBlockSourceOperator(Stream<Tuple<Long, Long>> values, int maxPagePositions) {
public TupleBlockSourceOperator(Stream<Tuple<Long, Long>> values, int maxPagePositions, BlockFactory blockFactory) {
super(maxPagePositions);
this.values = values.toList();
this.blockFactory = blockFactory;
}

public TupleBlockSourceOperator(List<Tuple<Long, Long>> values) {
Expand All @@ -40,12 +47,13 @@ public TupleBlockSourceOperator(List<Tuple<Long, Long>> values) {
public TupleBlockSourceOperator(List<Tuple<Long, Long>> values, int maxPagePositions) {
super(maxPagePositions);
this.values = values;
blockFactory = BlockFactory.getNonBreakingInstance();
}

@Override
protected Page createPage(int positionOffset, int length) {
var blockBuilder1 = LongBlock.newBlockBuilder(length);
var blockBuilder2 = LongBlock.newBlockBuilder(length);
var blockBuilder1 = blockFactory.newLongBlockBuilder(length);
var blockBuilder2 = blockFactory.newLongBlockBuilder(length);
for (int i = 0; i < length; i++) {
Tuple<Long, Long> item = values.get(positionOffset + i);
if (item.v1() == null) {
Expand Down

0 comments on commit e124dc1

Please sign in to comment.