Skip to content

Commit

Permalink
Introduce newPageAndRelease method that handles clean-up of blocks that
Browse files Browse the repository at this point in the history
 are not-used when creating a new page
  • Loading branch information
costin committed Oct 5, 2023
1 parent 63de76d commit 7932c05
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,7 @@ private Page(Page prev, Block[] toAdd) {
this.positionCount = prev.positionCount;

this.blocks = Arrays.copyOf(prev.blocks, prev.blocks.length + toAdd.length);
for (int i = 0; i < toAdd.length; i++) {
this.blocks[prev.blocks.length + i] = toAdd[i];
}
System.arraycopy(toAdd, 0, this.blocks, prev.blocks.length, toAdd.length);
}

public Page(StreamInput in) throws IOException {
Expand Down Expand Up @@ -224,12 +222,42 @@ public void writeTo(StreamOutput out) throws IOException {
public void releaseBlocks() {
blocksReleased = true;
// blocks can be used as multiple columns
var set = new IdentityHashMap<Block, Object>();
var map = new IdentityHashMap<Block, Object>(mapSize(blocks.length));
var DUMMY = new Object();
for (Block b : blocks) {
if (set.putIfAbsent(b, DUMMY) == null) {
if (map.putIfAbsent(b, DUMMY) == null) {
Releasables.closeExpectNoException(b);
}
}
}

/**
* Returns a Page from the given blocks and closes all blocks that are not included, from the current Page.
* That is, allows clean-up of the current page _after_ external manipulation of the blocks.
* The current page should no longer be used and be considered closed.
*/
public Page newPageAndRelease(Block... keep) {
blocksReleased = true;

var newPage = new Page(positionCount, keep);
var map = new IdentityHashMap<Block, Object>(mapSize(keep.length));
var DUMMY = new Object();

// create identity set
for (Block b : keep) {
map.putIfAbsent(b, DUMMY);
}
// close blocks that have been left out
for (Block b : blocks) {
if (map.containsKey(b) == false) {
Releasables.closeExpectNoException(b);
}
}

return newPage;
}

static int mapSize(int expectedSize) {
return expectedSize < 2 ? expectedSize + 1 : (int) (expectedSize / 0.75 + 1.0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -70,30 +68,7 @@ protected Page process(Page page) {
var block = page.getBlock(source);
blocks[b++] = block;
}
closeUnused(page, blocks);
return new Page(page.getPositionCount(), blocks);
}

/**
* Close all {@link Block}s that are in {@code page} but are not in {@code blocks}.
*/
public static void closeUnused(Page page, Block[] blocks) {
List<Releasable> blocksToRelease = new ArrayList<>();

for (int i = 0; i < page.getBlockCount(); i++) {
boolean used = false;
var current = page.getBlock(i);
for (int j = 0; j < blocks.length; j++) {
if (current == blocks[j]) {
used = true;
break;
}
}
if (used == false) {
blocksToRelease.add(current);
}
}
Releasables.close(blocksToRelease);
return page.newPageAndRelease(blocks);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.Operator.OperatorFactory;
import org.elasticsearch.compute.operator.OutputOperator.OutputOperatorFactory;
import org.elasticsearch.compute.operator.ProjectOperator;
import org.elasticsearch.compute.operator.RowOperator.RowOperatorFactory;
import org.elasticsearch.compute.operator.ShowOperator;
import org.elasticsearch.compute.operator.SinkOperator;
Expand Down Expand Up @@ -334,8 +333,7 @@ private static Function<Page, Page> alignPageToAttributes(List<Attribute> attrs,
for (int i = 0; i < blocks.length; i++) {
blocks[i] = p.getBlock(mappedPosition[i]);
}
ProjectOperator.closeUnused(p, blocks);
return new Page(blocks);
return p.newPageAndRelease(blocks);
} : Function.identity();

return transformer;
Expand Down

0 comments on commit 7932c05

Please sign in to comment.