From b27bafa44befb43c0ded3bc3efda929ece56a37c Mon Sep 17 00:00:00 2001 From: ChrisHegarty Date: Fri, 29 Sep 2023 14:34:34 +0100 Subject: [PATCH] more fixes and tests --- .../blockhash/BooleanBlockHash.java | 21 +- .../blockhash/BytesRefBlockHash.java | 37 +- .../blockhash/DoubleBlockHash.java | 27 +- .../aggregation/blockhash/IntBlockHash.java | 18 +- .../aggregation/blockhash/LongBlockHash.java | 27 +- .../blockhash/LongLongBlockHash.java | 31 +- .../blockhash/PackedValuesBlockHash.java | 4 +- .../compute/data/BlockUtils.java | 8 +- .../compute/data/X-Vector.java.st | 4 +- .../compute/operator/Driver.java | 2 +- .../operator/HashAggregationOperator.java | 6 +- .../GroupingAggregatorFunctionTestCase.java | 5 + .../aggregation/blockhash/BlockHashTests.java | 598 ++++++++++-------- .../compute/data/DocVectorTests.java | 3 +- .../compute/data/FilteredBlockTests.java | 33 +- .../operator/ForkingOperatorTestCase.java | 30 +- .../compute/operator/OperatorTestCase.java | 47 +- 17 files changed, 504 insertions(+), 397 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BooleanBlockHash.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BooleanBlockHash.java index e8ac32b7f22f..6c0d8e49d452 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BooleanBlockHash.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BooleanBlockHash.java @@ -12,7 +12,6 @@ import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; import org.elasticsearch.compute.data.BooleanBlock; import org.elasticsearch.compute.data.BooleanVector; -import org.elasticsearch.compute.data.IntArrayVector; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.Page; @@ -41,22 +40,28 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) { BooleanBlock block = page.getBlock(channel); BooleanVector vector = block.asVector(); if (vector == null) { - addInput.add(0, add(block)); + try (IntBlock groupIds = add(block)) { + addInput.add(0, groupIds); + } } else { - addInput.add(0, add(vector)); + try (IntVector groupIds = add(vector)) { + addInput.add(0, groupIds); + } } } private IntVector add(BooleanVector vector) { - int[] groups = new int[vector.getPositionCount()]; - for (int i = 0; i < vector.getPositionCount(); i++) { - groups[i] = MultivalueDedupeBoolean.hashOrd(everSeen, vector.getBoolean(i)); + int positions = vector.getPositionCount(); + try (var builder = IntVector.newVectorFixedBuilder(positions, blockFactory)) { + for (int i = 0; i < positions; i++) { + builder.appendInt(MultivalueDedupeBoolean.hashOrd(everSeen, vector.getBoolean(i))); + } + return builder.build(); } - return new IntArrayVector(groups, groups.length); } private IntBlock add(BooleanBlock block) { - return new MultivalueDedupeBoolean(block).hash(everSeen); + return new MultivalueDedupeBoolean(block).hash(everSeen); // TODO: block builder } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BytesRefBlockHash.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BytesRefBlockHash.java index e651279a59d5..f784ff1032f4 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BytesRefBlockHash.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BytesRefBlockHash.java @@ -17,10 +17,8 @@ import org.elasticsearch.common.util.BytesRefHash; import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; import org.elasticsearch.compute.aggregation.SeenGroupIds; -import org.elasticsearch.compute.data.BytesRefArrayVector; import org.elasticsearch.compute.data.BytesRefBlock; import org.elasticsearch.compute.data.BytesRefVector; -import org.elasticsearch.compute.data.IntArrayVector; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.Page; @@ -58,22 +56,28 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) { BytesRefBlock block = page.getBlock(channel); BytesRefVector vector = block.asVector(); if (vector == null) { - addInput.add(0, add(block)); + try (IntBlock groupIds = add(block)) { + addInput.add(0, groupIds); + } } else { - addInput.add(0, add(vector)); + try (IntVector groupIds = add(vector)) { + addInput.add(0, groupIds); + } } } private IntVector add(BytesRefVector vector) { - int[] groups = new int[vector.getPositionCount()]; - for (int i = 0; i < vector.getPositionCount(); i++) { - groups[i] = Math.toIntExact(hashOrdToGroupNullReserved(bytesRefHash.add(vector.getBytesRef(i, bytes)))); + int positions = vector.getPositionCount(); + try (var builder = IntVector.newVectorFixedBuilder(positions, blockFactory)) { + for (int i = 0; i < positions; i++) { + builder.appendInt(Math.toIntExact(hashOrdToGroupNullReserved(bytesRefHash.add(vector.getBytesRef(i, bytes))))); + } + return builder.build(); } - return new IntArrayVector(groups, vector.getPositionCount()); } private IntBlock add(BytesRefBlock block) { - MultivalueDedupe.HashResult result = new MultivalueDedupeBytesRef(block).hash(bytesRefHash); + MultivalueDedupe.HashResult result = new MultivalueDedupeBytesRef(block).hash(bytesRefHash); // TODO: block builder seenNull |= result.sawNull(); return result.ords(); } @@ -87,13 +91,14 @@ public BytesRefBlock[] getKeys() { // TODO replace with takeBytesRefsOwnership ?! if (seenNull) { - BytesRefBlock.Builder builder = BytesRefBlock.newBlockBuilder(Math.toIntExact(bytesRefHash.size() + 1)); - builder.appendNull(); - BytesRef spare = new BytesRef(); - for (long i = 0; i < bytesRefHash.size(); i++) { - builder.appendBytesRef(bytesRefHash.get(i, spare)); + try (var builder = blockFactory.newBytesRefBlockBuilder(Math.toIntExact(bytesRefHash.size() + 1))) { + builder.appendNull(); + BytesRef spare = new BytesRef(); + for (long i = 0; i < bytesRefHash.size(); i++) { + builder.appendBytesRef(bytesRefHash.get(i, spare)); + } + return new BytesRefBlock[] { builder.build() }; } - return new BytesRefBlock[] { builder.build() }; } final int size = Math.toIntExact(bytesRefHash.size()); @@ -101,7 +106,7 @@ public BytesRefBlock[] getKeys() { bytesRefHash.getBytesRefs().writeTo(out); try (StreamInput in = out.bytes().streamInput()) { return new BytesRefBlock[] { - new BytesRefArrayVector(new BytesRefArray(in, BigArrays.NON_RECYCLING_INSTANCE), size).asBlock() }; + blockFactory.newBytesRefArrayVector(new BytesRefArray(in, BigArrays.NON_RECYCLING_INSTANCE), size).asBlock() }; } } catch (IOException e) { throw new IllegalStateException(e); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/DoubleBlockHash.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/DoubleBlockHash.java index 68b9b4315b5d..298bd9eff93d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/DoubleBlockHash.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/DoubleBlockHash.java @@ -13,11 +13,8 @@ import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; import org.elasticsearch.compute.aggregation.SeenGroupIds; import org.elasticsearch.compute.data.Block; -import org.elasticsearch.compute.data.DoubleArrayBlock; -import org.elasticsearch.compute.data.DoubleArrayVector; import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.DoubleVector; -import org.elasticsearch.compute.data.IntArrayVector; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.Page; @@ -54,22 +51,28 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) { DoubleBlock block = page.getBlock(channel); DoubleVector vector = block.asVector(); if (vector == null) { - addInput.add(0, add(block)); + try (IntBlock groupIds = add(block)) { + addInput.add(0, groupIds); + } } else { - addInput.add(0, add(vector)); + try (IntVector groupIds = add(vector)) { + addInput.add(0, groupIds); + } } } private IntVector add(DoubleVector vector) { - int[] groups = new int[vector.getPositionCount()]; - for (int i = 0; i < vector.getPositionCount(); i++) { - groups[i] = Math.toIntExact(hashOrdToGroupNullReserved(longHash.add(Double.doubleToLongBits(vector.getDouble(i))))); + int positions = vector.getPositionCount(); + try (var builder = IntVector.newVectorFixedBuilder(positions, blockFactory)) { + for (int i = 0; i < positions; i++) { + builder.appendInt(Math.toIntExact(hashOrdToGroupNullReserved(longHash.add(Double.doubleToLongBits(vector.getDouble(i)))))); + } + return builder.build(); } - return new IntArrayVector(groups, groups.length); } private IntBlock add(DoubleBlock block) { - MultivalueDedupe.HashResult result = new MultivalueDedupeDouble(block).hash(longHash); + MultivalueDedupe.HashResult result = new MultivalueDedupeDouble(block).hash(longHash); // TODO: block builder seenNull |= result.sawNull(); return result.ords(); } @@ -85,7 +88,7 @@ public DoubleBlock[] getKeys() { BitSet nulls = new BitSet(1); nulls.set(0); return new DoubleBlock[] { - new DoubleArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING) }; + blockFactory.newDoubleArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING) }; } final int size = Math.toIntExact(longHash.size()); @@ -95,7 +98,7 @@ public DoubleBlock[] getKeys() { } // TODO claim the array and wrap? - return new DoubleBlock[] { new DoubleArrayVector(keys, keys.length).asBlock() }; + return new DoubleBlock[] { blockFactory.newDoubleArrayVector(keys, keys.length).asBlock() }; } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/IntBlockHash.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/IntBlockHash.java index 519ba4724057..403355b7e2ea 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/IntBlockHash.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/IntBlockHash.java @@ -53,22 +53,23 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) { } } else { try (IntVector groupIds = add(vector)) { - addInput.add(0, add(groupIds)); + addInput.add(0, groupIds); } } } private IntVector add(IntVector vector) { - long preAdjustBytes = blockFactory.preAdjustBreakerForInt(vector.getPositionCount()); - int[] groups = new int[vector.getPositionCount()]; - for (int i = 0; i < vector.getPositionCount(); i++) { - groups[i] = Math.toIntExact(hashOrdToGroupNullReserved(longHash.add(vector.getInt(i)))); + int positions = vector.getPositionCount(); + try (var builder = IntVector.newVectorFixedBuilder(positions, blockFactory)) { + for (int i = 0; i < positions; i++) { + builder.appendInt(Math.toIntExact(hashOrdToGroupNullReserved(longHash.add(vector.getInt(i))))); + } + return builder.build(); } - return blockFactory.newIntArrayVector(groups, groups.length, preAdjustBytes); } private IntBlock add(IntBlock block) { - MultivalueDedupe.HashResult result = new MultivalueDedupeInt(block).hash(longHash); + MultivalueDedupe.HashResult result = new MultivalueDedupeInt(block).hash(longHash); // TODO: block builder seenNull |= result.sawNull(); return result.ords(); } @@ -83,7 +84,8 @@ public IntBlock[] getKeys() { } BitSet nulls = new BitSet(1); nulls.set(0); - return new IntBlock[] { blockFactory.newIntArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING) }; + return new IntBlock[] { + blockFactory.newIntArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING) }; } final int size = Math.toIntExact(longHash.size()); final int[] keys = new int[size]; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/LongBlockHash.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/LongBlockHash.java index f4d28429cb6e..393de2a400ec 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/LongBlockHash.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/LongBlockHash.java @@ -13,11 +13,8 @@ import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; import org.elasticsearch.compute.aggregation.SeenGroupIds; import org.elasticsearch.compute.data.Block; -import org.elasticsearch.compute.data.IntArrayVector; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.IntVector; -import org.elasticsearch.compute.data.LongArrayBlock; -import org.elasticsearch.compute.data.LongArrayVector; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; @@ -54,22 +51,28 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) { LongBlock block = page.getBlock(channel); LongVector vector = block.asVector(); if (vector == null) { - addInput.add(0, add(block)); + try (IntBlock groupIds = add(block)) { + addInput.add(0, groupIds); + } } else { - addInput.add(0, add(vector)); + try (IntVector groupIds = add(vector)) { + addInput.add(0, groupIds); + } } } private IntVector add(LongVector vector) { - int[] groups = new int[vector.getPositionCount()]; - for (int i = 0; i < vector.getPositionCount(); i++) { - groups[i] = Math.toIntExact(hashOrdToGroupNullReserved(longHash.add(vector.getLong(i)))); + int positions = vector.getPositionCount(); + try (var builder = IntVector.newVectorFixedBuilder(positions, blockFactory)) { + for (int i = 0; i < positions; i++) { + builder.appendInt(Math.toIntExact(hashOrdToGroupNullReserved(longHash.add(vector.getLong(i))))); + } + return builder.build(); } - return new IntArrayVector(groups, groups.length); } private IntBlock add(LongBlock block) { - MultivalueDedupe.HashResult result = new MultivalueDedupeLong(block).hash(longHash); + MultivalueDedupe.HashResult result = new MultivalueDedupeLong(block).hash(longHash); // TODO: block builder seenNull |= result.sawNull(); return result.ords(); } @@ -85,7 +88,7 @@ public LongBlock[] getKeys() { BitSet nulls = new BitSet(1); nulls.set(0); return new LongBlock[] { - new LongArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING) }; + blockFactory.newLongArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING) }; } final int size = Math.toIntExact(longHash.size()); @@ -95,7 +98,7 @@ public LongBlock[] getKeys() { } // TODO call something like takeKeyOwnership to claim the keys array directly - return new LongBlock[] { new LongArrayVector(keys, keys.length).asBlock() }; + return new LongBlock[] { blockFactory.newLongArrayVector(keys, keys.length).asBlock() }; } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/LongLongBlockHash.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/LongLongBlockHash.java index f5c6bdd317a2..cd78c45c920f 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/LongLongBlockHash.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/LongLongBlockHash.java @@ -14,13 +14,13 @@ import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; import org.elasticsearch.compute.aggregation.SeenGroupIds; import org.elasticsearch.compute.data.Block; -import org.elasticsearch.compute.data.IntArrayVector; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; /** @@ -52,24 +52,30 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) { LongVector vector1 = block1.asVector(); LongVector vector2 = block2.asVector(); if (vector1 != null && vector2 != null) { - addInput.add(0, add(vector1, vector2)); + try (IntVector groupIds = add(vector1, vector2)) { + addInput.add(0, groupIds); + } } else { - new AddBlock(block1, block2, addInput).add(); + try (var addBlock = new AddBlock(block1, block2, addInput)) { + addBlock.add(); + } } } private IntVector add(LongVector vector1, LongVector vector2) { int positions = vector1.getPositionCount(); - final int[] ords = new int[positions]; - for (int i = 0; i < positions; i++) { - ords[i] = Math.toIntExact(hashOrdToGroup(hash.add(vector1.getLong(i), vector2.getLong(i)))); + try (var builder = IntVector.newVectorFixedBuilder(positions, blockFactory)) { + for (int i = 0; i < positions; i++) { + builder.appendInt(Math.toIntExact(hashOrdToGroup(hash.add(vector1.getLong(i), vector2.getLong(i))))); + } + return builder.build(); } - return new IntArrayVector(ords, positions); } private static final long[] EMPTY = new long[0]; - private class AddBlock extends AbstractAddBlock { + // TODO: this uses the non-breaking block factory - update to use this blockFactory + private class AddBlock extends AbstractAddBlock implements Releasable { private final LongBlock block1; private final LongBlock block2; @@ -131,6 +137,11 @@ void add() { } emitOrds(); } + + @Override + public void close() { + Releasables.closeExpectNoException(block1, block2); + } } static class AbstractAddBlock { @@ -186,8 +197,8 @@ static int add(long[] seen, int nextSeen, long v) { @Override public Block[] getKeys() { int positions = (int) hash.size(); - LongVector.Builder keys1 = LongVector.newVectorBuilder(positions); - LongVector.Builder keys2 = LongVector.newVectorBuilder(positions); + LongVector.Builder keys1 = blockFactory.newLongVectorBuilder(positions); + LongVector.Builder keys2 = blockFactory.newLongVectorBuilder(positions); for (long i = 0; i < positions; i++) { keys1.appendLong(hash.getKey1(i)); keys2.appendLong(hash.getKey2(i)); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java index c3af78cc6965..7274b0814404 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java @@ -23,6 +23,7 @@ import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.HashAggregationOperator; import org.elasticsearch.compute.operator.MultivalueDedupe; +import org.elasticsearch.core.Releasables; import java.util.Arrays; import java.util.List; @@ -202,7 +203,7 @@ public Block[] getKeys() { for (int g = 0; g < builders.length; g++) { ElementType elementType = groups[g].spec.elementType(); decoders[g] = BatchEncoder.decoder(elementType); - builders[g] = elementType.newBlockBuilder(size); + builders[g] = elementType.newBlockBuilder(size, blockFactory); } BytesRef[] values = new BytesRef[(int) Math.min(100, bytesRefHash.size())]; @@ -236,6 +237,7 @@ public Block[] getKeys() { for (int g = 0; g < keyBlocks.length; g++) { keyBlocks[g] = builders[g].build(); } + Releasables.closeExpectNoException(builders); return keyBlocks; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java index 87a8d0bc3251..06fcae9d8320 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java @@ -143,10 +143,10 @@ public static Block deepCopyOf(Block block) { } public static Block deepCopyOf(Block block, BlockFactory blockFactory) { - // TODO: plumb in blockFactory - Block.Builder builder = block.elementType().newBlockBuilder(block.getPositionCount()); - builder.copyFrom(block, 0, block.getPositionCount()); - return builder.build(); + try (Block.Builder builder = block.elementType().newBlockBuilder(block.getPositionCount(), blockFactory)) { + builder.copyFrom(block, 0, block.getPositionCount()); + return builder.build(); + } } private static Class type(List> list, int i) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Vector.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Vector.java.st index 8be7024757a9..9f29f3e25fe9 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Vector.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Vector.java.st @@ -175,12 +175,12 @@ $endif$ $if(int)$ /** Create a vector for a range of ints. */ - static IntVector range(int startInclusive, int endExclusive) { + static IntVector range(int startInclusive, int endExclusive, BlockFactory blockFactory) { int[] values = new int[endExclusive - startInclusive]; for (int i = 0; i < values.length; i++) { values[i] = startInclusive + i; } - return new IntArrayVector(values, values.length); + return blockFactory.newIntArrayVector(values, values.length); } $endif$ diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java index d0a35e6f6835..01862288e8f2 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java @@ -195,7 +195,7 @@ private SubscribableListener runSingleLoopIteration() { } catch (Throwable t) { releasePageBlocksWhileHandlingException(page); throw t; - } + } } if (op.isFinished()) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java index 13255971ce7a..7c5e2087bf0d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java @@ -158,9 +158,9 @@ public void finish() { } finally { // selected should always be closed // TODO: is selected exposed through intermediate aggs? - Releasables.closeExpectNoException(selected); // TODO; null check? - if (success == false) { - Releasables.closeExpectNoException(blocks); // TODO; null check? + Releasables.closeExpectNoException(selected); + if (success == false && blocks != null) { + Releasables.closeExpectNoException(blocks); } } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java index 3cce39f2d90a..0bf09486169b 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java @@ -160,6 +160,7 @@ public final void testNullGroups() { DriverContext driverContext = driverContext(); int end = between(50, 60); List input = CannedSourceOperator.collectPages(nullGroups(simpleInput(driverContext.blockFactory(), end))); + List origInput = BlockTestUtils.deepCopyOf(input, BlockFactory.getNonBreakingInstance()); List results = drive(simple(nonBreakingBigArrays().withCircuitBreaking()).get(driverContext), input.iterator()); assertSimpleOutput(origInput, results); } @@ -227,6 +228,7 @@ public final void testMultivalued() { DriverContext driverContext = driverContext(); int end = between(1_000, 100_000); List input = CannedSourceOperator.collectPages(mergeValues(simpleInput(driverContext.blockFactory(), end))); + List origInput = BlockTestUtils.deepCopyOf(input, BlockFactory.getNonBreakingInstance()); List results = drive(simple(nonBreakingBigArrays().withCircuitBreaking()).get(driverContext), input.iterator()); assertSimpleOutput(origInput, results); } @@ -237,6 +239,7 @@ public final void testMulitvaluedNullGroupsAndValues() { List input = CannedSourceOperator.collectPages( new NullInsertingSourceOperator(mergeValues(simpleInput(driverContext.blockFactory(), end))) ); + List origInput = BlockTestUtils.deepCopyOf(input, BlockFactory.getNonBreakingInstance()); List results = drive(simple(nonBreakingBigArrays().withCircuitBreaking()).get(driverContext), input.iterator()); assertSimpleOutput(origInput, results); } @@ -245,6 +248,7 @@ public final void testMulitvaluedNullGroup() { DriverContext driverContext = driverContext(); int end = between(50, 60); List input = CannedSourceOperator.collectPages(nullGroups(mergeValues(simpleInput(driverContext.blockFactory(), end)))); + List origInput = BlockTestUtils.deepCopyOf(input, BlockFactory.getNonBreakingInstance()); List results = drive(simple(nonBreakingBigArrays().withCircuitBreaking()).get(driverContext), input.iterator()); assertSimpleOutput(origInput, results); } @@ -253,6 +257,7 @@ public final void testMulitvaluedNullValues() { DriverContext driverContext = driverContext(); int end = between(50, 60); List input = CannedSourceOperator.collectPages(nullValues(mergeValues(simpleInput(driverContext.blockFactory(), end)))); + List origInput = BlockTestUtils.deepCopyOf(input, BlockFactory.getNonBreakingInstance()); List results = drive(simple(nonBreakingBigArrays().withCircuitBreaking()).get(driverContext), input.iterator()); assertSimpleOutput(origInput, results); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTests.java index 0667d54a27ed..78f4f5d42898 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTests.java @@ -56,19 +56,18 @@ public class BlockHashTests extends ESTestCase { final CircuitBreaker breaker = new MockBigArrays.LimitedBreaker("esql-test-breaker", ByteSizeValue.ofGb(1)); final BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, mockBreakerService(breaker)); final BlockFactory blockFactory = BlockFactory.getInstance(breaker, bigArrays); - final BlockFactory nonBreakingFactory = BlockFactory.getNonBreakingInstance(); @ParametersFactory public static List params() { List params = new ArrayList<>(); - params.add(new Object[]{false}); - //params.add(new Object[]{true}); + params.add(new Object[] { false }); + params.add(new Object[] { true }); return params; } @After public void checkBreaker() { - // assertThat(breaker.getUsed(), is(0L)); // TODO: enable once all blocks are released + assertThat(breaker.getUsed(), is(0L)); // TODO: enable once all blocks are released } private final boolean forcePackedHash; @@ -78,8 +77,8 @@ public BlockHashTests(@Named("forcePackedHash") boolean forcePackedHash) { } public void testIntHash() { - int[] values = new int[]{1, 2, 3, 1, 2, 3, 1, 2, 3}; - IntBlock block = nonBreakingFactory.newIntArrayVector(values, values.length).asBlock(); + int[] values = new int[] { 1, 2, 3, 1, 2, 3, 1, 2, 3 }; + IntBlock block = BlockFactory.getNonBreakingInstance().newIntArrayVector(values, values.length).asBlock(); OrdsAndKeys ordsAndKeys = hash(block); if (forcePackedHash) { @@ -92,11 +91,14 @@ public void testIntHash() { assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(1, 4))); } assertKeys(ordsAndKeys.keys, 1, 2, 3); + // we close these explicitly in the test. In common operation the operator is in charge of closing these. + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); assertThat(breaker.getUsed(), is(0L)); } public void testIntHashWithNulls() { - IntBlock.Builder builder = IntBlock.newBlockBuilder(4); + IntBlock.Builder builder = BlockFactory.getNonBreakingInstance().newIntBlockBuilder(4); builder.appendInt(0); builder.appendNull(); builder.appendInt(2); @@ -113,10 +115,13 @@ public void testIntHashWithNulls() { assertKeys(ordsAndKeys.keys, null, 0, 2); } assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, 3))); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testIntHashWithMultiValuedFields() { - var builder = IntBlock.newBlockBuilder(8); + var builder = BlockFactory.getNonBreakingInstance().newIntBlockBuilder(8); builder.appendInt(1); builder.beginPositionEntry(); builder.appendInt(1); @@ -142,33 +147,36 @@ public void testIntHashWithMultiValuedFields() { assertThat(ordsAndKeys.description, startsWith("PackedValuesBlockHash{groups=[0:INT], entries=4, size=")); assertOrds( ordsAndKeys.ords, - new int[]{0}, - new int[]{0, 1}, - new int[]{2, 0}, - new int[]{2}, - new int[]{3}, - new int[]{2, 1, 0} + new int[] { 0 }, + new int[] { 0, 1 }, + new int[] { 2, 0 }, + new int[] { 2 }, + new int[] { 3 }, + new int[] { 2, 1, 0 } ); assertKeys(ordsAndKeys.keys, 1, 2, 3, null); } else { assertThat(ordsAndKeys.description, equalTo("IntBlockHash{channel=0, entries=3, seenNull=true}")); assertOrds( ordsAndKeys.ords, - new int[]{1}, - new int[]{1, 2}, - new int[]{3, 1}, - new int[]{3}, - new int[]{0}, - new int[]{3, 2, 1} + new int[] { 1 }, + new int[] { 1, 2 }, + new int[] { 3, 1 }, + new int[] { 3 }, + new int[] { 0 }, + new int[] { 3, 2, 1 } ); assertKeys(ordsAndKeys.keys, null, 1, 2, 3); } assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, 4))); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testLongHash() { - long[] values = new long[]{2, 1, 4, 2, 4, 1, 3, 4}; - LongBlock block = blockFactory.newLongArrayVector(values, values.length).asBlock(); + long[] values = new long[] { 2, 1, 4, 2, 4, 1, 3, 4 }; + LongBlock block = BlockFactory.getNonBreakingInstance().newLongArrayVector(values, values.length).asBlock(); OrdsAndKeys ordsAndKeys = hash(block); if (forcePackedHash) { @@ -181,11 +189,13 @@ public void testLongHash() { assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(1, 5))); } assertKeys(ordsAndKeys.keys, 2L, 1L, 4L, 3L); - Releasables.closeExpectNoException(block); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testLongHashWithNulls() { - LongBlock.Builder builder = LongBlock.newBlockBuilder(4); + LongBlock.Builder builder = BlockFactory.getNonBreakingInstance().newLongBlockBuilder(4); builder.appendLong(0); builder.appendNull(); builder.appendLong(2); @@ -202,10 +212,13 @@ public void testLongHashWithNulls() { assertKeys(ordsAndKeys.keys, null, 0L, 2L); } assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, 3))); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testLongHashWithMultiValuedFields() { - var builder = LongBlock.newBlockBuilder(8); + var builder = BlockFactory.getNonBreakingInstance().newLongBlockBuilder(8); builder.appendLong(1); builder.beginPositionEntry(); builder.appendLong(1); @@ -232,34 +245,36 @@ public void testLongHashWithMultiValuedFields() { assertThat(ordsAndKeys.description, startsWith("PackedValuesBlockHash{groups=[0:LONG], entries=4, size=")); assertOrds( ordsAndKeys.ords, - new int[]{0}, - new int[]{0, 1, 2}, - new int[]{0}, - new int[]{2}, - new int[]{3}, - new int[]{2, 1, 0} + new int[] { 0 }, + new int[] { 0, 1, 2 }, + new int[] { 0 }, + new int[] { 2 }, + new int[] { 3 }, + new int[] { 2, 1, 0 } ); assertKeys(ordsAndKeys.keys, 1L, 2L, 3L, null); } else { assertThat(ordsAndKeys.description, equalTo("LongBlockHash{channel=0, entries=3, seenNull=true}")); assertOrds( ordsAndKeys.ords, - new int[]{1}, - new int[]{1, 2, 3}, - new int[]{1}, - new int[]{3}, - new int[]{0}, - new int[]{3, 2, 1} + new int[] { 1 }, + new int[] { 1, 2, 3 }, + new int[] { 1 }, + new int[] { 3 }, + new int[] { 0 }, + new int[] { 3, 2, 1 } ); assertKeys(ordsAndKeys.keys, null, 1L, 2L, 3L); } assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, 4))); - Releasables.closeExpectNoException(block); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testDoubleHash() { - double[] values = new double[]{2.0, 1.0, 4.0, 2.0, 4.0, 1.0, 3.0, 4.0}; - DoubleBlock block = blockFactory.newDoubleArrayVector(values, values.length).asBlock(); + double[] values = new double[] { 2.0, 1.0, 4.0, 2.0, 4.0, 1.0, 3.0, 4.0 }; + DoubleBlock block = BlockFactory.getNonBreakingInstance().newDoubleArrayVector(values, values.length).asBlock(); OrdsAndKeys ordsAndKeys = hash(block); if (forcePackedHash) { @@ -272,11 +287,14 @@ public void testDoubleHash() { assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(1, 5))); } assertKeys(ordsAndKeys.keys, 2.0, 1.0, 4.0, 3.0); - Releasables.closeExpectNoException(block); + // we close these explicitly in the test. In common operation the operator is in charge of closing these. + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testDoubleHashWithNulls() { - DoubleBlock.Builder builder = DoubleBlock.newBlockBuilder(4); + DoubleBlock.Builder builder = BlockFactory.getNonBreakingInstance().newDoubleBlockBuilder(4); builder.appendDouble(0); builder.appendNull(); builder.appendDouble(2); @@ -294,11 +312,13 @@ public void testDoubleHashWithNulls() { assertKeys(ordsAndKeys.keys, null, 0.0, 2.0); } assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, 3))); - Releasables.closeExpectNoException(block); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testDoubleHashWithMultiValuedFields() { - var builder = DoubleBlock.newBlockBuilder(8); + var builder = BlockFactory.getNonBreakingInstance().newDoubleBlockBuilder(8); builder.appendDouble(1); builder.beginPositionEntry(); builder.appendDouble(2); @@ -324,33 +344,35 @@ public void testDoubleHashWithMultiValuedFields() { assertThat(ordsAndKeys.description, startsWith("PackedValuesBlockHash{groups=[0:DOUBLE], entries=4, size=")); assertOrds( ordsAndKeys.ords, - new int[]{0}, - new int[]{1, 2}, - new int[]{2, 1}, - new int[]{0}, - new int[]{3}, - new int[]{0, 1} + new int[] { 0 }, + new int[] { 1, 2 }, + new int[] { 2, 1 }, + new int[] { 0 }, + new int[] { 3 }, + new int[] { 0, 1 } ); assertKeys(ordsAndKeys.keys, 1.0, 2.0, 3.0, null); } else { assertThat(ordsAndKeys.description, equalTo("DoubleBlockHash{channel=0, entries=3, seenNull=true}")); assertOrds( ordsAndKeys.ords, - new int[]{1}, - new int[]{2, 3}, - new int[]{3, 2}, - new int[]{1}, - new int[]{0}, - new int[]{1, 2} + new int[] { 1 }, + new int[] { 2, 3 }, + new int[] { 3, 2 }, + new int[] { 1 }, + new int[] { 0 }, + new int[] { 1, 2 } ); assertKeys(ordsAndKeys.keys, null, 1.0, 2.0, 3.0); } assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, 4))); - Releasables.closeExpectNoException(block); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testBasicBytesRefHash() { - var builder = blockFactory.newBytesRefBlockBuilder(8); + var builder = BlockFactory.getNonBreakingInstance().newBytesRefBlockBuilder(8); builder.appendBytesRef(new BytesRef("item-2")); builder.appendBytesRef(new BytesRef("item-1")); builder.appendBytesRef(new BytesRef("item-4")); @@ -374,11 +396,13 @@ public void testBasicBytesRefHash() { assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(1, 5))); } assertKeys(ordsAndKeys.keys, "item-2", "item-1", "item-4", "item-3"); - Releasables.closeExpectNoException(block); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testBytesRefHashWithNulls() { - BytesRefBlock.Builder builder = BytesRefBlock.newBlockBuilder(4); + BytesRefBlock.Builder builder = BlockFactory.getNonBreakingInstance().newBytesRefBlockBuilder(4); builder.appendBytesRef(new BytesRef("cat")); builder.appendNull(); builder.appendBytesRef(new BytesRef("dog")); @@ -398,11 +422,13 @@ public void testBytesRefHashWithNulls() { assertKeys(ordsAndKeys.keys, null, "cat", "dog"); } assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, 3))); - Releasables.closeExpectNoException(block); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testBytesRefHashWithMultiValuedFields() { - var builder = BytesRefBlock.newBlockBuilder(8); + var builder = BlockFactory.getNonBreakingInstance().newBytesRefBlockBuilder(8); builder.appendBytesRef(new BytesRef("foo")); builder.beginPositionEntry(); builder.appendBytesRef(new BytesRef("foo")); @@ -430,12 +456,12 @@ public void testBytesRefHashWithMultiValuedFields() { assertThat(ordsAndKeys.description, endsWith("b}")); assertOrds( ordsAndKeys.ords, - new int[]{0}, - new int[]{0, 1}, - new int[]{1, 2}, - new int[]{2, 1}, - new int[]{3}, - new int[]{2, 1} + new int[] { 0 }, + new int[] { 0, 1 }, + new int[] { 1, 2 }, + new int[] { 2, 1 }, + new int[] { 3 }, + new int[] { 2, 1 } ); assertKeys(ordsAndKeys.keys, "foo", "bar", "bort", null); } else { @@ -443,22 +469,24 @@ public void testBytesRefHashWithMultiValuedFields() { assertThat(ordsAndKeys.description, endsWith("b, seenNull=true}")); assertOrds( ordsAndKeys.ords, - new int[]{1}, - new int[]{1, 2}, - new int[]{2, 3}, - new int[]{3, 2}, - new int[]{0}, - new int[]{3, 2} + new int[] { 1 }, + new int[] { 1, 2 }, + new int[] { 2, 3 }, + new int[] { 3, 2 }, + new int[] { 0 }, + new int[] { 3, 2 } ); assertKeys(ordsAndKeys.keys, null, "foo", "bar", "bort"); } assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, 4))); - Releasables.closeExpectNoException(block); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testBooleanHashFalseFirst() { - boolean[] values = new boolean[]{false, true, true, true, true}; - BooleanBlock block = blockFactory.newBooleanArrayVector(values, values.length).asBlock(); + boolean[] values = new boolean[] { false, true, true, true, true }; + BooleanBlock block = BlockFactory.getNonBreakingInstance().newBooleanArrayVector(values, values.length).asBlock(); OrdsAndKeys ordsAndKeys = hash(block); if (forcePackedHash) { @@ -471,12 +499,14 @@ public void testBooleanHashFalseFirst() { assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(1, 3))); } assertKeys(ordsAndKeys.keys, false, true); - Releasables.closeExpectNoException(block); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testBooleanHashTrueFirst() { - boolean[] values = new boolean[]{true, false, false, true, true}; - BooleanBlock block = blockFactory.newBooleanArrayVector(values, values.length).asBlock(); + boolean[] values = new boolean[] { true, false, false, true, true }; + BooleanBlock block = BlockFactory.getNonBreakingInstance().newBooleanArrayVector(values, values.length).asBlock(); OrdsAndKeys ordsAndKeys = hash(block); if (forcePackedHash) { @@ -490,12 +520,14 @@ public void testBooleanHashTrueFirst() { assertKeys(ordsAndKeys.keys, false, true); assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(1, 3))); } - Releasables.closeExpectNoException(block); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testBooleanHashTrueOnly() { - boolean[] values = new boolean[]{true, true, true, true}; - BooleanBlock block = blockFactory.newBooleanArrayVector(values, values.length).asBlock(); + boolean[] values = new boolean[] { true, true, true, true }; + BooleanBlock block = BlockFactory.getNonBreakingInstance().newBooleanArrayVector(values, values.length).asBlock(); OrdsAndKeys ordsAndKeys = hash(block); if (forcePackedHash) { @@ -509,12 +541,14 @@ public void testBooleanHashTrueOnly() { assertKeys(ordsAndKeys.keys, true); assertThat(ordsAndKeys.nonEmpty, equalTo(IntVector.newVectorBuilder(1).appendInt(2).build())); } - Releasables.closeExpectNoException(block); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testBooleanHashFalseOnly() { - boolean[] values = new boolean[]{false, false, false, false}; - BooleanBlock block = blockFactory.newBooleanArrayVector(values, values.length).asBlock(); + boolean[] values = new boolean[] { false, false, false, false }; + BooleanBlock block = BlockFactory.getNonBreakingInstance().newBooleanArrayVector(values, values.length).asBlock(); OrdsAndKeys ordsAndKeys = hash(block); if (forcePackedHash) { @@ -527,11 +561,13 @@ public void testBooleanHashFalseOnly() { assertThat(ordsAndKeys.nonEmpty, equalTo(IntVector.newVectorBuilder(1).appendInt(1).build())); } assertKeys(ordsAndKeys.keys, false); - Releasables.closeExpectNoException(block); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testBooleanHashWithNulls() { - BooleanBlock.Builder builder = BooleanBlock.newBlockBuilder(4); + BooleanBlock.Builder builder = BlockFactory.getNonBreakingInstance().newBooleanBlockBuilder(4); builder.appendBoolean(false); builder.appendNull(); builder.appendBoolean(true); @@ -549,11 +585,13 @@ public void testBooleanHashWithNulls() { assertKeys(ordsAndKeys.keys, null, false, true); } assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, 3))); - Releasables.closeExpectNoException(block); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testBooleanHashWithMultiValuedFields() { - var builder = BooleanBlock.newBlockBuilder(8); + var builder = BlockFactory.getNonBreakingInstance().newBooleanBlockBuilder(8); builder.appendBoolean(false); builder.beginPositionEntry(); builder.appendBoolean(false); @@ -579,37 +617,39 @@ public void testBooleanHashWithMultiValuedFields() { assertThat(ordsAndKeys.description, startsWith("PackedValuesBlockHash{groups=[0:BOOLEAN], entries=3, size=")); assertOrds( ordsAndKeys.ords, - new int[]{0}, - new int[]{0, 1}, - new int[]{0, 1}, // Order is not preserved - new int[]{1}, - new int[]{2}, - new int[]{0, 1} + new int[] { 0 }, + new int[] { 0, 1 }, + new int[] { 0, 1 }, // Order is not preserved + new int[] { 1 }, + new int[] { 2 }, + new int[] { 0, 1 } ); assertKeys(ordsAndKeys.keys, false, true, null); } else { assertThat(ordsAndKeys.description, equalTo("BooleanBlockHash{channel=0, seenFalse=true, seenTrue=true, seenNull=true}")); assertOrds( ordsAndKeys.ords, - new int[]{1}, - new int[]{1, 2}, - new int[]{1, 2}, // Order is not preserved - new int[]{2}, - new int[]{0}, - new int[]{1, 2} + new int[] { 1 }, + new int[] { 1, 2 }, + new int[] { 1, 2 }, // Order is not preserved + new int[] { 2 }, + new int[] { 0 }, + new int[] { 1, 2 } ); assertKeys(ordsAndKeys.keys, null, false, true); } assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, 3))); - Releasables.closeExpectNoException(block); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testLongLongHash() { - long[] values1 = new long[]{0, 1, 0, 1, 0, 1}; - LongBlock block1 = blockFactory.newLongArrayVector(values1, values1.length).asBlock(); - long[] values2 = new long[]{0, 0, 0, 1, 1, 1}; - LongBlock block2 = blockFactory.newLongArrayVector(values2, values2.length).asBlock(); - Object[][] expectedKeys = {new Object[]{0L, 0L}, new Object[]{1L, 0L}, new Object[]{1L, 1L}, new Object[]{0L, 1L}}; + long[] values1 = new long[] { 0, 1, 0, 1, 0, 1 }; + LongBlock block1 = BlockFactory.getNonBreakingInstance().newLongArrayVector(values1, values1.length).asBlock(); + long[] values2 = new long[] { 0, 0, 0, 1, 1, 1 }; + LongBlock block2 = BlockFactory.getNonBreakingInstance().newLongArrayVector(values2, values2.length).asBlock(); + Object[][] expectedKeys = { new Object[] { 0L, 0L }, new Object[] { 1L, 0L }, new Object[] { 1L, 1L }, new Object[] { 0L, 1L } }; OrdsAndKeys ordsAndKeys = hash(block1, block2); assertThat( @@ -621,7 +661,9 @@ public void testLongLongHash() { assertOrds(ordsAndKeys.ords, 0, 1, 0, 2, 3, 2); assertKeys(ordsAndKeys.keys, expectedKeys); assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, 4))); - Releasables.closeExpectNoException(block1, block2); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } private void append(LongBlock.Builder b1, LongBlock.Builder b2, long[] v1, long[] v2) { @@ -650,17 +692,17 @@ private void append(LongBlock.Builder b1, LongBlock.Builder b2, long[] v1, long[ } public void testLongLongHashWithMultiValuedFields() { - var b1 = LongBlock.newBlockBuilder(8); - var b2 = LongBlock.newBlockBuilder(8); - append(b1, b2, new long[]{1, 2}, new long[]{10, 20}); - append(b1, b2, new long[]{1, 2}, new long[]{10}); - append(b1, b2, new long[]{1}, new long[]{10, 20}); - append(b1, b2, new long[]{1}, new long[]{10}); - append(b1, b2, null, new long[]{10}); - append(b1, b2, new long[]{1}, null); - append(b1, b2, new long[]{1, 1, 1}, new long[]{10, 10, 10}); - append(b1, b2, new long[]{1, 1, 2, 2}, new long[]{10, 20, 20}); - append(b1, b2, new long[]{1, 2, 3}, new long[]{30, 30, 10}); + var b1 = BlockFactory.getNonBreakingInstance().newLongBlockBuilder(8); + var b2 = BlockFactory.getNonBreakingInstance().newLongBlockBuilder(8); + append(b1, b2, new long[] { 1, 2 }, new long[] { 10, 20 }); + append(b1, b2, new long[] { 1, 2 }, new long[] { 10 }); + append(b1, b2, new long[] { 1 }, new long[] { 10, 20 }); + append(b1, b2, new long[] { 1 }, new long[] { 10 }); + append(b1, b2, null, new long[] { 10 }); + append(b1, b2, new long[] { 1 }, null); + append(b1, b2, new long[] { 1, 1, 1 }, new long[] { 10, 10, 10 }); + append(b1, b2, new long[] { 1, 1, 2, 2 }, new long[] { 10, 20, 20 }); + append(b1, b2, new long[] { 1, 2, 3 }, new long[] { 30, 30, 10 }); Block block1 = b1.build(); Block block2 = b2.build(); @@ -669,69 +711,73 @@ public void testLongLongHashWithMultiValuedFields() { assertThat(ordsAndKeys.description, startsWith("PackedValuesBlockHash{groups=[0:LONG, 1:LONG], entries=10, size=")); assertOrds( ordsAndKeys.ords, - new int[]{0, 1, 2, 3}, - new int[]{0, 2}, - new int[]{0, 1}, - new int[]{0}, - new int[]{4}, - new int[]{5}, - new int[]{0}, - new int[]{0, 1, 2, 3}, - new int[]{6, 0, 7, 2, 8, 9} + new int[] { 0, 1, 2, 3 }, + new int[] { 0, 2 }, + new int[] { 0, 1 }, + new int[] { 0 }, + new int[] { 4 }, + new int[] { 5 }, + new int[] { 0 }, + new int[] { 0, 1, 2, 3 }, + new int[] { 6, 0, 7, 2, 8, 9 } ); assertKeys( ordsAndKeys.keys, - new Object[][]{ - new Object[]{1L, 10L}, - new Object[]{1L, 20L}, - new Object[]{2L, 10L}, - new Object[]{2L, 20L}, - new Object[]{null, 10L}, - new Object[]{1L, null}, - new Object[]{1L, 30L}, - new Object[]{2L, 30L}, - new Object[]{3L, 30L}, - new Object[]{3L, 10L},} + new Object[][] { + new Object[] { 1L, 10L }, + new Object[] { 1L, 20L }, + new Object[] { 2L, 10L }, + new Object[] { 2L, 20L }, + new Object[] { null, 10L }, + new Object[] { 1L, null }, + new Object[] { 1L, 30L }, + new Object[] { 2L, 30L }, + new Object[] { 3L, 30L }, + new Object[] { 3L, 10L }, } ); assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, 10))); } else { assertThat(ordsAndKeys.description, equalTo("LongLongBlockHash{channels=[0,1], entries=8}")); assertOrds( ordsAndKeys.ords, - new int[]{0, 1, 2, 3}, - new int[]{0, 2}, - new int[]{0, 1}, - new int[]{0}, + new int[] { 0, 1, 2, 3 }, + new int[] { 0, 2 }, + new int[] { 0, 1 }, + new int[] { 0 }, null, null, - new int[]{0}, - new int[]{0, 1, 2, 3}, - new int[]{4, 0, 5, 2, 6, 7} + new int[] { 0 }, + new int[] { 0, 1, 2, 3 }, + new int[] { 4, 0, 5, 2, 6, 7 } ); assertKeys( ordsAndKeys.keys, - new Object[][]{ - new Object[]{1L, 10L}, - new Object[]{1L, 20L}, - new Object[]{2L, 10L}, - new Object[]{2L, 20L}, - new Object[]{1L, 30L}, - new Object[]{2L, 30L}, - new Object[]{3L, 30L}, - new Object[]{3L, 10L},} + new Object[][] { + new Object[] { 1L, 10L }, + new Object[] { 1L, 20L }, + new Object[] { 2L, 10L }, + new Object[] { 2L, 20L }, + new Object[] { 1L, 30L }, + new Object[] { 2L, 30L }, + new Object[] { 3L, 30L }, + new Object[] { 3L, 10L }, } ); assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, 8))); - Releasables.closeExpectNoException(block1, block2); } + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testLongLongHashHugeCombinatorialExplosion() { long[] v1 = LongStream.range(0, 5000).toArray(); long[] v2 = LongStream.range(100, 200).toArray(); - var b1 = LongBlock.newBlockBuilder(v1.length); - var b2 = LongBlock.newBlockBuilder(v2.length); + var b1 = BlockFactory.getNonBreakingInstance().newLongBlockBuilder(v1.length); + var b2 = BlockFactory.getNonBreakingInstance().newLongBlockBuilder(v2.length); append(b1, b2, v1, v2); + var block1 = b1.build(); + var block2 = b2.build(); int[] expectedEntries = new int[1]; int pageSize = between(1000, 16 * 1024); @@ -748,64 +794,75 @@ public void testLongLongHashHugeCombinatorialExplosion() { assertKeys( ordsAndKeys.keys, IntStream.range(0, expectedEntries[0]) - .mapToObj(i -> new Object[]{v1[i / v2.length], v2[i % v2.length]}) + .mapToObj(i -> new Object[] { v1[i / v2.length], v2[i % v2.length] }) .toArray(l -> new Object[l][]) ); assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, expectedEntries[0]))); - }, pageSize, b1.build(), b2.build()); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + }, pageSize, block1, block2); assertThat("misconfigured test", expectedEntries[0], greaterThan(0)); + assertThat(breaker.getUsed(), is(0L)); } public void testIntLongHash() { - int[] values1 = new int[]{0, 1, 0, 1, 0, 1}; - IntBlock block1 = blockFactory.newIntArrayVector(values1, values1.length).asBlock(); - long[] values2 = new long[]{0, 0, 0, 1, 1, 1}; - LongBlock block2 = blockFactory.newLongArrayVector(values2, values2.length).asBlock(); - Object[][] expectedKeys = {new Object[]{0, 0L}, new Object[]{1, 0L}, new Object[]{1, 1L}, new Object[]{0, 1L}}; + int[] values1 = new int[] { 0, 1, 0, 1, 0, 1 }; + IntBlock block1 = BlockFactory.getNonBreakingInstance().newIntArrayVector(values1, values1.length).asBlock(); + long[] values2 = new long[] { 0, 0, 0, 1, 1, 1 }; + LongBlock block2 = BlockFactory.getNonBreakingInstance().newLongArrayVector(values2, values2.length).asBlock(); + Object[][] expectedKeys = { new Object[] { 0, 0L }, new Object[] { 1, 0L }, new Object[] { 1, 1L }, new Object[] { 0, 1L } }; OrdsAndKeys ordsAndKeys = hash(block1, block2); assertThat(ordsAndKeys.description, startsWith("PackedValuesBlockHash{groups=[0:INT, 1:LONG], entries=4, size=")); assertThat(ordsAndKeys.description, endsWith("b}")); assertOrds(ordsAndKeys.ords, 0, 1, 0, 2, 3, 2); assertKeys(ordsAndKeys.keys, expectedKeys); - Releasables.closeExpectNoException(block1, block2); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testLongDoubleHash() { - long[] values1 = new long[]{0, 1, 0, 1, 0, 1}; - LongBlock block1 = blockFactory.newLongArrayVector(values1, values1.length).asBlock(); - double[] values2 = new double[]{0, 0, 0, 1, 1, 1}; - DoubleBlock block2 = blockFactory.newDoubleArrayVector(values2, values2.length).asBlock(); - Object[][] expectedKeys = {new Object[]{0L, 0d}, new Object[]{1L, 0d}, new Object[]{1L, 1d}, new Object[]{0L, 1d}}; + long[] values1 = new long[] { 0, 1, 0, 1, 0, 1 }; + LongBlock block1 = BlockFactory.getNonBreakingInstance().newLongArrayVector(values1, values1.length).asBlock(); + double[] values2 = new double[] { 0, 0, 0, 1, 1, 1 }; + DoubleBlock block2 = BlockFactory.getNonBreakingInstance().newDoubleArrayVector(values2, values2.length).asBlock(); + Object[][] expectedKeys = { new Object[] { 0L, 0d }, new Object[] { 1L, 0d }, new Object[] { 1L, 1d }, new Object[] { 0L, 1d } }; OrdsAndKeys ordsAndKeys = hash(block1, block2); assertThat(ordsAndKeys.description, startsWith("PackedValuesBlockHash{groups=[0:LONG, 1:DOUBLE], entries=4, size=")); assertThat(ordsAndKeys.description, endsWith("b}")); assertOrds(ordsAndKeys.ords, 0, 1, 0, 2, 3, 2); assertKeys(ordsAndKeys.keys, expectedKeys); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testIntBooleanHash() { - int[] values1 = new int[]{0, 1, 0, 1, 0, 1}; - IntBlock block1 = blockFactory.newIntArrayVector(values1, values1.length).asBlock(); - boolean[] values2 = new boolean[]{false, false, false, true, true, true}; - BooleanBlock block2 = blockFactory.newBooleanArrayVector(values2, values2.length).asBlock(); + int[] values1 = new int[] { 0, 1, 0, 1, 0, 1 }; + IntBlock block1 = BlockFactory.getNonBreakingInstance().newIntArrayVector(values1, values1.length).asBlock(); + boolean[] values2 = new boolean[] { false, false, false, true, true, true }; + BooleanBlock block2 = BlockFactory.getNonBreakingInstance().newBooleanArrayVector(values2, values2.length).asBlock(); Object[][] expectedKeys = { - new Object[]{0, false}, - new Object[]{1, false}, - new Object[]{1, true}, - new Object[]{0, true}}; + new Object[] { 0, false }, + new Object[] { 1, false }, + new Object[] { 1, true }, + new Object[] { 0, true } }; OrdsAndKeys ordsAndKeys = hash(block1, block2); assertThat(ordsAndKeys.description, startsWith("PackedValuesBlockHash{groups=[0:INT, 1:BOOLEAN], entries=4, size=")); assertThat(ordsAndKeys.description, endsWith("b}")); assertOrds(ordsAndKeys.ords, 0, 1, 0, 2, 3, 2); assertKeys(ordsAndKeys.keys, expectedKeys); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testLongLongHashWithNull() { - LongBlock.Builder b1 = LongBlock.newBlockBuilder(2); - LongBlock.Builder b2 = LongBlock.newBlockBuilder(2); + LongBlock.Builder b1 = BlockFactory.getNonBreakingInstance().newLongBlockBuilder(2); + LongBlock.Builder b2 = BlockFactory.getNonBreakingInstance().newLongBlockBuilder(2); b1.appendLong(1); b2.appendLong(0); b1.appendNull(); @@ -823,26 +880,28 @@ public void testLongLongHashWithNull() { assertOrds(ordsAndKeys.ords, 0, 1, 2, 3, 4); assertKeys( ordsAndKeys.keys, - new Object[][]{ - new Object[]{1L, 0L}, - new Object[]{null, null}, - new Object[]{0L, 1L}, - new Object[]{0L, null}, - new Object[]{null, 0L},} + new Object[][] { + new Object[] { 1L, 0L }, + new Object[] { null, null }, + new Object[] { 0L, 1L }, + new Object[] { 0L, null }, + new Object[] { null, 0L }, } ); assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, 5))); } else { assertThat(ordsAndKeys.description, equalTo("LongLongBlockHash{channels=[0,1], entries=2}")); assertOrds(ordsAndKeys.ords, 0, null, 1, null, null); - assertKeys(ordsAndKeys.keys, new Object[][]{new Object[]{1L, 0L}, new Object[]{0L, 1L}}); + assertKeys(ordsAndKeys.keys, new Object[][] { new Object[] { 1L, 0L }, new Object[] { 0L, 1L } }); assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, 2))); } + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); } public void testLongBytesRefHash() { - long[] values1 = new long[]{0, 1, 0, 1, 0, 1}; - LongBlock block1 = blockFactory.newLongArrayVector(values1, values1.length).asBlock(); - BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(8); + long[] values1 = new long[] { 0, 1, 0, 1, 0, 1 }; + LongBlock block1 = BlockFactory.getNonBreakingInstance().newLongArrayVector(values1, values1.length).asBlock(); + BytesRefBlock.Builder builder = BlockFactory.getNonBreakingInstance().newBytesRefBlockBuilder(8); builder.appendBytesRef(new BytesRef("cat")); builder.appendBytesRef(new BytesRef("cat")); builder.appendBytesRef(new BytesRef("cat")); @@ -851,10 +910,10 @@ public void testLongBytesRefHash() { builder.appendBytesRef(new BytesRef("dog")); BytesRefBlock block2 = builder.build(); Object[][] expectedKeys = { - new Object[]{0L, "cat"}, - new Object[]{1L, "cat"}, - new Object[]{1L, "dog"}, - new Object[]{0L, "dog"}}; + new Object[] { 0L, "cat" }, + new Object[] { 1L, "cat" }, + new Object[] { 1L, "dog" }, + new Object[] { 0L, "dog" } }; OrdsAndKeys ordsAndKeys = hash(block1, block2); assertThat( @@ -869,12 +928,14 @@ public void testLongBytesRefHash() { assertOrds(ordsAndKeys.ords, 0, 1, 0, 2, 3, 2); assertKeys(ordsAndKeys.keys, expectedKeys); assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, 4))); - Releasables.closeExpectNoException(block1, block2); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testLongBytesRefHashWithNull() { - LongBlock.Builder b1 = LongBlock.newBlockBuilder(2); - BytesRefBlock.Builder b2 = BytesRefBlock.newBlockBuilder(2); + LongBlock.Builder b1 = BlockFactory.getNonBreakingInstance().newLongBlockBuilder(2); + BytesRefBlock.Builder b2 = BlockFactory.getNonBreakingInstance().newBytesRefBlockBuilder(2); b1.appendLong(1); b2.appendBytesRef(new BytesRef("cat")); b1.appendNull(); @@ -893,12 +954,12 @@ public void testLongBytesRefHashWithNull() { assertOrds(ordsAndKeys.ords, 0, 1, 2, 3, 4); assertKeys( ordsAndKeys.keys, - new Object[][]{ - new Object[]{1L, "cat"}, - new Object[]{null, null}, - new Object[]{0L, "dog"}, - new Object[]{1L, null}, - new Object[]{null, "vanish"}} + new Object[][] { + new Object[] { 1L, "cat" }, + new Object[] { null, null }, + new Object[] { 0L, "dog" }, + new Object[] { 1L, null }, + new Object[] { null, "vanish" } } ); assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, 5))); } else { @@ -908,9 +969,12 @@ public void testLongBytesRefHashWithNull() { ); assertThat(ordsAndKeys.description, endsWith("b}")); assertOrds(ordsAndKeys.ords, 0, null, 1, null, null); - assertKeys(ordsAndKeys.keys, new Object[][]{new Object[]{1L, "cat"}, new Object[]{0L, "dog"}}); + assertKeys(ordsAndKeys.keys, new Object[][] { new Object[] { 1L, "cat" }, new Object[] { 0L, "dog" } }); assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, 2))); } + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } private void append(LongBlock.Builder b1, BytesRefBlock.Builder b2, long[] v1, String[] v2) { @@ -939,46 +1003,46 @@ private void append(LongBlock.Builder b1, BytesRefBlock.Builder b2, long[] v1, S } public void testLongBytesRefHashWithMultiValuedFields() { - var b1 = LongBlock.newBlockBuilder(8); - var b2 = BytesRefBlock.newBlockBuilder(8); - append(b1, b2, new long[]{1, 2}, new String[]{"a", "b"}); - append(b1, b2, new long[]{1, 2}, new String[]{"a"}); - append(b1, b2, new long[]{1}, new String[]{"a", "b"}); - append(b1, b2, new long[]{1}, new String[]{"a"}); - append(b1, b2, null, new String[]{"a"}); - append(b1, b2, new long[]{1}, null); - append(b1, b2, new long[]{1, 1, 1}, new String[]{"a", "a", "a"}); - append(b1, b2, new long[]{1, 1, 2, 2}, new String[]{"a", "b", "b"}); - append(b1, b2, new long[]{1, 2, 3}, new String[]{"c", "c", "a"}); + var b1 = BlockFactory.getNonBreakingInstance().newLongBlockBuilder(8); + var b2 = BlockFactory.getNonBreakingInstance().newBytesRefBlockBuilder(8); + append(b1, b2, new long[] { 1, 2 }, new String[] { "a", "b" }); + append(b1, b2, new long[] { 1, 2 }, new String[] { "a" }); + append(b1, b2, new long[] { 1 }, new String[] { "a", "b" }); + append(b1, b2, new long[] { 1 }, new String[] { "a" }); + append(b1, b2, null, new String[] { "a" }); + append(b1, b2, new long[] { 1 }, null); + append(b1, b2, new long[] { 1, 1, 1 }, new String[] { "a", "a", "a" }); + append(b1, b2, new long[] { 1, 1, 2, 2 }, new String[] { "a", "b", "b" }); + append(b1, b2, new long[] { 1, 2, 3 }, new String[] { "c", "c", "a" }); OrdsAndKeys ordsAndKeys = hash(b1.build(), b2.build()); if (forcePackedHash) { assertThat(ordsAndKeys.description, startsWith("PackedValuesBlockHash{groups=[0:LONG, 1:BYTES_REF], entries=10, size=")); assertOrds( ordsAndKeys.ords, - new int[]{0, 1, 2, 3}, - new int[]{0, 2}, - new int[]{0, 1}, - new int[]{0}, - new int[]{4}, - new int[]{5}, - new int[]{0}, - new int[]{0, 1, 2, 3}, - new int[]{6, 0, 7, 2, 8, 9} + new int[] { 0, 1, 2, 3 }, + new int[] { 0, 2 }, + new int[] { 0, 1 }, + new int[] { 0 }, + new int[] { 4 }, + new int[] { 5 }, + new int[] { 0 }, + new int[] { 0, 1, 2, 3 }, + new int[] { 6, 0, 7, 2, 8, 9 } ); assertKeys( ordsAndKeys.keys, - new Object[][]{ - new Object[]{1L, "a"}, - new Object[]{1L, "b"}, - new Object[]{2L, "a"}, - new Object[]{2L, "b"}, - new Object[]{null, "a"}, - new Object[]{1L, null}, - new Object[]{1L, "c"}, - new Object[]{2L, "c"}, - new Object[]{3L, "c"}, - new Object[]{3L, "a"},} + new Object[][] { + new Object[] { 1L, "a" }, + new Object[] { 1L, "b" }, + new Object[] { 2L, "a" }, + new Object[] { 2L, "b" }, + new Object[] { null, "a" }, + new Object[] { 1L, null }, + new Object[] { 1L, "c" }, + new Object[] { 2L, "c" }, + new Object[] { 3L, "c" }, + new Object[] { 3L, "a" }, } ); assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, 10))); } else { @@ -988,38 +1052,41 @@ public void testLongBytesRefHashWithMultiValuedFields() { ); assertOrds( ordsAndKeys.ords, - new int[]{0, 1, 2, 3}, - new int[]{0, 1}, - new int[]{0, 2}, - new int[]{0}, + new int[] { 0, 1, 2, 3 }, + new int[] { 0, 1 }, + new int[] { 0, 2 }, + new int[] { 0 }, null, null, - new int[]{0}, - new int[]{0, 1, 2, 3}, - new int[]{4, 5, 6, 0, 1, 7} + new int[] { 0 }, + new int[] { 0, 1, 2, 3 }, + new int[] { 4, 5, 6, 0, 1, 7 } ); assertKeys( ordsAndKeys.keys, - new Object[][]{ - new Object[]{1L, "a"}, - new Object[]{2L, "a"}, - new Object[]{1L, "b"}, - new Object[]{2L, "b"}, - new Object[]{1L, "c"}, - new Object[]{2L, "c"}, - new Object[]{3L, "c"}, - new Object[]{3L, "a"},} + new Object[][] { + new Object[] { 1L, "a" }, + new Object[] { 2L, "a" }, + new Object[] { 1L, "b" }, + new Object[] { 2L, "b" }, + new Object[] { 1L, "c" }, + new Object[] { 2L, "c" }, + new Object[] { 3L, "c" }, + new Object[] { 3L, "a" }, } ); assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, 8))); } + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); + assertThat(breaker.getUsed(), is(0L)); } public void testBytesRefLongHashHugeCombinatorialExplosion() { long[] v1 = LongStream.range(0, 3000).toArray(); String[] v2 = LongStream.range(100, 200).mapToObj(l -> "a" + l).toArray(String[]::new); - var b1 = LongBlock.newBlockBuilder(v1.length); - var b2 = BytesRefBlock.newBlockBuilder(v2.length); + var b1 = BlockFactory.getNonBreakingInstance().newLongBlockBuilder(v1.length); + var b2 = BlockFactory.getNonBreakingInstance().newBytesRefBlockBuilder(v2.length); append(b1, b2, v1, v2); int[] expectedEntries = new int[1]; @@ -1032,8 +1099,8 @@ public void testBytesRefLongHashHugeCombinatorialExplosion() { forcePackedHash ? startsWith("PackedValuesBlockHash{groups=[0:LONG, 1:BYTES_REF], entries=" + expectedEntries[0] + ", size=") : startsWith( - "BytesRefLongBlockHash{keys=[BytesRefKey[channel=1], LongKey[channel=0]], entries=" + expectedEntries[0] + ", size=" - ) + "BytesRefLongBlockHash{keys=[BytesRefKey[channel=1], LongKey[channel=0]], entries=" + expectedEntries[0] + ", size=" + ) ); assertOrds(ordsAndKeys.ords, IntStream.range(start, expectedEntries[0]).toArray()); assertKeys( @@ -1041,19 +1108,21 @@ public void testBytesRefLongHashHugeCombinatorialExplosion() { IntStream.range(0, expectedEntries[0]) .mapToObj( i -> forcePackedHash - ? new Object[]{v1[i / v2.length], v2[i % v2.length]} - : new Object[]{v1[i % v1.length], v2[i / v1.length]} + ? new Object[] { v1[i / v2.length], v2[i % v2.length] } + : new Object[] { v1[i % v1.length], v2[i / v1.length] } ) .toArray(l -> new Object[l][]) ); assertThat(ordsAndKeys.nonEmpty, equalTo(intRange(0, expectedEntries[0]))); + Releasables.closeExpectNoException(ordsAndKeys.keys); + Releasables.closeExpectNoException(ordsAndKeys.nonEmpty); }, pageSize, b1.build(), b2.build()); assertThat("misconfigured test", expectedEntries[0], greaterThan(0)); + assertThat(breaker.getUsed(), is(0L)); } - record OrdsAndKeys(String description, int positionOffset, IntBlock ords, Block[] keys, IntVector nonEmpty) { - } + record OrdsAndKeys(String description, int positionOffset, IntBlock ords, Block[] keys, IntVector nonEmpty) {} /** * Hash some values into a single block of group ids. If the hash produces @@ -1115,7 +1184,6 @@ public void add(int positionOffset, IntBlock groupIds) { } } callback.accept(result); - // Releasables.closeExpectNoException(result.keys()); // TODO: who should release the keys? } @Override @@ -1126,7 +1194,7 @@ public void add(int positionOffset, IntVector groupIds) { } private void assertOrds(IntBlock ordsBlock, Integer... expectedOrds) { - assertOrds(ordsBlock, Arrays.stream(expectedOrds).map(l -> l == null ? null : new int[]{l}).toArray(int[][]::new)); + assertOrds(ordsBlock, Arrays.stream(expectedOrds).map(l -> l == null ? null : new int[] { l }).toArray(int[][]::new)); } private void assertOrds(IntBlock ordsBlock, int[]... expectedOrds) { @@ -1161,7 +1229,7 @@ private void assertOrds(IntBlock ordsBlock, int[]... expectedOrds) { private void assertKeys(Block[] actualKeys, Object... expectedKeys) { Object[][] flipped = new Object[expectedKeys.length][]; for (int r = 0; r < flipped.length; r++) { - flipped[r] = new Object[]{expectedKeys[r]}; + flipped[r] = new Object[] { expectedKeys[r] }; } assertKeys(actualKeys, flipped); } @@ -1205,6 +1273,6 @@ static CircuitBreakerService mockBreakerService(CircuitBreaker breaker) { } IntVector intRange(int startInclusive, int endExclusive) { - return IntVector.range(startInclusive, endExclusive, nonBreakingFactory); + return IntVector.range(startInclusive, endExclusive, BlockFactory.getNonBreakingInstance()); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/DocVectorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/DocVectorTests.java index 1e936a4d9562..350425840a59 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/DocVectorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/DocVectorTests.java @@ -133,8 +133,7 @@ private void assertShardSegmentDocMap(int[][] data, int[][] expected) { } public void testCannotDoubleRelease() { - var block = new DocVector(intRange(0, 2), IntBlock.newConstantBlockWith(0, 2).asVector(), intRange(0, 2), null) - .asBlock(); + var block = new DocVector(intRange(0, 2), IntBlock.newConstantBlockWith(0, 2).asVector(), intRange(0, 2), null).asBlock(); assertThat(block.isReleased(), is(false)); Page page = new Page(block); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/FilteredBlockTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/FilteredBlockTests.java index 19cb435a537e..5ab4266023bf 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/FilteredBlockTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/FilteredBlockTests.java @@ -7,7 +7,6 @@ package org.elasticsearch.compute.data; -import org.apache.lucene.util.Accountable; import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.unit.ByteSizeValue; @@ -15,7 +14,6 @@ import org.elasticsearch.common.util.BytesRefArray; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.PageCacheRecycler; -import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.test.ESTestCase; @@ -295,30 +293,21 @@ BytesRefArray arrayOf(String... values) { return array; } - void releaseAndAssertBreaker(T data) { - releaseAndAssertBreaker(data, breaker); - } - - static void releaseAndAssertBreaker(T data, CircuitBreaker breaker) { + void releaseAndAssertBreaker(Block... blocks) { assertThat(breaker.getUsed(), greaterThan(0L)); - Releasables.closeExpectNoException(data); - if (data instanceof Block block) { - assertThat(block.isReleased(), is(true)); - assertCannotDoubleRelease(block); - assertCannotReadFromPage(block); - } + Page[] pages = Arrays.stream(blocks).map(Page::new).toArray(Page[]::new); + Releasables.closeExpectNoException(blocks); + Arrays.stream(blocks).forEach(block -> assertThat(block.isReleased(), is(true))); + Arrays.stream(blocks).forEach(BasicBlockTests::assertCannotDoubleRelease); + Arrays.stream(pages).forEach(BasicBlockTests::assertCannotReadFromPage); + Arrays.stream(blocks).forEach(BasicBlockTests::assertCannotAddToPage); assertThat(breaker.getUsed(), is(0L)); } - static void assertCannotDoubleRelease(Block block) { - var ex = expectThrows(IllegalStateException.class, () -> block.close()); - assertThat(ex.getMessage(), containsString("can't release already released block")); - } - - static void assertCannotReadFromPage(Block block) { - Page page = new Page(block); - var e = expectThrows(IllegalStateException.class, () -> page.getBlock(0)); - assertThat(e.getMessage(), containsString("can't read released block")); + void releaseAndAssertBreaker(Vector vector) { + assertThat(breaker.getUsed(), greaterThan(0L)); + Releasables.closeExpectNoException(vector); + assertThat(breaker.getUsed(), is(0L)); } // A breaker service that always returns the given breaker for getBreaker(CircuitBreaker.REQUEST) diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java index abe1f780c090..189c96e1ba47 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -20,6 +21,7 @@ import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator; import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; import org.elasticsearch.compute.operator.exchange.ExchangeSourceOperator; +import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.TestThreadPool; @@ -56,13 +58,10 @@ protected final Operator.OperatorFactory simple(BigArrays bigArrays) { } public final void testInitialFinal() { - //List input = ... - // keep a copy of the original input for comparison, since the driver will consume/release the page blocks - //List origInput = BlockTestUtils.deepCopyOf(input, BlockFactory.getNonBreakingInstance()); - BigArrays bigArrays = nonBreakingBigArrays(); DriverContext driverContext = driverContext(); List input = CannedSourceOperator.collectPages(simpleInput(driverContext.blockFactory(), between(1_000, 100_000))); + List origInput = BlockTestUtils.deepCopyOf(input, BlockFactory.getNonBreakingInstance()); List results = new ArrayList<>(); try ( Driver d = new Driver( @@ -79,17 +78,15 @@ public final void testInitialFinal() { runDriver(d); } assertSimpleOutput(origInput, results); + Releasables.close(() -> Iterators.map(results.iterator(), p -> p::releaseBlocks)); assertDriverContext(driverContext); } public final void testManyInitialFinal() { - List input = CannedSourceOperator.collectPages(simpleInput(between(1_000, 100_000))); - // keep a copy of the original input for comparison, since the driver will consume/release the page blocks - List origInput = BlockTestUtils.deepCopyOf(input, BlockFactory.getNonBreakingInstance()); - BigArrays bigArrays = nonBreakingBigArrays(); DriverContext driverContext = driverContext(); List input = CannedSourceOperator.collectPages(simpleInput(driverContext.blockFactory(), between(1_000, 100_000))); + List origInput = BlockTestUtils.deepCopyOf(input, BlockFactory.getNonBreakingInstance()); List partials = oneDriverPerPage(input, () -> List.of(simpleWithMode(bigArrays, AggregatorMode.INITIAL).get(driverContext))); List results = new ArrayList<>(); try ( @@ -104,17 +101,15 @@ public final void testManyInitialFinal() { runDriver(d); } assertSimpleOutput(origInput, results); + Releasables.close(() -> Iterators.map(results.iterator(), p -> p::releaseBlocks)); assertDriverContext(driverContext); } public final void testInitialIntermediateFinal() { - List input = CannedSourceOperator.collectPages(simpleInput(between(1_000, 100_000))); - // keep a copy of the original input for comparison, since the driver will consume/release the page blocks - List origInput = BlockTestUtils.deepCopyOf(input, BlockFactory.getNonBreakingInstance()); - BigArrays bigArrays = nonBreakingBigArrays(); DriverContext driverContext = driverContext(); List input = CannedSourceOperator.collectPages(simpleInput(driverContext.blockFactory(), between(1_000, 100_000))); + List origInput = BlockTestUtils.deepCopyOf(input, BlockFactory.getNonBreakingInstance()); List results = new ArrayList<>(); try ( @@ -133,6 +128,7 @@ public final void testInitialIntermediateFinal() { runDriver(d); } assertSimpleOutput(origInput, results); + Releasables.close(() -> Iterators.map(results.iterator(), p -> p::releaseBlocks)); assertDriverContext(driverContext); } @@ -141,6 +137,7 @@ public final void testManyInitialManyPartialFinal() { BigArrays bigArrays = nonBreakingBigArrays(); DriverContext driverContext = driverContext(); List input = CannedSourceOperator.collectPages(simpleInput(driverContext.blockFactory(), between(1_000, 100_000))); + List origInput = BlockTestUtils.deepCopyOf(input, BlockFactory.getNonBreakingInstance()); List partials = oneDriverPerPage(input, () -> List.of(simpleWithMode(bigArrays, AggregatorMode.INITIAL).get(driverContext))); Collections.shuffle(partials, random()); @@ -161,7 +158,8 @@ public final void testManyInitialManyPartialFinal() { ) { runDriver(d); } - assertSimpleOutput(input, results); + assertSimpleOutput(origInput, results); + Releasables.close(() -> Iterators.map(results.iterator(), p -> p::releaseBlocks)); assertDriverContext(driverContext); } @@ -170,9 +168,7 @@ public final void testManyInitialManyPartialFinal() { public final void testManyInitialManyPartialFinalRunner() { BigArrays bigArrays = nonBreakingBigArrays(); List input = CannedSourceOperator.collectPages(simpleInput(driverContext().blockFactory(), between(1_000, 100_000))); - List results = new ArrayList<>(); - - BigArrays bigArrays = nonBreakingBigArrays(); + List origInput = BlockTestUtils.deepCopyOf(input, BlockFactory.getNonBreakingInstance()); List results = new ArrayList<>(); List drivers = createDriversForInput(bigArrays, input, results, false /* no throwing ops */); var runner = new DriverRunner(threadPool.getThreadContext()) { @@ -185,6 +181,7 @@ protected void start(Driver driver, ActionListener listener) { runner.runToCompletion(drivers, future); future.actionGet(TimeValue.timeValueMinutes(1)); assertSimpleOutput(origInput, results); + Releasables.close(() -> Iterators.map(results.iterator(), p -> p::releaseBlocks)); drivers.stream().map(Driver::driverContext).forEach(OperatorTestCase::assertDriverContext); } @@ -208,6 +205,7 @@ protected void start(Driver driver, ActionListener listener) { runner.runToCompletion(drivers, future); BadException e = expectThrows(BadException.class, () -> future.actionGet(TimeValue.timeValueMinutes(1))); assertThat(e.getMessage(), startsWith("bad exception from")); + Releasables.close(() -> Iterators.map(results.iterator(), p -> p::releaseBlocks)); drivers.stream().map(Driver::driverContext).forEach(OperatorTestCase::assertDriverContext); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java index 9a1bba7f7da3..501004f98dcc 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java @@ -96,10 +96,17 @@ public final void testSimpleCircuitBreaking() { List input = CannedSourceOperator.collectPages(simpleInput(inputFactoryContext.blockFactory(), between(1_000, 10_000))); CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST); BlockFactory blockFactory = BlockFactory.getInstance(breaker, bigArrays); - Exception e = expectThrows( - CircuitBreakingException.class, - () -> drive(simple(bigArrays).get(new DriverContext(bigArrays, blockFactory)), input.iterator()) - ); + Exception e = expectThrows(CircuitBreakingException.class, () -> { + Operator operator; + try { + operator = simple(bigArrays).get(new DriverContext(bigArrays, blockFactory)); + } catch (CircuitBreakingException cbe) { + // if we failed to even create the operator, then release the input pages + releasePageBlocksWhileHandlingException(input); + throw cbe; + } + drive(operator, input.iterator()); + }); assertThat(e.getMessage(), equalTo(MockBigArrays.ERROR_MESSAGE)); assertThat(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L)); @@ -123,7 +130,15 @@ public final void testSimpleWithCranky() { BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, cranky).withCircuitBreaking(); BlockFactory blockFactory = BlockFactory.getInstance(cranky.getBreaker(CircuitBreaker.REQUEST), bigArrays); try { - List result = drive(simple(bigArrays).get(new DriverContext(bigArrays, blockFactory)), input.iterator()); + Operator operator; + try { + operator = simple(bigArrays).get(new DriverContext(bigArrays, blockFactory)); + } catch (CircuitBreakingException cbe) { + // if we failed to even create the operator, then release the input pages + releasePageBlocksWhileHandlingException(input); + throw cbe; + } + List result = drive(operator, input.iterator()); Releasables.close(() -> Iterators.map(result.iterator(), p -> p::releaseBlocks)); // Either we get lucky and cranky doesn't throw and the test completes or we don't and it throws } catch (CircuitBreakingException e) { @@ -167,22 +182,24 @@ protected final List oneDriverPerPageList(Iterator> source, Sup private void assertSimple(DriverContext context, int size) { List input = CannedSourceOperator.collectPages(simpleInput(context.blockFactory(), size)); - // Clone the input so that the operator can close it, then, later, we can read it again to build the assertion. - List inputClone = CannedSourceOperator.deepCopyOf(input); + // The operator may release the input data, so copy it so that we can access during assertion + List origInput = BlockTestUtils.deepCopyOf(input, BlockFactory.getNonBreakingInstance()); BigArrays bigArrays = context.bigArrays().withCircuitBreaking(); - - Operator operator; + + Operator operator = null; try { - // if we fail to even create the operator, then close the input operator = simple(bigArrays).get(context); + List results = drive(operator, input.iterator()); + assertSimpleOutput(origInput, results); + results.forEach(Page::releaseBlocks); + assertThat(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L)); } catch (CircuitBreakingException cbe) { - releasePageBlocksWhileHandlingException(input); + if (operator == null) { + // if we failed to even create the operator, then release the input pages + releasePageBlocksWhileHandlingException(input); + } throw cbe; } - List results = drive(operator, input.iterator()); - assertSimpleOutput(inputClone, results); - results.forEach(Page::releaseBlocks); - assertThat(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L)); } protected final List drive(Operator operator, Iterator input) {