Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Make blocks ref counted #100408

Merged
merged 42 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
9e89137
Implement references counted block refs
alex-spies Oct 6, 2023
73ade8f
Use RefCounted
alex-spies Oct 6, 2023
522a47e
Move BlockRef into Block.Ref
alex-spies Oct 6, 2023
b55fd8a
Update tests
alex-spies Oct 6, 2023
5844b81
Revert all changes
alex-spies Oct 9, 2023
044bf9a
Make Block extend RefCounted
alex-spies Oct 9, 2023
dad012c
Avoid deep copies in EvalOperator
alex-spies Oct 9, 2023
d629ca4
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Oct 9, 2023
f8df502
Fix RefCounted javadoc
alex-spies Oct 9, 2023
d304a95
Improve Block::RefCounted implementation
alex-spies Oct 9, 2023
8f3df46
Add ref counting tests for vector and array blocks
alex-spies Oct 9, 2023
44ab382
Test remaining block types
alex-spies Oct 9, 2023
f25644d
Update docs/changelog/100408.yaml
alex-spies Oct 9, 2023
d84a4ed
Fix projection bug, simplify projections
alex-spies Oct 9, 2023
0112040
Fix projection for empty page
alex-spies Oct 10, 2023
8f8522d
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Oct 10, 2023
60305b7
Remove obsolete test
alex-spies Oct 10, 2023
9801f1b
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 13, 2023
b0a05be
Fix multiply owned BooleanBlock in exchange sink
alex-spies Nov 14, 2023
7168c12
Improve javadoc and illegal state messages
alex-spies Nov 14, 2023
803d654
Remove newPageAndRelease
alex-spies Nov 14, 2023
57f5977
Spotless
alex-spies Nov 14, 2023
95466bb
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 14, 2023
99da217
Revert error message
alex-spies Nov 14, 2023
b61e50e
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 14, 2023
b33629d
Align logic with AbstractRefCounted
alex-spies Nov 14, 2023
28970ab
Cleanup
alex-spies Nov 14, 2023
46d3044
Address potential partial releases
alex-spies Nov 14, 2023
42a8559
Fix test
alex-spies Nov 14, 2023
98f6629
Make decRef and close equivalent again
alex-spies Nov 15, 2023
92ab31f
Remove releaseByDecRef
alex-spies Nov 15, 2023
2d3aa70
Update javadoc for Block
alex-spies Nov 15, 2023
73110c0
Address Nhat's feedback
alex-spies Nov 17, 2023
061563d
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 17, 2023
63109c0
Align DocBlock and DocVector with others
alex-spies Nov 20, 2023
be66835
Add test for closing block's underlying vector
alex-spies Nov 20, 2023
1b7f2ff
Improve tests
alex-spies Nov 20, 2023
c4e906e
Throw IllegalStateException more consistently
alex-spies Nov 20, 2023
0d3d081
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 20, 2023
4fe6e08
Check for already released vector only once
alex-spies Nov 20, 2023
9593102
spotless
alex-spies Nov 20, 2023
25794d6
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class AddWork extends LongLongBlockHash.AbstractAddBlock {
AddWork(Page page, GroupingAggregatorFunction.AddInput addInput, int batchSize) {
super(emitBatchSize, addInput);
for (Group group : groups) {
group.encoder = MultivalueDedupe.batchEncoder(new Block.Ref(page.getBlock(group.spec.channel()), page), batchSize, true);
group.encoder = MultivalueDedupe.batchEncoder(page.getBlockRef(group.spec.channel()), batchSize, true);
}
bytes.grow(nullTrackingBytes);
this.positionCount = page.getPositionCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import org.apache.lucene.util.Accountable;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;

import java.util.List;
Expand Down Expand Up @@ -209,6 +209,7 @@ interface Builder extends Releasable {
Block build();
}

// TODO: Update doc
/**
* A reference to a {@link Block}. This is {@link Releasable} and
* {@link Ref#close closing} it will {@link Block#close release}
Expand All @@ -224,31 +225,97 @@ interface Builder extends Releasable {
* The {@code try} block will return the memory used by the block to the
* breaker if it was "free floating", but if it was attached to a {@link Page}
* then it'll do nothing.
*
* @param block the block referenced
* @param containedIn the page containing it or null, if it is "free floating".
*/
record Ref(Block block, @Nullable Page containedIn) implements Releasable {
/**
* Create a "free floating" {@link Ref}.
*/
public static Ref floating(Block block) {
return new Ref(block, null);
class Ref implements Releasable {

private static class RefCount implements RefCounted {
private int i = 1;

@Override
public void incRef() {
i++;
}

@Override
public boolean tryIncRef() {
if (i <= 0) {
return false;
}
incRef();
return true;
}

@Override
public boolean decRef() {
i--;

return i <= 0;
}

@Override
public boolean hasReferences() {
return i >= 1;
}
}

/**
* Is this block "free floating" or attached to a page?
*/
public boolean floating() {
return containedIn == null;
private final Block block;
private final RefCount refs;
private boolean isReleased;

public Ref(Block block) {
this(block, new RefCount());
}

private Ref(Block block, RefCount refs) {
this.block = block;
this.refs = refs;
isReleased = false;
}

public <B extends Block> B block() {
if (isReleased) {
throw new IllegalStateException("cannot get block from released reference");
}
assert block.isReleased() == false;

@SuppressWarnings("unchecked")
B castBlock = (B) block;

return castBlock;
}

public Ref shallowCopy() {
if (refs.tryIncRef() == false) {
throw new IllegalStateException("cannot copy already closed reference");
}

return new Ref(block, refs);
}

@Override
public String toString() {
return "BlockRef{" + "block=" + block.toString() + '}';
}

@Override
public void close() {
if (floating()) {
if (isReleased) {
throw new IllegalStateException("cannot release block reference twice");
}
isReleased = true;
if (refs.decRef()) {
block.close();
}
}

@Deprecated
public static Ref floating(Block b) {
return b.asRef();
}
}

default Block.Ref asRef() {
return new Ref(this);
}

static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@
*
* <p> Pages are immutable and can be passed between threads.
*/
// TODO: Remove deprecated methods and ctors.
public final class Page implements Writeable {

private final Block[] blocks;

private final Block.Ref[] blocks;
private final int positionCount;

/**
Expand All @@ -46,9 +45,11 @@ public final class Page implements Writeable {
*
* @param blocks the blocks
* @throws IllegalArgumentException if all blocks do not have the same number of positions
* @deprecated use {@link Page#Page(Block.Ref...)} instead
*/
@Deprecated
public Page(Block... blocks) {
this(true, determinePositionCount(blocks), blocks);
this(blockArrayToRefs(blocks));
}

/**
Expand All @@ -58,17 +59,38 @@ public Page(Block... blocks) {
*
* @param positionCount the block position count
* @param blocks the blocks
* @deprecated use {@link Page#Page(int, Block.Ref...)} instead
*/
@Deprecated
public Page(int positionCount, Block... blocks) {
this(true, positionCount, blocks);
this(positionCount, blockArrayToRefs(blocks));
}

private Page(boolean copyBlocks, int positionCount, Block[] blocks) {
/**
* Creates a new page with the given blocks. Every block has the same number of positions.
*
* @param blocks the blocks
* @throws IllegalArgumentException if all blocks do not have the same number of positions
*/
public Page(Block.Ref... blocks) {
this(determinePositionCount(blocks), blocks);
}

/**
* Creates a new page with the given positionCount and blocks. Assumes that every block has the
* same number of positions as the positionCount that's passed in - there is no validation of
* this.
*
* @param positionCount the block position count
* @param blocks the blocks
*/
public Page(int positionCount, Block.Ref... blocks) {
Objects.requireNonNull(blocks, "blocks is null");
// assert assertPositionCount(blocks);
this.positionCount = positionCount;
this.blocks = copyBlocks ? blocks.clone() : blocks;
for (Block b : blocks) {
this.blocks = blocks.clone();
for (Block.Ref bRef : blocks) {
Block b = bRef.block();
assert b.getPositionCount() == positionCount : "expected positionCount=" + positionCount + " but was " + b;
if (b.isReleased()) {
throw new IllegalArgumentException("can't build page out of released blocks but [" + b + "] was released");
Expand All @@ -79,9 +101,9 @@ private Page(boolean copyBlocks, int positionCount, Block[] blocks) {
/**
* Appending ctor, see {@link #appendBlocks}.
*/
private Page(Page prev, Block[] toAdd) {
for (Block block : toAdd) {
if (prev.positionCount != block.getPositionCount()) {
private Page(Page prev, Block.Ref[] toAdd) {
for (Block.Ref blockRef : toAdd) {
if (prev.positionCount != blockRef.block().getPositionCount()) {
throw new IllegalArgumentException("Block does not have same position count");
}
}
Expand All @@ -96,11 +118,11 @@ private Page(Page prev, Block[] toAdd) {
public Page(StreamInput in) throws IOException {
int positionCount = in.readVInt();
int blockPositions = in.readVInt();
Block[] blocks = new Block[blockPositions];
Block.Ref[] blocks = new Block.Ref[blockPositions];
boolean success = false;
try {
for (int blockIndex = 0; blockIndex < blockPositions; blockIndex++) {
blocks[blockIndex] = in.readNamedWriteable(Block.class);
blocks[blockIndex] = new Block.Ref(in.readNamedWriteable(Block.class));
}
success = true;
} finally {
Expand All @@ -112,30 +134,40 @@ public Page(StreamInput in) throws IOException {
this.blocks = blocks;
}

private static int determinePositionCount(Block... blocks) {
private static int determinePositionCount(Block.Ref... blocks) {
Objects.requireNonNull(blocks, "blocks is null");
if (blocks.length == 0) {
throw new IllegalArgumentException("blocks is empty");
}
return blocks[0].getPositionCount();
return blocks[0].block().getPositionCount();
}

/**
* Returns the block at the given block index.
*
* @param blockIndex the block index
* @return the block
* @deprecated use {@link Page#getBlockRef} instead
*/
@Deprecated
public <B extends Block> B getBlock(int blockIndex) {
if (blocksReleased) {
throw new IllegalStateException("can't read released page");
}
@SuppressWarnings("unchecked")
B block = (B) blocks[blockIndex];
if (block.isReleased()) {
throw new IllegalStateException("can't read released block [" + block + "]");
return blocks[blockIndex].block();
}

/**
* Returns a reference to the block at the given block index.
*
* @param blockIndex the block index
* @return the block
*/
public Block.Ref getBlockRef(int blockIndex) {
if (blocksReleased) {
throw new IllegalStateException("can't read released page");
}
return block;
return blocks[blockIndex];
}

/**
Expand All @@ -145,9 +177,11 @@ public <B extends Block> B getBlock(int blockIndex) {
* @return a new Page with the block appended
* @throws IllegalArgumentException if the given block does not have the same number of
* positions as the blocks in this Page
* @deprecated use {@link Page#appendBlock(Block.Ref)} instead
*/
@Deprecated
public Page appendBlock(Block block) {
return new Page(this, new Block[] { block });
return new Page(this, new Block.Ref[] { block.asRef() });
}

/**
Expand All @@ -157,8 +191,34 @@ public Page appendBlock(Block block) {
* @return a new Page with the block appended
* @throws IllegalArgumentException if one of the given blocks does not have the same number of
* positions as the blocks in this Page
* @deprecated use {@link Page#appendBlocks(Block.Ref[])} instead
*/
@Deprecated
public Page appendBlocks(Block[] toAdd) {
return new Page(this, blockArrayToRefs(toAdd));
}

/**
* Creates a new page, appending the given block to the existing blocks in this Page.
*
* @param block the block to append
* @return a new Page with the block appended
* @throws IllegalArgumentException if the given block does not have the same number of
* positions as the blocks in this Page
*/
public Page appendBlock(Block.Ref block) {
return new Page(this, new Block.Ref[] { block });
}

/**
* Creates a new page, appending the given blocks to the existing blocks in this Page.
*
* @param toAdd the blocks to append
* @return a new Page with the block appended
* @throws IllegalArgumentException if one of the given blocks does not have the same number of
* positions as the blocks in this Page
*/
public Page appendBlocks(Block.Ref[] toAdd) {
return new Page(this, toAdd);
}

Expand All @@ -178,7 +238,7 @@ public Page appendPage(Page toAdd) {
public int hashCode() {
int result = Objects.hash(positionCount);
for (int i = 0; i < blocks.length; i++) {
result = 31 * result + Objects.hashCode(blocks[i]);
result = 31 * result + Objects.hashCode(blocks[i].block());
}
return result;
}
Expand All @@ -188,8 +248,13 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Page page = (Page) o;
return positionCount == page.positionCount
&& (positionCount == 0 || Arrays.equals(blocks, 0, blocks.length, page.blocks, 0, page.blocks.length));

if ((positionCount == page.positionCount) == false || (blocks.length == page.blocks.length) == false) return false;
if (positionCount == 0) return true;
for (int i = 0; i < blocks.length; i++) {
if (blocks[i].block().equals(page.blocks[i].block()) == false) return false;
}
return true;
}

@Override
Expand Down Expand Up @@ -220,8 +285,8 @@ public int getBlockCount() {
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(positionCount);
out.writeVInt(getBlockCount());
for (Block block : blocks) {
out.writeNamedWriteable(block);
for (Block.Ref blockRef : blocks) {
out.writeNamedWriteable(blockRef.block());
}
}

Expand All @@ -232,4 +297,8 @@ public void releaseBlocks() {
blocksReleased = true;
Releasables.closeExpectNoException(blocks);
}

private static Block.Ref[] blockArrayToRefs(Block[] blocks) {
return Arrays.stream(blocks).map(Block::asRef).toArray(Block.Ref[]::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
Expand Down Expand Up @@ -45,8 +44,7 @@ public EvalOperator(BlockFactory blockFactory, ExpressionEvaluator evaluator) {
@Override
protected Page process(Page page) {
Block.Ref ref = evaluator.eval(page);
Block block = ref.floating() ? ref.block() : BlockUtils.deepCopyOf(ref.block(), blockFactory);
return page.appendBlock(block);
return page.appendBlock(ref);
alex-spies marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down
Loading