From a7cb6ab7c0e90d52e7bf3bff9276c53003966b5c Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Thu, 5 Oct 2023 13:59:31 -0700 Subject: [PATCH 1/5] ESQL: Page shouldn't close a block twice Page now takes into account that a block can be used in multiple positions (such as the same column aliased under multiple names). Relates #100001 Fix #100365 Fix #100356 --- .../java/org/elasticsearch/compute/data/Page.java | 14 +++++++++++--- .../src/main/resources/rename.csv-spec | 9 +++------ .../xpack/esql/action/EsqlActionIT.java | 2 -- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java index 18f3ed7ba61bf..2c3f1ec5864ae 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java @@ -14,6 +14,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.IdentityHashMap; import java.util.Objects; /** @@ -169,8 +170,8 @@ public Page appendPage(Page toAdd) { @Override public int hashCode() { int result = Objects.hash(positionCount); - for (int i = 0; i < blocks.length; i++) { - result = 31 * result + Objects.hashCode(blocks[i]); + for (Block block : blocks) { + result = 31 * result + Objects.hashCode(block); } return result; } @@ -222,6 +223,13 @@ public void writeTo(StreamOutput out) throws IOException { */ public void releaseBlocks() { blocksReleased = true; - Releasables.closeExpectNoException(blocks); + // blocks can be used as multiple columns + var set = new IdentityHashMap(); + var DUMMY = new Object(); + for (Block b : blocks) { + if (set.putIfAbsent(b, DUMMY) == null) { + Releasables.closeExpectNoException(b); + } + } } } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/rename.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/rename.csv-spec index 44cf92254298b..5e5c70e3cbba7 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/rename.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/rename.csv-spec @@ -84,8 +84,7 @@ x:integer | z:integer 4 | 8 ; -# AwaitsFix https://github.com/elastic/elasticsearch/issues/100356 -renameProjectEval-Ignore +renameProjectEval from employees | sort emp_no | eval y = languages | rename languages as x | keep x, y | eval x2 = x + 1 | eval y2 = y + 2 | limit 3; x:integer | y:integer | x2:integer | y2:integer @@ -94,8 +93,7 @@ x:integer | y:integer | x2:integer | y2:integer 4 | 4 | 5 | 6 ; -# AwaitsFix https://github.com/elastic/elasticsearch/issues/100356 -duplicateProjectEval-Ignore +duplicateProjectEval from employees | eval y = languages, x = languages | keep x, y | eval x2 = x + 1 | eval y2 = y + 2 | limit 3; x:integer | y:integer | x2:integer | y2:integer @@ -160,8 +158,7 @@ y:integer | x:date 10061 | 1985-09-17T00:00:00.000Z ; -# AwaitsFix https://github.com/elastic/elasticsearch/issues/100356 -renameIntertwinedWithSort-Ignore +renameIntertwinedWithSort FROM employees | eval x = salary | rename x as y | rename y as x | sort x | rename x as y | limit 10; avg_worked_seconds:l | birth_date:date | emp_no:i | first_name:s | gender:s | height:d | height.float:d | height.half_float:d | height.scaled_float:d| hire_date:date | is_rehired:bool | job_positions:s | languages:i | languages.byte:i | languages.long:l | languages.short:i | last_name:s | salary:i | salary_change:d | salary_change.int:i | salary_change.keyword:s | salary_change.long:l | still_hired:bool | y:i diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java index 46d85746c3990..fd4fe13b9c1b1 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.esql.action; -import org.apache.lucene.tests.util.LuceneTestCase; import org.elasticsearch.Build; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; @@ -67,7 +66,6 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.nullValue; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/100365") public class EsqlActionIT extends AbstractEsqlIntegTestCase { long epoch = System.currentTimeMillis(); From 63de76df9515756a1798b2538f8be82dd458e4a5 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Fri, 6 Oct 2023 00:04:55 +0300 Subject: [PATCH 2/5] Update docs/changelog/100370.yaml --- docs/changelog/100370.yaml | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 docs/changelog/100370.yaml diff --git a/docs/changelog/100370.yaml b/docs/changelog/100370.yaml new file mode 100644 index 0000000000000..3e2e1b762c654 --- /dev/null +++ b/docs/changelog/100370.yaml @@ -0,0 +1,7 @@ +pr: 100370 +summary: "ESQL: Page shouldn't close a block twice" +area: ES|QL +type: bug +issues: + - 100356 + - 100365 From 7932c05c23204895f2a53b0412b3f491b1339a0b Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Thu, 5 Oct 2023 16:30:33 -0700 Subject: [PATCH 3/5] Introduce newPageAndRelease method that handles clean-up of blocks that are not-used when creating a new page --- .../org/elasticsearch/compute/data/Page.java | 38 ++++++++++++++++--- .../compute/operator/ProjectOperator.java | 27 +------------ .../esql/planner/LocalExecutionPlanner.java | 4 +- 3 files changed, 35 insertions(+), 34 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java index 2c3f1ec5864ae..5daee6704b515 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java @@ -89,9 +89,7 @@ private Page(Page prev, Block[] toAdd) { this.positionCount = prev.positionCount; this.blocks = Arrays.copyOf(prev.blocks, prev.blocks.length + toAdd.length); - for (int i = 0; i < toAdd.length; i++) { - this.blocks[prev.blocks.length + i] = toAdd[i]; - } + System.arraycopy(toAdd, 0, this.blocks, prev.blocks.length, toAdd.length); } public Page(StreamInput in) throws IOException { @@ -224,12 +222,42 @@ public void writeTo(StreamOutput out) throws IOException { public void releaseBlocks() { blocksReleased = true; // blocks can be used as multiple columns - var set = new IdentityHashMap(); + var map = new IdentityHashMap(mapSize(blocks.length)); var DUMMY = new Object(); for (Block b : blocks) { - if (set.putIfAbsent(b, DUMMY) == null) { + if (map.putIfAbsent(b, DUMMY) == null) { Releasables.closeExpectNoException(b); } } } + + /** + * Returns a Page from the given blocks and closes all blocks that are not included, from the current Page. + * That is, allows clean-up of the current page _after_ external manipulation of the blocks. + * The current page should no longer be used and be considered closed. + */ + public Page newPageAndRelease(Block... keep) { + blocksReleased = true; + + var newPage = new Page(positionCount, keep); + var map = new IdentityHashMap(mapSize(keep.length)); + var DUMMY = new Object(); + + // create identity set + for (Block b : keep) { + map.putIfAbsent(b, DUMMY); + } + // close blocks that have been left out + for (Block b : blocks) { + if (map.containsKey(b) == false) { + Releasables.closeExpectNoException(b); + } + } + + return newPage; + } + + static int mapSize(int expectedSize) { + return expectedSize < 2 ? expectedSize + 1 : (int) (expectedSize / 0.75 + 1.0); + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ProjectOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ProjectOperator.java index b4fb830aed641..6e52a5351de58 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ProjectOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ProjectOperator.java @@ -10,9 +10,7 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; import org.elasticsearch.core.Releasable; -import org.elasticsearch.core.Releasables; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -70,30 +68,7 @@ protected Page process(Page page) { var block = page.getBlock(source); blocks[b++] = block; } - closeUnused(page, blocks); - return new Page(page.getPositionCount(), blocks); - } - - /** - * Close all {@link Block}s that are in {@code page} but are not in {@code blocks}. - */ - public static void closeUnused(Page page, Block[] blocks) { - List blocksToRelease = new ArrayList<>(); - - for (int i = 0; i < page.getBlockCount(); i++) { - boolean used = false; - var current = page.getBlock(i); - for (int j = 0; j < blocks.length; j++) { - if (current == blocks[j]) { - used = true; - break; - } - } - if (used == false) { - blocksToRelease.add(current); - } - } - Releasables.close(blocksToRelease); + return page.newPageAndRelease(blocks); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index df7b921f6e585..1c26de4a599f5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -30,7 +30,6 @@ import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.operator.Operator.OperatorFactory; import org.elasticsearch.compute.operator.OutputOperator.OutputOperatorFactory; -import org.elasticsearch.compute.operator.ProjectOperator; import org.elasticsearch.compute.operator.RowOperator.RowOperatorFactory; import org.elasticsearch.compute.operator.ShowOperator; import org.elasticsearch.compute.operator.SinkOperator; @@ -334,8 +333,7 @@ private static Function alignPageToAttributes(List attrs, for (int i = 0; i < blocks.length; i++) { blocks[i] = p.getBlock(mappedPosition[i]); } - ProjectOperator.closeUnused(p, blocks); - return new Page(blocks); + return p.newPageAndRelease(blocks); } : Function.identity(); return transformer; From 5cc753946728607ddd7e8b0bc379b4a8f17ff6bd Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Thu, 5 Oct 2023 17:20:43 -0700 Subject: [PATCH 4/5] Update project test --- .../compute/operator/ProjectOperatorTests.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ProjectOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ProjectOperatorTests.java index 59e85390fc522..1acdbc4895c94 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ProjectOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ProjectOperatorTests.java @@ -15,14 +15,11 @@ import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; -import org.elasticsearch.core.Releasables; import org.elasticsearch.core.Tuple; import org.elasticsearch.indices.breaker.CircuitBreakerService; import java.util.Arrays; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.stream.LongStream; import static org.hamcrest.Matchers.equalTo; @@ -59,15 +56,12 @@ public void testProjection() { var out = projection.getOutput(); assertThat(randomProjection.size(), lessThanOrEqualTo(out.getBlockCount())); - Set blks = new HashSet<>(); for (int i = 0; i < out.getBlockCount(); i++) { var block = out.getBlock(i); - assertEquals(block, page.getBlock(randomProjection.get(i))); - blks.add(block); + assertEquals(blocks[randomProjection.get(i)], block); } - // close all blocks separately since the same block can be used by multiple columns (aliased) - Releasables.closeWhileHandlingException(blks.toArray(new Block[0])); + out.releaseBlocks(); } private List randomProjection(int size) { From e3415a2d62ea678eae986b53c9b40edc96a78845 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Fri, 6 Oct 2023 10:40:26 -0700 Subject: [PATCH 5/5] Address feedback --- .../org/elasticsearch/compute/data/Page.java | 25 +++++++++++-------- .../compute/data/BasicPageTests.java | 19 ++++++++++++++ 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java index 5daee6704b515..e4412693f5649 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java @@ -14,6 +14,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.IdentityHashMap; import java.util.Objects; @@ -220,12 +221,16 @@ public void writeTo(StreamOutput out) throws IOException { * Release all blocks in this page, decrementing any breakers accounting for these blocks. */ public void releaseBlocks() { + if (blocksReleased) { + return; + } + blocksReleased = true; + // blocks can be used as multiple columns - var map = new IdentityHashMap(mapSize(blocks.length)); - var DUMMY = new Object(); + var map = new IdentityHashMap(mapSize(blocks.length)); for (Block b : blocks) { - if (map.putIfAbsent(b, DUMMY) == null) { + if (map.putIfAbsent(b, Boolean.TRUE) == null) { Releasables.closeExpectNoException(b); } } @@ -237,19 +242,19 @@ public void releaseBlocks() { * The current page should no longer be used and be considered closed. */ public Page newPageAndRelease(Block... keep) { + if (blocksReleased) { + throw new IllegalStateException("can't create new page from already released page"); + } + blocksReleased = true; var newPage = new Page(positionCount, keep); - var map = new IdentityHashMap(mapSize(keep.length)); - var DUMMY = new Object(); + var set = Collections.newSetFromMap(new IdentityHashMap(mapSize(keep.length))); + set.addAll(Arrays.asList(keep)); - // create identity set - for (Block b : keep) { - map.putIfAbsent(b, DUMMY); - } // close blocks that have been left out for (Block b : blocks) { - if (map.containsKey(b) == false) { + if (set.contains(b) == false) { Releasables.closeExpectNoException(b); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BasicPageTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BasicPageTests.java index 25aa957e90cff..23a257e7afbbe 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BasicPageTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BasicPageTests.java @@ -196,6 +196,25 @@ public void testSerializationListPages() throws IOException { } } + public void testPageMultiRelease() { + int positions = randomInt(1024); + var block = new IntArrayVector(IntStream.range(0, positions).toArray(), positions).asBlock(); + Page page = new Page(block); + page.releaseBlocks(); + assertThat(block.isReleased(), is(true)); + page.releaseBlocks(); + } + + public void testNewPageAndRelease() { + int positions = randomInt(1024); + var blockA = new IntArrayVector(IntStream.range(0, positions).toArray(), positions).asBlock(); + var blockB = new IntArrayVector(IntStream.range(0, positions).toArray(), positions).asBlock(); + Page page = new Page(blockA, blockB); + Page newPage = page.newPageAndRelease(blockA); + assertThat(blockA.isReleased(), is(false)); + assertThat(blockB.isReleased(), is(true)); + } + BytesRefArray bytesRefArrayOf(String... values) { var array = new BytesRefArray(values.length, bigArrays); Arrays.stream(values).map(BytesRef::new).forEach(array::append);