From 7932c05c23204895f2a53b0412b3f491b1339a0b Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Thu, 5 Oct 2023 16:30:33 -0700 Subject: [PATCH] Introduce newPageAndRelease method that handles clean-up of blocks that are not-used when creating a new page --- .../org/elasticsearch/compute/data/Page.java | 38 ++++++++++++++++--- .../compute/operator/ProjectOperator.java | 27 +------------ .../esql/planner/LocalExecutionPlanner.java | 4 +- 3 files changed, 35 insertions(+), 34 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java index 2c3f1ec5864ae..5daee6704b515 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java @@ -89,9 +89,7 @@ private Page(Page prev, Block[] toAdd) { this.positionCount = prev.positionCount; this.blocks = Arrays.copyOf(prev.blocks, prev.blocks.length + toAdd.length); - for (int i = 0; i < toAdd.length; i++) { - this.blocks[prev.blocks.length + i] = toAdd[i]; - } + System.arraycopy(toAdd, 0, this.blocks, prev.blocks.length, toAdd.length); } public Page(StreamInput in) throws IOException { @@ -224,12 +222,42 @@ public void writeTo(StreamOutput out) throws IOException { public void releaseBlocks() { blocksReleased = true; // blocks can be used as multiple columns - var set = new IdentityHashMap(); + var map = new IdentityHashMap(mapSize(blocks.length)); var DUMMY = new Object(); for (Block b : blocks) { - if (set.putIfAbsent(b, DUMMY) == null) { + if (map.putIfAbsent(b, DUMMY) == null) { Releasables.closeExpectNoException(b); } } } + + /** + * Returns a Page from the given blocks and closes all blocks that are not included, from the current Page. + * That is, allows clean-up of the current page _after_ external manipulation of the blocks. + * The current page should no longer be used and be considered closed. + */ + public Page newPageAndRelease(Block... keep) { + blocksReleased = true; + + var newPage = new Page(positionCount, keep); + var map = new IdentityHashMap(mapSize(keep.length)); + var DUMMY = new Object(); + + // create identity set + for (Block b : keep) { + map.putIfAbsent(b, DUMMY); + } + // close blocks that have been left out + for (Block b : blocks) { + if (map.containsKey(b) == false) { + Releasables.closeExpectNoException(b); + } + } + + return newPage; + } + + static int mapSize(int expectedSize) { + return expectedSize < 2 ? expectedSize + 1 : (int) (expectedSize / 0.75 + 1.0); + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ProjectOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ProjectOperator.java index b4fb830aed641..6e52a5351de58 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ProjectOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ProjectOperator.java @@ -10,9 +10,7 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; import org.elasticsearch.core.Releasable; -import org.elasticsearch.core.Releasables; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -70,30 +68,7 @@ protected Page process(Page page) { var block = page.getBlock(source); blocks[b++] = block; } - closeUnused(page, blocks); - return new Page(page.getPositionCount(), blocks); - } - - /** - * Close all {@link Block}s that are in {@code page} but are not in {@code blocks}. - */ - public static void closeUnused(Page page, Block[] blocks) { - List blocksToRelease = new ArrayList<>(); - - for (int i = 0; i < page.getBlockCount(); i++) { - boolean used = false; - var current = page.getBlock(i); - for (int j = 0; j < blocks.length; j++) { - if (current == blocks[j]) { - used = true; - break; - } - } - if (used == false) { - blocksToRelease.add(current); - } - } - Releasables.close(blocksToRelease); + return page.newPageAndRelease(blocks); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index df7b921f6e585..1c26de4a599f5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -30,7 +30,6 @@ import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.operator.Operator.OperatorFactory; import org.elasticsearch.compute.operator.OutputOperator.OutputOperatorFactory; -import org.elasticsearch.compute.operator.ProjectOperator; import org.elasticsearch.compute.operator.RowOperator.RowOperatorFactory; import org.elasticsearch.compute.operator.ShowOperator; import org.elasticsearch.compute.operator.SinkOperator; @@ -334,8 +333,7 @@ private static Function alignPageToAttributes(List attrs, for (int i = 0; i < blocks.length; i++) { blocks[i] = p.getBlock(mappedPosition[i]); } - ProjectOperator.closeUnused(p, blocks); - return new Page(blocks); + return p.newPageAndRelease(blocks); } : Function.identity(); return transformer;