Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Make blocks ref counted #100408

Merged
merged 42 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
9e89137
Implement references counted block refs
alex-spies Oct 6, 2023
73ade8f
Use RefCounted
alex-spies Oct 6, 2023
522a47e
Move BlockRef into Block.Ref
alex-spies Oct 6, 2023
b55fd8a
Update tests
alex-spies Oct 6, 2023
5844b81
Revert all changes
alex-spies Oct 9, 2023
044bf9a
Make Block extend RefCounted
alex-spies Oct 9, 2023
dad012c
Avoid deep copies in EvalOperator
alex-spies Oct 9, 2023
d629ca4
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Oct 9, 2023
f8df502
Fix RefCounted javadoc
alex-spies Oct 9, 2023
d304a95
Improve Block::RefCounted implementation
alex-spies Oct 9, 2023
8f3df46
Add ref counting tests for vector and array blocks
alex-spies Oct 9, 2023
44ab382
Test remaining block types
alex-spies Oct 9, 2023
f25644d
Update docs/changelog/100408.yaml
alex-spies Oct 9, 2023
d84a4ed
Fix projection bug, simplify projections
alex-spies Oct 9, 2023
0112040
Fix projection for empty page
alex-spies Oct 10, 2023
8f8522d
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Oct 10, 2023
60305b7
Remove obsolete test
alex-spies Oct 10, 2023
9801f1b
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 13, 2023
b0a05be
Fix multiply owned BooleanBlock in exchange sink
alex-spies Nov 14, 2023
7168c12
Improve javadoc and illegal state messages
alex-spies Nov 14, 2023
803d654
Remove newPageAndRelease
alex-spies Nov 14, 2023
57f5977
Spotless
alex-spies Nov 14, 2023
95466bb
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 14, 2023
99da217
Revert error message
alex-spies Nov 14, 2023
b61e50e
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 14, 2023
b33629d
Align logic with AbstractRefCounted
alex-spies Nov 14, 2023
28970ab
Cleanup
alex-spies Nov 14, 2023
46d3044
Address potential partial releases
alex-spies Nov 14, 2023
42a8559
Fix test
alex-spies Nov 14, 2023
98f6629
Make decRef and close equivalent again
alex-spies Nov 15, 2023
92ab31f
Remove releaseByDecRef
alex-spies Nov 15, 2023
2d3aa70
Update javadoc for Block
alex-spies Nov 15, 2023
73110c0
Address Nhat's feedback
alex-spies Nov 17, 2023
061563d
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 17, 2023
63109c0
Align DocBlock and DocVector with others
alex-spies Nov 20, 2023
be66835
Add test for closing block's underlying vector
alex-spies Nov 20, 2023
1b7f2ff
Improve tests
alex-spies Nov 20, 2023
c4e906e
Throw IllegalStateException more consistently
alex-spies Nov 20, 2023
0d3d081
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 20, 2023
4fe6e08
Check for already released vector only once
alex-spies Nov 20, 2023
9593102
spotless
alex-spies Nov 20, 2023
25794d6
Merge remote-tracking branch 'upstream/main' into esql-block-ref-coun…
alex-spies Nov 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public interface RefCounted {
void incRef();

/**
* Tries to increment the refCount of this instance. This method will return {@code true} iff the refCount was
* Tries to increment the refCount of this instance. This method will return {@code true} iff the refCount was successfully incremented.
alex-spies marked this conversation as resolved.
Show resolved Hide resolved
*
* @see #decRef()
* @see #incRef()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,9 @@ public String toString() {

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (hasReferences() == false) {
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
released = true;
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,9 @@ public String toString() {

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (hasReferences() == false) {
Releasables.closeExpectNoException(vector);
}
released = true;
Releasables.closeExpectNoException(vector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,10 @@ public String toString() {

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (hasReferences() == false) {
blockFactory.adjustBreaker(-ramBytesUsed() + values.bigArraysRamBytesUsed(), true);
Releasables.closeExpectNoException(values);
}
released = true;
blockFactory.adjustBreaker(-ramBytesUsed() + values.bigArraysRamBytesUsed(), true);
Releasables.closeExpectNoException(values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,9 @@ public String toString() {

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (hasReferences() == false) {
Releasables.closeExpectNoException(vector);
}
released = true;
Releasables.closeExpectNoException(vector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,9 @@ public String toString() {

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (hasReferences() == false) {
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
released = true;
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,9 @@ public String toString() {

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (hasReferences() == false) {
Releasables.closeExpectNoException(vector);
}
released = true;
Releasables.closeExpectNoException(vector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ private void appendValues(StringBuilder sb) {

@Override
public void close() {
if (block.isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (hasReferences() == false) {
Releasables.closeExpectNoException(block);
}
Releasables.closeExpectNoException(block);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,9 @@ private void appendValues(StringBuilder sb) {

@Override
public void close() {
if (block.isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (hasReferences() == false) {
Releasables.closeExpectNoException(block);
}
Releasables.closeExpectNoException(block);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ private void appendValues(StringBuilder sb) {

@Override
public void close() {
if (block.isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (hasReferences() == false) {
Releasables.closeExpectNoException(block);
}
Releasables.closeExpectNoException(block);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ private void appendValues(StringBuilder sb) {

@Override
public void close() {
if (block.isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (hasReferences() == false) {
Releasables.closeExpectNoException(block);
}
Releasables.closeExpectNoException(block);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ private void appendValues(StringBuilder sb) {

@Override
public void close() {
if (block.isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (hasReferences() == false) {
Releasables.closeExpectNoException(block);
}
Releasables.closeExpectNoException(block);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,9 @@ public String toString() {

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (hasReferences() == false) {
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
released = true;
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,9 @@ public String toString() {

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (hasReferences() == false) {
Releasables.closeExpectNoException(vector);
}
released = true;
Releasables.closeExpectNoException(vector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,9 @@ public String toString() {

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (hasReferences() == false) {
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
released = true;
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,9 @@ public String toString() {

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (hasReferences() == false) {
Releasables.closeExpectNoException(vector);
}
released = true;
Releasables.closeExpectNoException(vector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import java.util.BitSet;

abstract class AbstractBlock implements Block {

private int references = 1;
private final int positionCount;

@Nullable
Expand All @@ -23,8 +23,6 @@ abstract class AbstractBlock implements Block {

protected final BlockFactory blockFactory;

protected boolean released = false;

/**
* @param positionCount the number of values in this block
*/
Expand Down Expand Up @@ -99,6 +97,47 @@ public BlockFactory blockFactory() {

@Override
public boolean isReleased() {
return released;
return hasReferences() == false;
}

@Override
public final void incRef() {
if (references <= 0) {
throw new IllegalStateException("can't increase refCount on already released block [" + this + "]");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add an assertion assert false : ... to make tests fail hard?

}
references++;
}

@Override
public final boolean tryIncRef() {
if (references <= 0) {
return false;
}
references++;
return true;
}

@Override
public final boolean decRef() {
close();
alex-spies marked this conversation as resolved.
Show resolved Hide resolved

return references <= 0;
}

@Override
public final boolean hasReferences() {
return references >= 1;
}

/**
* When overriding this, {@code super.close()} must be called to ensure correct reference counting.
* Used in {@link AbstractBlock#decRef()}.
*/
@Override
public void close() {
alex-spies marked this conversation as resolved.
Show resolved Hide resolved
if (references <= 0) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
references--;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.util.Arrays;

abstract class AbstractFilterBlock implements Block {

private int references = 1;
protected final int[] positions;

private final Block block;
Expand Down Expand Up @@ -125,4 +125,45 @@ protected final boolean assertPosition(int position) {
: "illegal position, " + position + ", position count:" + getPositionCount();
return true;
}

@Override
public final void incRef() {
if (references <= 0) {
throw new IllegalStateException("can't increase refCount on already released block [" + this + "]");
}
references++;
}

@Override
public final boolean tryIncRef() {
if (references <= 0) {
return false;
}
references++;
return true;
}

@Override
public final boolean decRef() {
close();

return references <= 0;
}

@Override
public final boolean hasReferences() {
return references >= 1;
}

/**
* When overriding this, {@code super.close()} must be called to ensure correct reference counting.
* Used in {@link AbstractFilterBlock#decRef()}.
*/
@Override
public void close() {
if (references <= 0) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}
references--;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;

import java.util.List;
Expand All @@ -33,7 +34,7 @@
*
* <p> Block are immutable and can be passed between threads.
*/
public interface Block extends Accountable, NamedWriteable, Releasable {
public interface Block extends Accountable, NamedWriteable, RefCounted, Releasable {

/**
* {@return an efficient dense single-value view of this block}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,10 @@ public String toString() {

@Override
public void close() {
if (isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (hasReferences() == false) {
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
released = true;
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}

static class Builder implements Block.Builder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,10 @@ public long ramBytesUsed() {

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released block [" + this + "]");
super.close();
if (hasReferences() == false) {
Releasables.closeExpectNoException(vector);
}
released = true;
Releasables.closeExpectNoException(vector);
}

/**
Expand Down
Loading