Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make vectors RefCounted #103645

Merged
merged 5 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,7 @@ public BooleanVector filter(int... positions) {
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released vector [" + this + "]");
}
released = true;
public void closeInternal() {
values.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,7 @@ public String toString() {
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released vector [" + this + "]");
}
released = true;
public void closeInternal() {
blockFactory().adjustBreaker(-ramBytesUsed() + values.bigArraysRamBytesUsed(), true);
Releasables.closeExpectNoException(values);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,4 @@ public int hashCode() {
public String toString() {
return getClass().getSimpleName() + "[positions=" + getPositionCount() + ", value=" + value + ']';
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released vector [" + this + "]");
}
released = true;
blockFactory().adjustBreaker(-ramBytesUsed(), true);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The super method already does exactly this.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,4 @@ public int hashCode() {
public String toString() {
return getClass().getSimpleName() + "[positions=" + getPositionCount() + ", value=" + value + ']';
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released vector [" + this + "]");
}
released = true;
blockFactory().adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,4 @@ public int hashCode() {
public String toString() {
return getClass().getSimpleName() + "[positions=" + getPositionCount() + ", value=" + value + ']';
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released vector [" + this + "]");
}
released = true;
blockFactory().adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,4 @@ public int hashCode() {
public String toString() {
return getClass().getSimpleName() + "[positions=" + getPositionCount() + ", value=" + value + ']';
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released vector [" + this + "]");
}
released = true;
blockFactory().adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,4 @@ public int hashCode() {
public String toString() {
return getClass().getSimpleName() + "[positions=" + getPositionCount() + ", value=" + value + ']';
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released vector [" + this + "]");
}
released = true;
blockFactory().adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,7 @@ public DoubleVector filter(int... positions) {
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released vector [" + this + "]");
}
released = true;
public void closeInternal() {
values.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,7 @@ public IntVector filter(int... positions) {
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released vector [" + this + "]");
}
released = true;
public void closeInternal() {
values.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,7 @@ public LongVector filter(int... positions) {
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released vector [" + this + "]");
}
released = true;
public void closeInternal() {
values.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@

import java.util.BitSet;

abstract class AbstractBlock implements Block {
private int references = 1;
abstract class AbstractBlock extends AbstractNonThreadSafeRefCounted implements Block {
private final int positionCount;

@Nullable
Expand Down Expand Up @@ -104,52 +103,4 @@ public void allowPassingToDifferentDriver() {
public final boolean isReleased() {
return hasReferences() == false;
}

@Override
public final void incRef() {
if (isReleased()) {
throw new IllegalStateException("can't increase refCount on already released block [" + this + "]");
}
references++;
}

@Override
public final boolean tryIncRef() {
if (isReleased()) {
return false;
}
references++;
return true;
}

@Override
public final boolean decRef() {
if (isReleased()) {
throw new IllegalStateException("can't release already released block [" + this + "]");
}

references--;

if (references <= 0) {
closeInternal();
return true;
}
return false;
}

@Override
public final boolean hasReferences() {
return references >= 1;
}

@Override
public final void close() {
decRef();
}

/**
* This is called when the number of references reaches zero.
* It must release any resources held by the block (adjusting circuit breakers if needed).
*/
protected abstract void closeInternal();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.data;

import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;

/**
* Releasable, non-threadsafe version of {@link org.elasticsearch.core.AbstractRefCounted}.
* Calls to {@link AbstractNonThreadSafeRefCounted#decRef()} and {@link AbstractNonThreadSafeRefCounted#close()} are equivalent.
*/
abstract class AbstractNonThreadSafeRefCounted implements RefCounted, Releasable {
private int references = 1;

@Override
public final void incRef() {
if (hasReferences() == false) {
throw new IllegalStateException("can't increase refCount on already released object [" + this + "]");
}
references++;
}

@Override
public final boolean tryIncRef() {
if (hasReferences() == false) {
return false;
}
references++;
return true;
}

@Override
public final boolean decRef() {
if (hasReferences() == false) {
throw new IllegalStateException("can't release already released object [" + this + "]");
}

references--;

if (references <= 0) {
closeInternal();
return true;
}
return false;
}

@Override
public final boolean hasReferences() {
return references >= 1;
}

@Override
public final void close() {
decRef();
}

/**
* This is called when the number of references reaches zero.
* This is where resources should be released (adjusting circuit breakers if needed).
*/
protected abstract void closeInternal();
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
/**
* A dense Vector of single values.
*/
abstract class AbstractVector implements Vector {
abstract class AbstractVector extends AbstractNonThreadSafeRefCounted implements Vector {

private final int positionCount;
private BlockFactory blockFactory;
protected boolean released;

protected AbstractVector(int positionCount, BlockFactory blockFactory) {
this.positionCount = positionCount;
Expand All @@ -41,16 +40,12 @@ public void allowPassingToDifferentDriver() {
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released vector [" + this + "]");
}
released = true;
protected void closeInternal() {
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}

@Override
public final boolean isReleased() {
return released;
return hasReferences() == false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,7 @@ public void allowPassingToDifferentDriver() {
}

@Override
public void close() {
released = true;
public void closeInternal() {
Releasables.closeExpectNoException(shards.asBlock(), segments.asBlock(), docs.asBlock()); // Ugh! we always close blocks
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.Accountable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;

/**
* A dense Vector of single values.
*/
public interface Vector extends Accountable, Releasable {
public interface Vector extends Accountable, RefCounted, Releasable {

/**
* {@return Returns a new Block containing this vector.}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,7 @@ $endif$

$if(BytesRef)$
@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released vector [" + this + "]");
}
released = true;
public void closeInternal() {
blockFactory().adjustBreaker(-ramBytesUsed() + values.bigArraysRamBytesUsed(), true);
Releasables.closeExpectNoException(values);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,7 @@ public final class $Type$BigArrayVector extends AbstractVector implements $Type$
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released vector [" + this + "]");
}
released = true;
public void closeInternal() {
values.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,4 @@ $endif$
public String toString() {
return getClass().getSimpleName() + "[positions=" + getPositionCount() + ", value=" + value + ']';
}

@Override
public void close() {
if (released) {
throw new IllegalStateException("can't release already released vector [" + this + "]");
}
released = true;
blockFactory().adjustBreaker(-ramBytesUsed(), true);
}
}
Loading