Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Make blocks ref counted #100408

Merged
merged 42 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
9e89137
Implement references counted block refs
alex-spies Oct 6, 2023
73ade8f
Use RefCounted
alex-spies Oct 6, 2023
522a47e
Move BlockRef into Block.Ref
alex-spies Oct 6, 2023
b55fd8a
Update tests
alex-spies Oct 6, 2023
5844b81
Revert all changes
alex-spies Oct 9, 2023
044bf9a
Make Block extend RefCounted
alex-spies Oct 9, 2023
dad012c
Avoid deep copies in EvalOperator
alex-spies Oct 9, 2023
d629ca4
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Oct 9, 2023
f8df502
Fix RefCounted javadoc
alex-spies Oct 9, 2023
d304a95
Improve Block::RefCounted implementation
alex-spies Oct 9, 2023
8f3df46
Add ref counting tests for vector and array blocks
alex-spies Oct 9, 2023
44ab382
Test remaining block types
alex-spies Oct 9, 2023
f25644d
Update docs/changelog/100408.yaml
alex-spies Oct 9, 2023
d84a4ed
Fix projection bug, simplify projections
alex-spies Oct 9, 2023
0112040
Fix projection for empty page
alex-spies Oct 10, 2023
8f8522d
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Oct 10, 2023
60305b7
Remove obsolete test
alex-spies Oct 10, 2023
9801f1b
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 13, 2023
b0a05be
Fix multiply owned BooleanBlock in exchange sink
alex-spies Nov 14, 2023
7168c12
Improve javadoc and illegal state messages
alex-spies Nov 14, 2023
803d654
Remove newPageAndRelease
alex-spies Nov 14, 2023
57f5977
Spotless
alex-spies Nov 14, 2023
95466bb
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 14, 2023
99da217
Revert error message
alex-spies Nov 14, 2023
b61e50e
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 14, 2023
b33629d
Align logic with AbstractRefCounted
alex-spies Nov 14, 2023
28970ab
Cleanup
alex-spies Nov 14, 2023
46d3044
Address potential partial releases
alex-spies Nov 14, 2023
42a8559
Fix test
alex-spies Nov 14, 2023
98f6629
Make decRef and close equivalent again
alex-spies Nov 15, 2023
92ab31f
Remove releaseByDecRef
alex-spies Nov 15, 2023
2d3aa70
Update javadoc for Block
alex-spies Nov 15, 2023
73110c0
Address Nhat's feedback
alex-spies Nov 17, 2023
061563d
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 17, 2023
63109c0
Align DocBlock and DocVector with others
alex-spies Nov 20, 2023
be66835
Add test for closing block's underlying vector
alex-spies Nov 20, 2023
1b7f2ff
Improve tests
alex-spies Nov 20, 2023
c4e906e
Throw IllegalStateException more consistently
alex-spies Nov 20, 2023
0d3d081
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 20, 2023
4fe6e08
Check for already released vector only once
alex-spies Nov 20, 2023
9593102
spotless
alex-spies Nov 20, 2023
25794d6
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/100408.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 100408
summary: "ESQL: Make blocks ref counted"
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public interface RefCounted {
void incRef();

/**
* Tries to increment the refCount of this instance. This method will return {@code true} iff the refCount was
* Tries to increment the refCount of this instance. This method will return {@code true} iff the refCount was successfully incremented.
alex-spies marked this conversation as resolved.
Show resolved Hide resolved
*
* @see #decRef()
* @see #incRef()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,7 @@ public String toString() {
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
public void closeInternal() {
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ public final class BooleanVectorBlock extends AbstractVectorBlock implements Boo

private final BooleanVector vector;

/**
* @param vector considered owned by the current block; must not be used in any other {@code Block}
*/
BooleanVectorBlock(BooleanVector vector) {
super(vector.getPositionCount(), vector.blockFactory());
this.vector = vector;
Expand Down Expand Up @@ -72,15 +75,14 @@ public String toString() {

@Override
public boolean isReleased() {
return released || vector.isReleased();
return super.isReleased() || vector.isReleased();
}

@Override
public void close() {
if (released || vector.isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
public void closeInternal() {
if (vector.isReleased()) {
throw new IllegalStateException("cannot release block [" + this + "] containing already released vector");
}
released = true;
Releasables.closeExpectNoException(vector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,7 @@ public String toString() {
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
public void closeInternal() {
blockFactory.adjustBreaker(-ramBytesUsed() + values.bigArraysRamBytesUsed(), true);
Releasables.closeExpectNoException(values);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ public final class BytesRefVectorBlock extends AbstractVectorBlock implements By

private final BytesRefVector vector;

/**
* @param vector considered owned by the current block; must not be used in any other {@code Block}
*/
BytesRefVectorBlock(BytesRefVector vector) {
super(vector.getPositionCount(), vector.blockFactory());
this.vector = vector;
Expand Down Expand Up @@ -73,15 +76,14 @@ public String toString() {

@Override
public boolean isReleased() {
return released || vector.isReleased();
return super.isReleased() || vector.isReleased();
}

@Override
public void close() {
if (released || vector.isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
public void closeInternal() {
if (vector.isReleased()) {
throw new IllegalStateException("cannot release block [" + this + "] containing already released vector");
}
released = true;
Releasables.closeExpectNoException(vector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,7 @@ public String toString() {
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
public void closeInternal() {
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ public final class DoubleVectorBlock extends AbstractVectorBlock implements Doub

private final DoubleVector vector;

/**
* @param vector considered owned by the current block; must not be used in any other {@code Block}
*/
DoubleVectorBlock(DoubleVector vector) {
super(vector.getPositionCount(), vector.blockFactory());
this.vector = vector;
Expand Down Expand Up @@ -72,15 +75,14 @@ public String toString() {

@Override
public boolean isReleased() {
return released || vector.isReleased();
return super.isReleased() || vector.isReleased();
}

@Override
public void close() {
if (released || vector.isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
public void closeInternal() {
if (vector.isReleased()) {
throw new IllegalStateException("cannot release block [" + this + "] containing already released vector");
}
released = true;
Releasables.closeExpectNoException(vector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,7 @@ public String toString() {
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
public void closeInternal() {
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ public final class IntVectorBlock extends AbstractVectorBlock implements IntBloc

private final IntVector vector;

/**
* @param vector considered owned by the current block; must not be used in any other {@code Block}
*/
IntVectorBlock(IntVector vector) {
super(vector.getPositionCount(), vector.blockFactory());
this.vector = vector;
Expand Down Expand Up @@ -72,15 +75,14 @@ public String toString() {

@Override
public boolean isReleased() {
return released || vector.isReleased();
return super.isReleased() || vector.isReleased();
}

@Override
public void close() {
if (released || vector.isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
public void closeInternal() {
if (vector.isReleased()) {
throw new IllegalStateException("cannot release block [" + this + "] containing already released vector");
}
released = true;
Releasables.closeExpectNoException(vector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,7 @@ public String toString() {
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
public void closeInternal() {
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ public final class LongVectorBlock extends AbstractVectorBlock implements LongBl

private final LongVector vector;

/**
* @param vector considered owned by the current block; must not be used in any other {@code Block}
*/
LongVectorBlock(LongVector vector) {
super(vector.getPositionCount(), vector.blockFactory());
this.vector = vector;
Expand Down Expand Up @@ -72,15 +75,14 @@ public String toString() {

@Override
public boolean isReleased() {
return released || vector.isReleased();
return super.isReleased() || vector.isReleased();
}

@Override
public void close() {
if (released || vector.isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
public void closeInternal() {
if (vector.isReleased()) {
throw new IllegalStateException("cannot release block [" + this + "] containing already released vector");
}
released = true;
Releasables.closeExpectNoException(vector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import java.util.BitSet;

abstract class AbstractBlock implements Block {

private int references = 1;
private final int positionCount;

@Nullable
Expand All @@ -23,8 +23,6 @@ abstract class AbstractBlock implements Block {

protected final BlockFactory blockFactory;

protected boolean released = false;

/**
* @param positionCount the number of values in this block
*/
Expand Down Expand Up @@ -99,6 +97,54 @@ public BlockFactory blockFactory() {

@Override
public boolean isReleased() {
return released;
return hasReferences() == false;
}

@Override
public final void incRef() {
if (references <= 0) {
throw new IllegalStateException("can't increase refCount on already released block [" + this + "]");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add an assertion assert false : ... to make tests fail hard?

}
references++;
}

@Override
public final boolean tryIncRef() {
if (references <= 0) {
return false;
}
references++;
return true;
}

@Override
public final boolean decRef() {
if (references <= 0) {
throw new IllegalStateException("can't release already released block [" + this + "]");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add assert false .. to make tests fail hard?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I tried that but it makes many tests awkward as we assert in many tests that an IllegalStateException is indeed being thrown.

I would leave out the assert false ..; when the ISE is thrown in tests, IME it is not hard to find where it was thrown , and the often used utility Releasables.expectNoException internally does assert false as well.

}

references--;

if (references <= 0) {
closeInternal();
return true;
}
return false;
}

@Override
public final boolean hasReferences() {
return references >= 1;
}

@Override
public final void close() {
decRef();
}

/**
* This is called when the number of references reaches zero.
* It must release any resources held by the block (adjusting circuit breakers if needed).
*/
protected abstract void closeInternal();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.mapper.BlockLoader;
Expand All @@ -23,12 +24,19 @@
* position.
*
* <p> Blocks can represent various shapes of underlying data. A Block can represent either sparse
* or dense data. A Block can represent either single or multi valued data. A Block that represents
* or dense data. A Block can represent either single or multivalued data. A Block that represents
* dense single-valued data can be viewed as a {@link Vector}.
*
* <p> Block are immutable and can be passed between threads.
* <p> Blocks are reference counted; to make a shallow copy of a block (e.g. if a {@link Page} contains
* the same column twice), use {@link Block#incRef()}. Before a block is garbage collected,
* {@link Block#close()} must be called to release a block's resources; it must also be called one
* additional time for each time {@link Block#incRef()} was called. Calls to {@link Block#decRef()} and
* {@link Block#close()} are equivalent.
*
* <p> Block are immutable and can be passed between threads as long as no two threads hold a reference to
* the same block at the same time.
*/
public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, Releasable {
public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, RefCounted, Releasable {

/**
* {@return an efficient dense single-value view of this block}.
Expand Down Expand Up @@ -57,14 +65,15 @@ public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, R
/** The block factory associated with this block. */
BlockFactory blockFactory();

/** Tells if this block has been released. A block is released by calling its {@link Block#close()} method. */
/**
* Tells if this block has been released. A block is released by calling its {@link Block#close()} or {@link Block#decRef()} methods.
* @return true iff the block's reference count is zero.
* */
boolean isReleased();

/**
* Returns true if the value stored at the given position is null, false otherwise.
*
* @param position the position
* @return true or false
* @return true if the value stored at the given position is null, false otherwise
*/
boolean isNull(int position);

Expand All @@ -91,6 +100,7 @@ public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, R

/**
* Creates a new block that only exposes the positions provided. Materialization of the selected positions is avoided.
* The new block may hold a reference to this block, increasing this block's reference count.
* @param positions the positions to retain
* @return a filtered block
*/
Expand Down Expand Up @@ -137,6 +147,7 @@ default boolean mvSortedAscending() {
* Expand multivalued fields into one row per value. Returns the
* block if there aren't any multivalued fields to expand.
*/
// TODO: We should use refcounting instead of either deep copies or returning the same identical block.
Block expand();

/**
Expand Down Expand Up @@ -231,7 +242,7 @@ static Block[] buildAll(Block.Builder... builders) {

/**
* A reference to a {@link Block}. This is {@link Releasable} and
* {@link Ref#close closing} it will {@link Block#close release}
* {@link Ref#close closing} it will {@link Block#close() release}
* the underlying {@link Block} if it wasn't borrowed from a {@link Page}.
*
* The usual way to use this is:
Expand All @@ -248,6 +259,7 @@ static Block[] buildAll(Block.Builder... builders) {
* @param block the block referenced
* @param containedIn the page containing it or null, if it is "free floating".
*/
// We probably want to remove this; instead, we could incRef and decRef consistently in the EvalOperator.
record Ref(Block block, @Nullable Page containedIn) implements Releasable {
/**
* Create a "free floating" {@link Ref}.
Expand Down
Loading