Skip to content

Commit

Permalink
ESQL: Make blocks ref counted (#100408)
Browse files Browse the repository at this point in the history
This allows to replace deep copying of blocks by simply calling
Block::incRef - the block then has to be closed (or decRefed) one
additional time for each call to incRef (and tryIncRef, if successfull).
  • Loading branch information
alex-spies authored Nov 20, 2023
1 parent 3468730 commit 92fb778
Show file tree
Hide file tree
Showing 30 changed files with 313 additions and 151 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/100408.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 100408
summary: "ESQL: Make blocks ref counted"
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public interface RefCounted {
void incRef();

/**
* Tries to increment the refCount of this instance. This method will return {@code true} iff the refCount was
* Tries to increment the refCount of this instance. This method will return {@code true} iff the refCount was successfully incremented.
*
* @see #decRef()
* @see #incRef()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,7 @@ public String toString() {
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
public void closeInternal() {
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ public final class BooleanVectorBlock extends AbstractVectorBlock implements Boo

private final BooleanVector vector;

/**
* @param vector considered owned by the current block; must not be used in any other {@code Block}
*/
BooleanVectorBlock(BooleanVector vector) {
super(vector.getPositionCount(), vector.blockFactory());
this.vector = vector;
Expand Down Expand Up @@ -72,15 +75,12 @@ public String toString() {

@Override
public boolean isReleased() {
return released || vector.isReleased();
return super.isReleased() || vector.isReleased();
}

@Override
public void close() {
if (released || vector.isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
public void closeInternal() {
assert (vector.isReleased() == false) : "can't release block [" + this + "] containing already released vector";
Releasables.closeExpectNoException(vector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,7 @@ public String toString() {
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
public void closeInternal() {
blockFactory.adjustBreaker(-ramBytesUsed() + values.bigArraysRamBytesUsed(), true);
Releasables.closeExpectNoException(values);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ public final class BytesRefVectorBlock extends AbstractVectorBlock implements By

private final BytesRefVector vector;

/**
* @param vector considered owned by the current block; must not be used in any other {@code Block}
*/
BytesRefVectorBlock(BytesRefVector vector) {
super(vector.getPositionCount(), vector.blockFactory());
this.vector = vector;
Expand Down Expand Up @@ -73,15 +76,12 @@ public String toString() {

@Override
public boolean isReleased() {
return released || vector.isReleased();
return super.isReleased() || vector.isReleased();
}

@Override
public void close() {
if (released || vector.isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
public void closeInternal() {
assert (vector.isReleased() == false) : "can't release block [" + this + "] containing already released vector";
Releasables.closeExpectNoException(vector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,7 @@ public String toString() {
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
public void closeInternal() {
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ public final class DoubleVectorBlock extends AbstractVectorBlock implements Doub

private final DoubleVector vector;

/**
* @param vector considered owned by the current block; must not be used in any other {@code Block}
*/
DoubleVectorBlock(DoubleVector vector) {
super(vector.getPositionCount(), vector.blockFactory());
this.vector = vector;
Expand Down Expand Up @@ -72,15 +75,12 @@ public String toString() {

@Override
public boolean isReleased() {
return released || vector.isReleased();
return super.isReleased() || vector.isReleased();
}

@Override
public void close() {
if (released || vector.isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
public void closeInternal() {
assert (vector.isReleased() == false) : "can't release block [" + this + "] containing already released vector";
Releasables.closeExpectNoException(vector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,7 @@ public String toString() {
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
public void closeInternal() {
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ public final class IntVectorBlock extends AbstractVectorBlock implements IntBloc

private final IntVector vector;

/**
* @param vector considered owned by the current block; must not be used in any other {@code Block}
*/
IntVectorBlock(IntVector vector) {
super(vector.getPositionCount(), vector.blockFactory());
this.vector = vector;
Expand Down Expand Up @@ -72,15 +75,12 @@ public String toString() {

@Override
public boolean isReleased() {
return released || vector.isReleased();
return super.isReleased() || vector.isReleased();
}

@Override
public void close() {
if (released || vector.isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
public void closeInternal() {
assert (vector.isReleased() == false) : "can't release block [" + this + "] containing already released vector";
Releasables.closeExpectNoException(vector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,7 @@ public String toString() {
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
public void closeInternal() {
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ public final class LongVectorBlock extends AbstractVectorBlock implements LongBl

private final LongVector vector;

/**
* @param vector considered owned by the current block; must not be used in any other {@code Block}
*/
LongVectorBlock(LongVector vector) {
super(vector.getPositionCount(), vector.blockFactory());
this.vector = vector;
Expand Down Expand Up @@ -72,15 +75,12 @@ public String toString() {

@Override
public boolean isReleased() {
return released || vector.isReleased();
return super.isReleased() || vector.isReleased();
}

@Override
public void close() {
if (released || vector.isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
public void closeInternal() {
assert (vector.isReleased() == false) : "can't release block [" + this + "] containing already released vector";
Releasables.closeExpectNoException(vector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import java.util.BitSet;

abstract class AbstractBlock implements Block {

private int references = 1;
private final int positionCount;

@Nullable
Expand All @@ -23,8 +23,6 @@ abstract class AbstractBlock implements Block {

protected final BlockFactory blockFactory;

protected boolean released = false;

/**
* @param positionCount the number of values in this block
*/
Expand Down Expand Up @@ -99,6 +97,54 @@ public BlockFactory blockFactory() {

@Override
public boolean isReleased() {
return released;
return hasReferences() == false;
}

@Override
public final void incRef() {
if (isReleased()) {
throw new IllegalStateException("can't increase refCount on already released block [" + this + "]");
}
references++;
}

@Override
public final boolean tryIncRef() {
if (isReleased()) {
return false;
}
references++;
return true;
}

@Override
public final boolean decRef() {
if (isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}

references--;

if (references <= 0) {
closeInternal();
return true;
}
return false;
}

@Override
public final boolean hasReferences() {
return references >= 1;
}

@Override
public final void close() {
decRef();
}

/**
* This is called when the number of references reaches zero.
* It must release any resources held by the block (adjusting circuit breakers if needed).
*/
protected abstract void closeInternal();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.mapper.BlockLoader;
Expand All @@ -23,12 +24,19 @@
* position.
*
* <p> Blocks can represent various shapes of underlying data. A Block can represent either sparse
* or dense data. A Block can represent either single or multi valued data. A Block that represents
* or dense data. A Block can represent either single or multivalued data. A Block that represents
* dense single-valued data can be viewed as a {@link Vector}.
*
* <p> Block are immutable and can be passed between threads.
* <p> Blocks are reference counted; to make a shallow copy of a block (e.g. if a {@link Page} contains
* the same column twice), use {@link Block#incRef()}. Before a block is garbage collected,
* {@link Block#close()} must be called to release a block's resources; it must also be called one
* additional time for each time {@link Block#incRef()} was called. Calls to {@link Block#decRef()} and
* {@link Block#close()} are equivalent.
*
* <p> Block are immutable and can be passed between threads as long as no two threads hold a reference to
* the same block at the same time.
*/
public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, Releasable {
public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, RefCounted, Releasable {

/**
* {@return an efficient dense single-value view of this block}.
Expand Down Expand Up @@ -57,14 +65,15 @@ public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, R
/** The block factory associated with this block. */
BlockFactory blockFactory();

/** Tells if this block has been released. A block is released by calling its {@link Block#close()} method. */
/**
* Tells if this block has been released. A block is released by calling its {@link Block#close()} or {@link Block#decRef()} methods.
* @return true iff the block's reference count is zero.
* */
boolean isReleased();

/**
* Returns true if the value stored at the given position is null, false otherwise.
*
* @param position the position
* @return true or false
* @return true if the value stored at the given position is null, false otherwise
*/
boolean isNull(int position);

Expand All @@ -91,6 +100,7 @@ public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, R

/**
* Creates a new block that only exposes the positions provided. Materialization of the selected positions is avoided.
* The new block may hold a reference to this block, increasing this block's reference count.
* @param positions the positions to retain
* @return a filtered block
*/
Expand Down Expand Up @@ -137,6 +147,7 @@ default boolean mvSortedAscending() {
* Expand multivalued fields into one row per value. Returns the
* block if there aren't any multivalued fields to expand.
*/
// TODO: We should use refcounting instead of either deep copies or returning the same identical block.
Block expand();

/**
Expand Down Expand Up @@ -231,7 +242,7 @@ static Block[] buildAll(Block.Builder... builders) {

/**
* A reference to a {@link Block}. This is {@link Releasable} and
* {@link Ref#close closing} it will {@link Block#close release}
* {@link Ref#close closing} it will {@link Block#close() release}
* the underlying {@link Block} if it wasn't borrowed from a {@link Page}.
*
* The usual way to use this is:
Expand All @@ -248,6 +259,7 @@ static Block[] buildAll(Block.Builder... builders) {
* @param block the block referenced
* @param containedIn the page containing it or null, if it is "free floating".
*/
// We probably want to remove this; instead, we could incRef and decRef consistently in the EvalOperator.
record Ref(Block block, @Nullable Page containedIn) implements Releasable {
/**
* Create a "free floating" {@link Ref}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,7 @@ public String toString() {
}

@Override
public void close() {
if (isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
released = true;
public void closeInternal() {
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}

Expand Down
Loading

0 comments on commit 92fb778

Please sign in to comment.