Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Make blocks ref counted #100408

Merged
merged 42 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
9e89137
Implement references counted block refs
alex-spies Oct 6, 2023
73ade8f
Use RefCounted
alex-spies Oct 6, 2023
522a47e
Move BlockRef into Block.Ref
alex-spies Oct 6, 2023
b55fd8a
Update tests
alex-spies Oct 6, 2023
5844b81
Revert all changes
alex-spies Oct 9, 2023
044bf9a
Make Block extend RefCounted
alex-spies Oct 9, 2023
dad012c
Avoid deep copies in EvalOperator
alex-spies Oct 9, 2023
d629ca4
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Oct 9, 2023
f8df502
Fix RefCounted javadoc
alex-spies Oct 9, 2023
d304a95
Improve Block::RefCounted implementation
alex-spies Oct 9, 2023
8f3df46
Add ref counting tests for vector and array blocks
alex-spies Oct 9, 2023
44ab382
Test remaining block types
alex-spies Oct 9, 2023
f25644d
Update docs/changelog/100408.yaml
alex-spies Oct 9, 2023
d84a4ed
Fix projection bug, simplify projections
alex-spies Oct 9, 2023
0112040
Fix projection for empty page
alex-spies Oct 10, 2023
8f8522d
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Oct 10, 2023
60305b7
Remove obsolete test
alex-spies Oct 10, 2023
9801f1b
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 13, 2023
b0a05be
Fix multiply owned BooleanBlock in exchange sink
alex-spies Nov 14, 2023
7168c12
Improve javadoc and illegal state messages
alex-spies Nov 14, 2023
803d654
Remove newPageAndRelease
alex-spies Nov 14, 2023
57f5977
Spotless
alex-spies Nov 14, 2023
95466bb
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 14, 2023
99da217
Revert error message
alex-spies Nov 14, 2023
b61e50e
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 14, 2023
b33629d
Align logic with AbstractRefCounted
alex-spies Nov 14, 2023
28970ab
Cleanup
alex-spies Nov 14, 2023
46d3044
Address potential partial releases
alex-spies Nov 14, 2023
42a8559
Fix test
alex-spies Nov 14, 2023
98f6629
Make decRef and close equivalent again
alex-spies Nov 15, 2023
92ab31f
Remove releaseByDecRef
alex-spies Nov 15, 2023
2d3aa70
Update javadoc for Block
alex-spies Nov 15, 2023
73110c0
Address Nhat's feedback
alex-spies Nov 17, 2023
061563d
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 17, 2023
63109c0
Align DocBlock and DocVector with others
alex-spies Nov 20, 2023
be66835
Add test for closing block's underlying vector
alex-spies Nov 20, 2023
1b7f2ff
Improve tests
alex-spies Nov 20, 2023
c4e906e
Throw IllegalStateException more consistently
alex-spies Nov 20, 2023
0d3d081
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 20, 2023
4fe6e08
Check for already released vector only once
alex-spies Nov 20, 2023
9593102
spotless
alex-spies Nov 20, 2023
25794d6
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/100408.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 100408
summary: "ESQL: Make blocks ref counted"
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public interface RefCounted {
void incRef();

/**
* Tries to increment the refCount of this instance. This method will return {@code true} iff the refCount was
* Tries to increment the refCount of this instance. This method will return {@code true} iff the refCount was successfully incremented.
alex-spies marked this conversation as resolved.
Show resolved Hide resolved
*
* @see #decRef()
* @see #incRef()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,7 @@ public String toString() {

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
super.close();
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ public final class BooleanVectorBlock extends AbstractVectorBlock implements Boo

private final BooleanVector vector;

/**
* @param vector considered owned by the current block; must not be used in any other {@code Block}
*/
BooleanVectorBlock(BooleanVector vector) {
super(vector.getPositionCount(), vector.blockFactory());
this.vector = vector;
Expand Down Expand Up @@ -72,15 +75,15 @@ public String toString() {

@Override
public boolean isReleased() {
return released || vector.isReleased();
return super.isReleased() || vector.isReleased();
}

@Override
public void close() {
if (released || vector.isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (vector.isReleased()) {
throw new IllegalStateException("cannot release block [" + this + "] containing already released vector");
}
released = true;
Releasables.closeExpectNoException(vector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,7 @@ public String toString() {

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
super.close();
blockFactory.adjustBreaker(-ramBytesUsed() + values.bigArraysRamBytesUsed(), true);
Releasables.closeExpectNoException(values);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ public final class BytesRefVectorBlock extends AbstractVectorBlock implements By

private final BytesRefVector vector;

/**
* @param vector considered owned by the current block; must not be used in any other {@code Block}
*/
BytesRefVectorBlock(BytesRefVector vector) {
super(vector.getPositionCount(), vector.blockFactory());
this.vector = vector;
Expand Down Expand Up @@ -73,15 +76,15 @@ public String toString() {

@Override
public boolean isReleased() {
return released || vector.isReleased();
return super.isReleased() || vector.isReleased();
}

@Override
public void close() {
if (released || vector.isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (vector.isReleased()) {
throw new IllegalStateException("cannot release block [" + this + "] containing already released vector");
}
released = true;
Releasables.closeExpectNoException(vector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,7 @@ public String toString() {

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
super.close();
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ public final class DoubleVectorBlock extends AbstractVectorBlock implements Doub

private final DoubleVector vector;

/**
* @param vector considered owned by the current block; must not be used in any other {@code Block}
*/
DoubleVectorBlock(DoubleVector vector) {
super(vector.getPositionCount(), vector.blockFactory());
this.vector = vector;
Expand Down Expand Up @@ -72,15 +75,15 @@ public String toString() {

@Override
public boolean isReleased() {
return released || vector.isReleased();
return super.isReleased() || vector.isReleased();
}

@Override
public void close() {
if (released || vector.isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (vector.isReleased()) {
throw new IllegalStateException("cannot release block [" + this + "] containing already released vector");
}
released = true;
Releasables.closeExpectNoException(vector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,7 @@ public String toString() {

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
super.close();
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ public final class IntVectorBlock extends AbstractVectorBlock implements IntBloc

private final IntVector vector;

/**
* @param vector considered owned by the current block; must not be used in any other {@code Block}
*/
IntVectorBlock(IntVector vector) {
super(vector.getPositionCount(), vector.blockFactory());
this.vector = vector;
Expand Down Expand Up @@ -72,15 +75,15 @@ public String toString() {

@Override
public boolean isReleased() {
return released || vector.isReleased();
return super.isReleased() || vector.isReleased();
}

@Override
public void close() {
if (released || vector.isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (vector.isReleased()) {
throw new IllegalStateException("cannot release block [" + this + "] containing already released vector");
}
released = true;
Releasables.closeExpectNoException(vector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,7 @@ public String toString() {

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
super.close();
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ public final class LongVectorBlock extends AbstractVectorBlock implements LongBl

private final LongVector vector;

/**
* @param vector considered owned by the current block; must not be used in any other {@code Block}
*/
LongVectorBlock(LongVector vector) {
super(vector.getPositionCount(), vector.blockFactory());
this.vector = vector;
Expand Down Expand Up @@ -72,15 +75,15 @@ public String toString() {

@Override
public boolean isReleased() {
return released || vector.isReleased();
return super.isReleased() || vector.isReleased();
}

@Override
public void close() {
if (released || vector.isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (vector.isReleased()) {
throw new IllegalStateException("cannot release block [" + this + "] containing already released vector");
}
released = true;
Releasables.closeExpectNoException(vector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public Block[] getKeys() {
}
} finally {
if (keyBlocks[keyBlocks.length - 1] == null) {
Releasables.closeExpectNoException(keyBlocks);
Releasables.closeExpectNoException(Block.releaseByDecRef(keyBlocks));
}
}
return keyBlocks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
import java.util.BitSet;

abstract class AbstractBlock implements Block {

private int references = 1;
private boolean released = false;
private final int positionCount;

@Nullable
Expand All @@ -23,8 +24,6 @@ abstract class AbstractBlock implements Block {

protected final BlockFactory blockFactory;

protected boolean released = false;

/**
* @param positionCount the number of values in this block
*/
Expand Down Expand Up @@ -101,4 +100,54 @@ public BlockFactory blockFactory() {
public boolean isReleased() {
return released;
}

@Override
public final void incRef() {
if (references <= 0) {
throw new IllegalStateException("can't increase refCount on already released block [" + this + "]");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add an assertion assert false : ... to make tests fail hard?

}
references++;
}

@Override
public final boolean tryIncRef() {
if (references <= 0) {
return false;
}
references++;
return true;
}

@Override
public final boolean decRef() {
if (references <= 0) {
throw new IllegalStateException("can't release already released block [" + this + "]");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add assert false .. to make tests fail hard?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I tried that but it makes many tests awkward as we assert in many tests that an IllegalStateException is indeed being thrown.

I would leave out the assert false ..; when the ISE is thrown in tests, IME it is not hard to find where it was thrown , and the often used utility Releasables.expectNoException internally does assert false as well.

}

references--;

if (references <= 0) {
close();
alex-spies marked this conversation as resolved.
Show resolved Hide resolved
return true;
}
return false;
}

@Override
public final boolean hasReferences() {
return references >= 1;
}

@Override
public void close() {
alex-spies marked this conversation as resolved.
Show resolved Hide resolved
if (released) {
alex-spies marked this conversation as resolved.
Show resolved Hide resolved
throw new IllegalStateException("can't release already released block [" + this + "]");
}
if (references > 1) {
throw new IllegalStateException("can't close block that is still referenced elsewhere [" + this + "]");
}
released = true;
// In case that there is only 1 reference, allow closing without having to call decRef.
references = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.mapper.BlockLoader;

import java.util.List;

//TODO update javadoc
/**
* A Block is a columnar representation of homogenous data. It has a position (row) count, and
* various data retrieval methods for accessing the underlying data that is stored at a given
Expand All @@ -28,7 +30,7 @@
*
* <p> Block are immutable and can be passed between threads.
*/
public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, Releasable {
public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, RefCounted, Releasable {

/**
* {@return an efficient dense single-value view of this block}.
Expand Down Expand Up @@ -57,7 +59,8 @@ public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, R
/** The block factory associated with this block. */
BlockFactory blockFactory();

/** Tells if this block has been released. A block is released by calling its {@link Block#close()} method. */
// TODO mention close as well
/** Tells if this block has been released. A block is released by calling its {@link Block#decRef()} method. */
boolean isReleased();

/**
Expand Down Expand Up @@ -222,7 +225,7 @@ static Block[] buildAll(Block.Builder... builders) {
}
} finally {
if (blocks[blocks.length - 1] == null) {
Releasables.closeExpectNoException(blocks);
Releasables.closeExpectNoException(releaseByDecRef(blocks));
}
}
return blocks;
Expand All @@ -231,7 +234,7 @@ static Block[] buildAll(Block.Builder... builders) {

/**
* A reference to a {@link Block}. This is {@link Releasable} and
* {@link Ref#close closing} it will {@link Block#close release}
* {@link Ref#close closing} it will {@link Block#decRef() release}
* the underlying {@link Block} if it wasn't borrowed from a {@link Page}.
*
* The usual way to use this is:
Expand Down Expand Up @@ -266,7 +269,7 @@ public boolean floating() {
@Override
public void close() {
if (floating()) {
block.close();
block.decRef();
}
}
}
Expand All @@ -281,4 +284,20 @@ static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
ConstantNullBlock.ENTRY
);
}

static Releasable releaseByDecRef(Block b) {
return () -> {
if (b != null) {
b.decRef();
}
};
}

static Releasable[] releaseByDecRef(Block... blocks) {
alex-spies marked this conversation as resolved.
Show resolved Hide resolved
Releasable[] mappedBlocks = new Releasable[blocks.length];
for (int i = 0; i < blocks.length; i++) {
mappedBlocks[i] = releaseByDecRef(blocks[i]);
}
return mappedBlocks;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public static Block[] fromListRow(BlockFactory blockFactory, List<Object> row, i
return blocks;
} finally {
if (success == false) {
Releasables.closeExpectNoException(blocks);
Releasables.closeExpectNoException(Block.releaseByDecRef(blocks));
}
}
}
Expand Down
Loading