Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce local block factory #102901

Merged
merged 7 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/102901.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 102901
summary: Introduce local block factory
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public boolean getBoolean(int valueIndex) {

@Override
public BooleanBlock filter(int... positions) {
try (var builder = blockFactory.newBooleanBlockBuilder(positions.length)) {
try (var builder = blockFactory().newBooleanBlockBuilder(positions.length)) {
for (int pos : positions) {
if (isNull(pos)) {
builder.appendNull();
Expand Down Expand Up @@ -84,7 +84,7 @@ public BooleanBlock expand() {
return this;
}
// TODO use reference counting to share the values
try (var builder = blockFactory.newBooleanBlockBuilder(firstValueIndexes[getPositionCount()])) {
try (var builder = blockFactory().newBooleanBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
Expand Down Expand Up @@ -137,6 +137,6 @@ public String toString() {

@Override
public void closeInternal() {
blockFactory.adjustBreaker(-ramBytesUsed(), true);
blockFactory().adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public boolean isConstant() {

@Override
public BooleanVector filter(int... positions) {
try (BooleanVector.Builder builder = blockFactory.newBooleanVectorBuilder(positions.length)) {
try (BooleanVector.Builder builder = blockFactory().newBooleanVectorBuilder(positions.length)) {
for (int pos : positions) {
builder.appendBoolean(values[pos]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public long ramBytesUsed() {

@Override
public BooleanVector filter(int... positions) {
var blockFactory = blockFactory();
final BitArray filtered = new BitArray(positions.length, blockFactory.bigArrays());
for (int i = 0; i < positions.length; i++) {
if (values.get(positions[i])) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,9 @@ public void closeInternal() {
assert (vector.isReleased() == false) : "can't release block [" + this + "] containing already released vector";
Releasables.closeExpectNoException(vector);
}

@Override
public void allowPassingToDifferentDriver() {
vector.allowPassingToDifferentDriver();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public BytesRef getBytesRef(int valueIndex, BytesRef dest) {
@Override
public BytesRefBlock filter(int... positions) {
final BytesRef scratch = new BytesRef();
try (var builder = blockFactory.newBytesRefBlockBuilder(positions.length)) {
try (var builder = blockFactory().newBytesRefBlockBuilder(positions.length)) {
for (int pos : positions) {
if (isNull(pos)) {
builder.appendNull();
Expand Down Expand Up @@ -88,7 +88,7 @@ public BytesRefBlock expand() {
}
// TODO use reference counting to share the values
final BytesRef scratch = new BytesRef();
try (var builder = blockFactory.newBytesRefBlockBuilder(firstValueIndexes[getPositionCount()])) {
try (var builder = blockFactory().newBytesRefBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
Expand Down Expand Up @@ -141,7 +141,7 @@ public String toString() {

@Override
public void closeInternal() {
blockFactory.adjustBreaker(-ramBytesUsed() + values.bigArraysRamBytesUsed(), true);
blockFactory().adjustBreaker(-ramBytesUsed() + values.bigArraysRamBytesUsed(), true);
Releasables.closeExpectNoException(values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public boolean isConstant() {
@Override
public BytesRefVector filter(int... positions) {
final var scratch = new BytesRef();
try (BytesRefVector.Builder builder = blockFactory.newBytesRefVectorBuilder(positions.length)) {
try (BytesRefVector.Builder builder = blockFactory().newBytesRefVectorBuilder(positions.length)) {
for (int pos : positions) {
builder.appendBytesRef(values.get(pos, scratch));
}
Expand Down Expand Up @@ -98,7 +98,7 @@ public void close() {
throw new IllegalStateException("can't release already released vector [" + this + "]");
}
released = true;
blockFactory.adjustBreaker(-ramBytesUsed() + values.bigArraysRamBytesUsed(), true);
blockFactory().adjustBreaker(-ramBytesUsed() + values.bigArraysRamBytesUsed(), true);
Releasables.closeExpectNoException(values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,9 @@ public void closeInternal() {
assert (vector.isReleased() == false) : "can't release block [" + this + "] containing already released vector";
Releasables.closeExpectNoException(vector);
}

@Override
public void allowPassingToDifferentDriver() {
vector.allowPassingToDifferentDriver();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,6 @@ public void close() {
throw new IllegalStateException("can't release already released vector [" + this + "]");
}
released = true;
blockFactory.adjustBreaker(-ramBytesUsed(), true);
blockFactory().adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,6 @@ public void close() {
throw new IllegalStateException("can't release already released vector [" + this + "]");
}
released = true;
blockFactory.adjustBreaker(-ramBytesUsed(), true);
blockFactory().adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,6 @@ public void close() {
throw new IllegalStateException("can't release already released vector [" + this + "]");
}
released = true;
blockFactory.adjustBreaker(-ramBytesUsed(), true);
blockFactory().adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,6 @@ public void close() {
throw new IllegalStateException("can't release already released vector [" + this + "]");
}
released = true;
blockFactory.adjustBreaker(-ramBytesUsed(), true);
blockFactory().adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,6 @@ public void close() {
throw new IllegalStateException("can't release already released vector [" + this + "]");
}
released = true;
blockFactory.adjustBreaker(-ramBytesUsed(), true);
blockFactory().adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public double getDouble(int valueIndex) {

@Override
public DoubleBlock filter(int... positions) {
try (var builder = blockFactory.newDoubleBlockBuilder(positions.length)) {
try (var builder = blockFactory().newDoubleBlockBuilder(positions.length)) {
for (int pos : positions) {
if (isNull(pos)) {
builder.appendNull();
Expand Down Expand Up @@ -84,7 +84,7 @@ public DoubleBlock expand() {
return this;
}
// TODO use reference counting to share the values
try (var builder = blockFactory.newDoubleBlockBuilder(firstValueIndexes[getPositionCount()])) {
try (var builder = blockFactory().newDoubleBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
Expand Down Expand Up @@ -137,6 +137,6 @@ public String toString() {

@Override
public void closeInternal() {
blockFactory.adjustBreaker(-ramBytesUsed(), true);
blockFactory().adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public boolean isConstant() {

@Override
public DoubleVector filter(int... positions) {
try (DoubleVector.Builder builder = blockFactory.newDoubleVectorBuilder(positions.length)) {
try (DoubleVector.Builder builder = blockFactory().newDoubleVectorBuilder(positions.length)) {
for (int pos : positions) {
builder.appendDouble(values[pos]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public long ramBytesUsed() {

@Override
public DoubleVector filter(int... positions) {
var blockFactory = blockFactory();
final DoubleArray filtered = blockFactory.bigArrays().newDoubleArray(positions.length, true);
for (int i = 0; i < positions.length; i++) {
filtered.set(i, values.get(positions[i]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,9 @@ public void closeInternal() {
assert (vector.isReleased() == false) : "can't release block [" + this + "] containing already released vector";
Releasables.closeExpectNoException(vector);
}

@Override
public void allowPassingToDifferentDriver() {
vector.allowPassingToDifferentDriver();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public int getInt(int valueIndex) {

@Override
public IntBlock filter(int... positions) {
try (var builder = blockFactory.newIntBlockBuilder(positions.length)) {
try (var builder = blockFactory().newIntBlockBuilder(positions.length)) {
for (int pos : positions) {
if (isNull(pos)) {
builder.appendNull();
Expand Down Expand Up @@ -84,7 +84,7 @@ public IntBlock expand() {
return this;
}
// TODO use reference counting to share the values
try (var builder = blockFactory.newIntBlockBuilder(firstValueIndexes[getPositionCount()])) {
try (var builder = blockFactory().newIntBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
Expand Down Expand Up @@ -137,6 +137,6 @@ public String toString() {

@Override
public void closeInternal() {
blockFactory.adjustBreaker(-ramBytesUsed(), true);
blockFactory().adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public boolean isConstant() {

@Override
public IntVector filter(int... positions) {
try (IntVector.Builder builder = blockFactory.newIntVectorBuilder(positions.length)) {
try (IntVector.Builder builder = blockFactory().newIntVectorBuilder(positions.length)) {
for (int pos : positions) {
builder.appendInt(values[pos]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public long ramBytesUsed() {

@Override
public IntVector filter(int... positions) {
var blockFactory = blockFactory();
final IntArray filtered = blockFactory.bigArrays().newIntArray(positions.length, true);
for (int i = 0; i < positions.length; i++) {
filtered.set(i, values.get(positions[i]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,9 @@ public void closeInternal() {
assert (vector.isReleased() == false) : "can't release block [" + this + "] containing already released vector";
Releasables.closeExpectNoException(vector);
}

@Override
public void allowPassingToDifferentDriver() {
vector.allowPassingToDifferentDriver();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public long getLong(int valueIndex) {

@Override
public LongBlock filter(int... positions) {
try (var builder = blockFactory.newLongBlockBuilder(positions.length)) {
try (var builder = blockFactory().newLongBlockBuilder(positions.length)) {
for (int pos : positions) {
if (isNull(pos)) {
builder.appendNull();
Expand Down Expand Up @@ -84,7 +84,7 @@ public LongBlock expand() {
return this;
}
// TODO use reference counting to share the values
try (var builder = blockFactory.newLongBlockBuilder(firstValueIndexes[getPositionCount()])) {
try (var builder = blockFactory().newLongBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
Expand Down Expand Up @@ -137,6 +137,6 @@ public String toString() {

@Override
public void closeInternal() {
blockFactory.adjustBreaker(-ramBytesUsed(), true);
blockFactory().adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public boolean isConstant() {

@Override
public LongVector filter(int... positions) {
try (LongVector.Builder builder = blockFactory.newLongVectorBuilder(positions.length)) {
try (LongVector.Builder builder = blockFactory().newLongVectorBuilder(positions.length)) {
for (int pos : positions) {
builder.appendLong(values[pos]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public long ramBytesUsed() {

@Override
public LongVector filter(int... positions) {
var blockFactory = blockFactory();
final LongArray filtered = blockFactory.bigArrays().newLongArray(positions.length, true);
for (int i = 0; i < positions.length; i++) {
filtered.set(i, values.get(positions[i]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,9 @@ public void closeInternal() {
assert (vector.isReleased() == false) : "can't release block [" + this + "] containing already released vector";
Releasables.closeExpectNoException(vector);
}

@Override
public void allowPassingToDifferentDriver() {
vector.allowPassingToDifferentDriver();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ abstract class AbstractBlock implements Block {
@Nullable
protected final BitSet nullsMask;

protected final BlockFactory blockFactory;
private BlockFactory blockFactory;

/**
* @param positionCount the number of values in this block
Expand Down Expand Up @@ -95,6 +95,11 @@ public BlockFactory blockFactory() {
return blockFactory;
}

@Override
public void allowPassingToDifferentDriver() {
blockFactory = blockFactory.parent();
}

@Override
public boolean isReleased() {
return hasReferences() == false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
abstract class AbstractVector implements Vector {

private final int positionCount;
protected final BlockFactory blockFactory;
private BlockFactory blockFactory;
protected boolean released;

protected AbstractVector(int positionCount, BlockFactory blockFactory) {
Expand All @@ -35,6 +35,11 @@ public BlockFactory blockFactory() {
return blockFactory;
}

@Override
public void allowPassingToDifferentDriver() {
blockFactory = blockFactory.parent();
}

@Override
public void close() {
if (released) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,17 @@ public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, R
ElementType elementType();

/** The block factory associated with this block. */
// TODO: renaming this to owning blockFactory once we pass blockFactory for filter and expand
BlockFactory blockFactory();

/**
* Before passing a Block to another Driver, it is necessary to switch the owning block factory to its parent, which is associated
* with the global circuit breaker. This ensures that when the new driver releases this Block, it returns memory directly to the
* parent block factory instead of the local block factory of this Block. This is important because the local block factory is
* not thread safe and doesn't support simultaneous access by more than one thread.
*/
void allowPassingToDifferentDriver();

/**
* Tells if this block has been released. A block is released by calling its {@link Block#close()} or {@link Block#decRef()} methods.
* @return true iff the block's reference count is zero.
Expand Down Expand Up @@ -102,6 +111,7 @@ public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, R
* The new block may hold a reference to this block, increasing this block's reference count.
* @param positions the positions to retain
* @return a filtered block
* TODO: pass BlockFactory
*/
Block filter(int... positions);

Expand Down Expand Up @@ -145,6 +155,7 @@ default boolean mvSortedAscending() {
/**
* Expand multivalued fields into one row per value. Returns the same block if there aren't any multivalued
* fields to expand. The returned block needs to be closed by the caller to release the block's resources.
* TODO: pass BlockFactory
*/
Block expand();

Expand Down
Loading