diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java index 1827babe6f091..9fa876a00c35c 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java @@ -19,6 +19,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.NumericUtils; +import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BytesRefBlock; import org.elasticsearch.compute.data.BytesRefVector; import org.elasticsearch.compute.data.DocVector; @@ -132,6 +133,7 @@ private static BlockLoader numericBlockLoader(String name, NumberFieldMapper.Num @OperationsPerInvocation(INDEX_SIZE) public void benchmark() { ValuesSourceReaderOperator op = new ValuesSourceReaderOperator( + BlockFactory.getNonBreakingInstance(), List.of(BlockReaderFactories.loaderToFactory(reader, blockLoader(name))), 0, name diff --git a/docs/changelog/101383.yaml b/docs/changelog/101383.yaml new file mode 100644 index 0000000000000..4875403acfaeb --- /dev/null +++ b/docs/changelog/101383.yaml @@ -0,0 +1,5 @@ +pr: 101383 +summary: "ESQL: Track memory from values loaded from lucene" +area: ES|QL +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java index 049a779c97503..90a295e5a25f2 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java @@ -26,7 +26,6 @@ import org.elasticsearch.index.mapper.BlockLoader.DoubleBuilder; import org.elasticsearch.index.mapper.BlockLoader.IntBuilder; import org.elasticsearch.index.mapper.BlockLoader.LongBuilder; -import org.elasticsearch.index.mapper.BlockLoader.SingletonOrdinalsBuilder; import java.io.IOException; @@ -61,7 +60,7 @@ public BlockDocValuesReader() { /** * Reads the values of the given documents specified in the input block */ - public abstract Builder readValues(BuilderFactory factory, Docs docs) throws IOException; + public abstract BlockLoader.Block readValues(BuilderFactory factory, Docs docs) throws IOException; /** * Reads the values of the given document into the builder @@ -189,27 +188,28 @@ private static class SingletonLongs extends BlockDocValuesReader { } @Override - public LongBuilder builder(BuilderFactory factory, int expectedCount) { + public BlockLoader.LongBuilder builder(BuilderFactory factory, int expectedCount) { return factory.longsFromDocValues(expectedCount); } @Override - public LongBuilder readValues(BuilderFactory factory, Docs docs) throws IOException { - var blockBuilder = builder(factory, docs.count()); - int lastDoc = -1; - for (int i = 0; i < docs.count(); i++) { - int doc = docs.get(i); - if (doc < lastDoc) { - throw new IllegalStateException("docs within same block must be in order"); - } - if (numericDocValues.advanceExact(doc)) { - blockBuilder.appendLong(numericDocValues.longValue()); - } else { - blockBuilder.appendNull(); + public BlockLoader.Block readValues(BuilderFactory factory, Docs docs) throws IOException { + try (BlockLoader.LongBuilder builder = builder(factory, docs.count())) { + int lastDoc = -1; + for (int i = 0; i < docs.count(); i++) { + int doc = docs.get(i); + if (doc < lastDoc) { + throw new IllegalStateException("docs within same block must be in order"); + } + if (numericDocValues.advanceExact(doc)) { + builder.appendLong(numericDocValues.longValue()); + } else { + builder.appendNull(); + } + lastDoc = doc; } - lastDoc = doc; + return builder.build(); } - return blockBuilder; } @Override @@ -247,16 +247,17 @@ public BlockLoader.LongBuilder builder(BuilderFactory factory, int expectedCount } @Override - public BlockLoader.LongBuilder readValues(BuilderFactory factory, Docs docs) throws IOException { - var blockBuilder = builder(factory, docs.count()); - for (int i = 0; i < docs.count(); i++) { - int doc = docs.get(i); - if (doc < this.docID) { - throw new IllegalStateException("docs within same block must be in order"); + public BlockLoader.Block readValues(BuilderFactory factory, Docs docs) throws IOException { + try (BlockLoader.LongBuilder builder = builder(factory, docs.count())) { + for (int i = 0; i < docs.count(); i++) { + int doc = docs.get(i); + if (doc < this.docID) { + throw new IllegalStateException("docs within same block must be in order"); + } + read(doc, builder); } - read(doc, blockBuilder); + return builder.build(); } - return blockBuilder; } @Override @@ -307,22 +308,23 @@ public IntBuilder builder(BuilderFactory factory, int expectedCount) { } @Override - public IntBuilder readValues(BuilderFactory factory, Docs docs) throws IOException { - var blockBuilder = builder(factory, docs.count()); - int lastDoc = -1; - for (int i = 0; i < docs.count(); i++) { - int doc = docs.get(i); - if (doc < lastDoc) { - throw new IllegalStateException("docs within same block must be in order"); - } - if (numericDocValues.advanceExact(doc)) { - blockBuilder.appendInt(Math.toIntExact(numericDocValues.longValue())); - } else { - blockBuilder.appendNull(); + public BlockLoader.Block readValues(BuilderFactory factory, Docs docs) throws IOException { + try (BlockLoader.IntBuilder builder = builder(factory, docs.count())) { + int lastDoc = -1; + for (int i = 0; i < docs.count(); i++) { + int doc = docs.get(i); + if (doc < lastDoc) { + throw new IllegalStateException("docs within same block must be in order"); + } + if (numericDocValues.advanceExact(doc)) { + builder.appendInt(Math.toIntExact(numericDocValues.longValue())); + } else { + builder.appendNull(); + } + lastDoc = doc; } - lastDoc = doc; + return builder.build(); } - return blockBuilder; } @Override @@ -360,16 +362,17 @@ public IntBuilder builder(BuilderFactory factory, int expectedCount) { } @Override - public IntBuilder readValues(BuilderFactory factory, Docs docs) throws IOException { - var blockBuilder = builder(factory, docs.count()); - for (int i = 0; i < docs.count(); i++) { - int doc = docs.get(i); - if (doc < this.docID) { - throw new IllegalStateException("docs within same block must be in order"); + public BlockLoader.Block readValues(BuilderFactory factory, Docs docs) throws IOException { + try (BlockLoader.IntBuilder builder = builder(factory, docs.count())) { + for (int i = 0; i < docs.count(); i++) { + int doc = docs.get(i); + if (doc < this.docID) { + throw new IllegalStateException("docs within same block must be in order"); + } + read(doc, builder); } - read(doc, blockBuilder); + return builder.build(); } - return blockBuilder; } @Override @@ -423,23 +426,24 @@ public DoubleBuilder builder(BuilderFactory factory, int expectedCount) { } @Override - public DoubleBuilder readValues(BuilderFactory factory, Docs docs) throws IOException { - var blockBuilder = builder(factory, docs.count()); - int lastDoc = -1; - for (int i = 0; i < docs.count(); i++) { - int doc = docs.get(i); - if (doc < lastDoc) { - throw new IllegalStateException("docs within same block must be in order"); + public BlockLoader.Block readValues(BuilderFactory factory, Docs docs) throws IOException { + try (BlockLoader.DoubleBuilder builder = builder(factory, docs.count())) { + int lastDoc = -1; + for (int i = 0; i < docs.count(); i++) { + int doc = docs.get(i); + if (doc < lastDoc) { + throw new IllegalStateException("docs within same block must be in order"); + } + if (docValues.advanceExact(doc)) { + builder.appendDouble(toDouble.convert(docValues.longValue())); + } else { + builder.appendNull(); + } + lastDoc = doc; + this.docID = doc; } - if (docValues.advanceExact(doc)) { - blockBuilder.appendDouble(toDouble.convert(docValues.longValue())); - } else { - blockBuilder.appendNull(); - } - lastDoc = doc; - this.docID = doc; + return builder.build(); } - return blockBuilder; } @Override @@ -480,16 +484,17 @@ public DoubleBuilder builder(BuilderFactory factory, int expectedCount) { } @Override - public DoubleBuilder readValues(BuilderFactory factory, Docs docs) throws IOException { - var blockBuilder = builder(factory, docs.count()); - for (int i = 0; i < docs.count(); i++) { - int doc = docs.get(i); - if (doc < this.docID) { - throw new IllegalStateException("docs within same block must be in order"); + public BlockLoader.Block readValues(BuilderFactory factory, Docs docs) throws IOException { + try (BlockLoader.DoubleBuilder builder = builder(factory, docs.count())) { + for (int i = 0; i < docs.count(); i++) { + int doc = docs.get(i); + if (doc < this.docID) { + throw new IllegalStateException("docs within same block must be in order"); + } + read(doc, builder); } - read(doc, blockBuilder); + return builder.build(); } - return blockBuilder; } @Override @@ -539,21 +544,21 @@ public BytesRefBuilder builder(BuilderFactory factory, int expectedCount) { } @Override - public SingletonOrdinalsBuilder readValues(BuilderFactory factory, Docs docs) throws IOException { - SingletonOrdinalsBuilder builder = factory.singletonOrdinalsBuilder(ordinals, docs.count()); - - for (int i = 0; i < docs.count(); i++) { - int doc = docs.get(i); - if (doc < ordinals.docID()) { - throw new IllegalStateException("docs within same block must be in order"); - } - if (ordinals.advanceExact(doc)) { - builder.appendOrd(ordinals.ordValue()); - } else { - builder.appendNull(); + public BlockLoader.Block readValues(BuilderFactory factory, Docs docs) throws IOException { + try (BlockLoader.SingletonOrdinalsBuilder builder = factory.singletonOrdinalsBuilder(ordinals, docs.count())) { + for (int i = 0; i < docs.count(); i++) { + int doc = docs.get(i); + if (doc < ordinals.docID()) { + throw new IllegalStateException("docs within same block must be in order"); + } + if (ordinals.advanceExact(doc)) { + builder.appendOrd(ordinals.ordValue()); + } else { + builder.appendNull(); + } } + return builder.build(); } - return builder; } @Override @@ -589,17 +594,17 @@ public BytesRefBuilder builder(BuilderFactory factory, int expectedCount) { } @Override - public BytesRefBuilder readValues(BuilderFactory factory, Docs docs) throws IOException { - BytesRefBuilder builder = builder(factory, docs.count()); - - for (int i = 0; i < docs.count(); i++) { - int doc = docs.get(i); - if (doc < ordinals.docID()) { - throw new IllegalStateException("docs within same block must be in order"); + public BlockLoader.Block readValues(BuilderFactory factory, Docs docs) throws IOException { + try (BytesRefBuilder builder = builder(factory, docs.count())) { + for (int i = 0; i < docs.count(); i++) { + int doc = docs.get(i); + if (doc < ordinals.docID()) { + throw new IllegalStateException("docs within same block must be in order"); + } + read(doc, builder); } - read(doc, builder); + return builder.build(); } - return builder; } @Override @@ -649,16 +654,17 @@ public BytesRefBuilder builder(BuilderFactory factory, int expectedCount) { } @Override - public BytesRefBuilder readValues(BuilderFactory factory, Docs docs) throws IOException { - var blockBuilder = builder(factory, docs.count()); - for (int i = 0; i < docs.count(); i++) { - int doc = docs.get(i); - if (doc < docID) { - throw new IllegalStateException("docs within same block must be in order"); + public BlockLoader.Block readValues(BuilderFactory factory, Docs docs) throws IOException { + try (BlockLoader.BytesRefBuilder builder = builder(factory, docs.count())) { + for (int i = 0; i < docs.count(); i++) { + int doc = docs.get(i); + if (doc < docID) { + throw new IllegalStateException("docs within same block must be in order"); + } + read(doc, builder); } - read(doc, blockBuilder); + return builder.build(); } - return blockBuilder; } @Override @@ -709,22 +715,23 @@ public BooleanBuilder builder(BuilderFactory factory, int expectedCount) { } @Override - public BooleanBuilder readValues(BuilderFactory factory, Docs docs) throws IOException { - var blockBuilder = builder(factory, docs.count()); - int lastDoc = -1; - for (int i = 0; i < docs.count(); i++) { - int doc = docs.get(i); - if (doc < lastDoc) { - throw new IllegalStateException("docs within same block must be in order"); - } - if (numericDocValues.advanceExact(doc)) { - blockBuilder.appendBoolean(numericDocValues.longValue() != 0); - } else { - blockBuilder.appendNull(); + public BlockLoader.Block readValues(BuilderFactory factory, Docs docs) throws IOException { + try (BlockLoader.BooleanBuilder builder = builder(factory, docs.count())) { + int lastDoc = -1; + for (int i = 0; i < docs.count(); i++) { + int doc = docs.get(i); + if (doc < lastDoc) { + throw new IllegalStateException("docs within same block must be in order"); + } + if (numericDocValues.advanceExact(doc)) { + builder.appendBoolean(numericDocValues.longValue() != 0); + } else { + builder.appendNull(); + } + lastDoc = doc; } - lastDoc = doc; + return builder.build(); } - return blockBuilder; } @Override @@ -762,16 +769,17 @@ public BooleanBuilder builder(BuilderFactory factory, int expectedCount) { } @Override - public BooleanBuilder readValues(BuilderFactory factory, Docs docs) throws IOException { - var blockBuilder = builder(factory, docs.count()); - for (int i = 0; i < docs.count(); i++) { - int doc = docs.get(i); - if (doc < this.docID) { - throw new IllegalStateException("docs within same block must be in order"); + public BlockLoader.Block readValues(BuilderFactory factory, Docs docs) throws IOException { + try (BlockLoader.BooleanBuilder builder = builder(factory, docs.count())) { + for (int i = 0; i < docs.count(); i++) { + int doc = docs.get(i); + if (doc < this.docID) { + throw new IllegalStateException("docs within same block must be in order"); + } + read(doc, builder); } - read(doc, blockBuilder); + return builder.build(); } - return blockBuilder; } @Override @@ -813,17 +821,18 @@ private static class Nulls extends BlockDocValuesReader { private int docID = -1; @Override - public Builder builder(BuilderFactory factory, int expectedCount) { + public BlockLoader.Builder builder(BuilderFactory factory, int expectedCount) { return factory.nulls(expectedCount); } @Override - public Builder readValues(BuilderFactory factory, Docs docs) throws IOException { - Builder builder = builder(factory, docs.count()); - for (int i = 0; i < docs.count(); i++) { - builder.appendNull(); + public BlockLoader.Block readValues(BuilderFactory factory, Docs docs) throws IOException { + try (BlockLoader.Builder builder = builder(factory, docs.count())) { + for (int i = 0; i < docs.count(); i++) { + builder.appendNull(); + } + return builder.build(); } - return builder; } @Override diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java index 7e973f9c32033..af53ab42d35d9 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java @@ -12,6 +12,7 @@ import org.apache.lucene.index.SortedDocValues; import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.core.Releasable; import java.io.IOException; @@ -124,13 +125,24 @@ interface BuilderFactory { // TODO support non-singleton ords } + /** + * Marker interface for block results. The compute engine has a fleshed + * out implementation. + */ + interface Block {} + /** * A builder for typed values. For each document you may either call * {@link #appendNull}, {@code append}, or * {@link #beginPositionEntry} followed by two or more {@code append} * calls, and then {@link #endPositionEntry}. */ - interface Builder { + interface Builder extends Releasable { + /** + * Build the actual block. + */ + Block build(); + /** * Insert a null value. */ diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockSourceReader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockSourceReader.java index 2b9daadda31d6..1261a3612d3cb 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/BlockSourceReader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/BlockSourceReader.java @@ -156,16 +156,17 @@ public String toString() { } @Override - public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) throws IOException { - BlockLoader.Builder blockBuilder = builder(factory, docs.count()); - for (int i = 0; i < docs.count(); i++) { - int doc = docs.get(i); - if (doc < this.docID) { - throw new IllegalStateException("docs within same block must be in order"); - } - readValuesFromSingleDoc(doc, blockBuilder); + public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) throws IOException { + try (BlockLoader.Builder builder = builder(factory, docs.count())) { + for (int i = 0; i < docs.count(); i++) { + int doc = docs.get(i); + if (doc < this.docID) { + throw new IllegalStateException("docs within same block must be in order"); + } + readValuesFromSingleDoc(doc, builder); + } + return builder.build(); } - return blockBuilder; } @Override diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockStoredFieldsReader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockStoredFieldsReader.java index d38d30a03b275..5984482fd9441 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/BlockStoredFieldsReader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/BlockStoredFieldsReader.java @@ -63,12 +63,13 @@ protected BlockStoredFieldsReader(LeafStoredFieldLoader loader) { } @Override - public final BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) throws IOException { - var builder = builder(factory, docs.count()); - for (int i = 0; i < docs.count(); i++) { - readValuesFromSingleDoc(docs.get(i), builder); + public final BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) throws IOException { + try (BlockLoader.Builder builder = builder(factory, docs.count())) { + for (int i = 0; i < docs.count(); i++) { + readValuesFromSingleDoc(docs.get(i), builder); + } + return builder.build(); } - return builder; } @Override diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BooleanScriptBlockDocValuesReader.java b/server/src/main/java/org/elasticsearch/index/mapper/BooleanScriptBlockDocValuesReader.java index 0d29bc43700e8..b59df56791fbe 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/BooleanScriptBlockDocValuesReader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/BooleanScriptBlockDocValuesReader.java @@ -37,12 +37,13 @@ public BlockLoader.BooleanBuilder builder(BlockLoader.BuilderFactory factory, in } @Override - public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) { - BlockLoader.BooleanBuilder builder = builder(factory, docs.count()); - for (int i = 0; i < docs.count(); i++) { - read(docs.get(i), builder); + public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) { + try (BlockLoader.BooleanBuilder builder = builder(factory, docs.count())) { + for (int i = 0; i < docs.count(); i++) { + read(docs.get(i), builder); + } + return builder.build(); } - return builder; } @Override diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DateScriptBlockDocValuesReader.java b/server/src/main/java/org/elasticsearch/index/mapper/DateScriptBlockDocValuesReader.java index 6e6cdd3d1f057..ad630a71870a4 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DateScriptBlockDocValuesReader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DateScriptBlockDocValuesReader.java @@ -36,12 +36,13 @@ public BlockLoader.LongBuilder builder(BlockLoader.BuilderFactory factory, int e } @Override - public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) { - BlockLoader.LongBuilder builder = builder(factory, docs.count()); - for (int i = 0; i < docs.count(); i++) { - read(docs.get(i), builder); + public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) { + try (BlockLoader.LongBuilder builder = builder(factory, docs.count())) { + for (int i = 0; i < docs.count(); i++) { + read(docs.get(i), builder); + } + return builder.build(); } - return builder; } @Override diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DoubleScriptBlockDocValuesReader.java b/server/src/main/java/org/elasticsearch/index/mapper/DoubleScriptBlockDocValuesReader.java index 856321f53244d..4e317a3ed11cb 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DoubleScriptBlockDocValuesReader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DoubleScriptBlockDocValuesReader.java @@ -36,12 +36,13 @@ public BlockLoader.DoubleBuilder builder(BlockLoader.BuilderFactory factory, int } @Override - public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) { - BlockLoader.DoubleBuilder builder = builder(factory, docs.count()); - for (int i = 0; i < docs.count(); i++) { - read(docs.get(i), builder); + public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) { + try (BlockLoader.DoubleBuilder builder = builder(factory, docs.count())) { + for (int i = 0; i < docs.count(); i++) { + read(docs.get(i), builder); + } + return builder.build(); } - return builder; } @Override diff --git a/server/src/main/java/org/elasticsearch/index/mapper/IndexFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/IndexFieldMapper.java index cb69fa4b7c50a..5f987fd96ca66 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/IndexFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/IndexFieldMapper.java @@ -96,12 +96,13 @@ public BlockLoader.BytesRefBuilder builder(BlockLoader.BuilderFactory factory, i } @Override - public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) { - BlockLoader.BytesRefBuilder builder = builder(factory, docs.count()); - for (int i = 0; i < docs.count(); i++) { - builder.appendBytesRef(bytes); + public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) { + try (BlockLoader.BytesRefBuilder builder = builder(factory, docs.count())) { + for (int i = 0; i < docs.count(); i++) { + builder.appendBytesRef(bytes); + } + return builder.build(); } - return builder; } @Override diff --git a/server/src/main/java/org/elasticsearch/index/mapper/IpScriptBlockDocValuesReader.java b/server/src/main/java/org/elasticsearch/index/mapper/IpScriptBlockDocValuesReader.java index f05b9aff890af..23229a6533cdb 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/IpScriptBlockDocValuesReader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/IpScriptBlockDocValuesReader.java @@ -36,12 +36,13 @@ public BlockLoader.BytesRefBuilder builder(BlockLoader.BuilderFactory factory, i } @Override - public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) { - BlockLoader.BytesRefBuilder builder = builder(factory, docs.count()); - for (int i = 0; i < docs.count(); i++) { - read(docs.get(i), builder); + public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) { + try (BlockLoader.BytesRefBuilder builder = builder(factory, docs.count())) { + for (int i = 0; i < docs.count(); i++) { + read(docs.get(i), builder); + } + return builder.build(); } - return builder; } @Override diff --git a/server/src/main/java/org/elasticsearch/index/mapper/KeywordScriptBlockDocValuesReader.java b/server/src/main/java/org/elasticsearch/index/mapper/KeywordScriptBlockDocValuesReader.java index 51058b3b60bf4..6afbcae50d31f 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/KeywordScriptBlockDocValuesReader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/KeywordScriptBlockDocValuesReader.java @@ -38,12 +38,13 @@ public BlockLoader.BytesRefBuilder builder(BlockLoader.BuilderFactory factory, i } @Override - public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) { - BlockLoader.BytesRefBuilder builder = builder(factory, docs.count()); - for (int i = 0; i < docs.count(); i++) { - read(docs.get(i), builder); + public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) { + try (BlockLoader.BytesRefBuilder builder = builder(factory, docs.count())) { + for (int i = 0; i < docs.count(); i++) { + read(docs.get(i), builder); + } + return builder.build(); } - return builder; } @Override diff --git a/server/src/main/java/org/elasticsearch/index/mapper/LongScriptBlockDocValuesReader.java b/server/src/main/java/org/elasticsearch/index/mapper/LongScriptBlockDocValuesReader.java index 4896f7d858144..91c099cd2813b 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/LongScriptBlockDocValuesReader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/LongScriptBlockDocValuesReader.java @@ -36,12 +36,13 @@ public BlockLoader.LongBuilder builder(BlockLoader.BuilderFactory factory, int e } @Override - public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) { - BlockLoader.LongBuilder builder = builder(factory, docs.count()); - for (int i = 0; i < docs.count(); i++) { - read(docs.get(i), builder); + public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) { + try (BlockLoader.LongBuilder builder = builder(factory, docs.count())) { + for (int i = 0; i < docs.count(); i++) { + read(docs.get(i), builder); + } + return builder.build(); } - return builder; } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java index 5a42d5c6890b5..298acb9519532 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java +++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java @@ -28,7 +28,8 @@ public class TestBlock BlockLoader.DoubleBuilder, BlockLoader.IntBuilder, BlockLoader.LongBuilder, - BlockLoader.SingletonOrdinalsBuilder { + BlockLoader.SingletonOrdinalsBuilder, + BlockLoader.Block { public static BlockLoader.BuilderFactory FACTORY = new BlockLoader.BuilderFactory() { @Override public BlockLoader.BooleanBuilder booleansFromDocValues(int expectedCount) { @@ -192,8 +193,18 @@ public TestBlock appendOrd(int value) { } } + @Override + public TestBlock build() { + return this; + } + private TestBlock add(Object value) { (currentPosition == null ? values : currentPosition).add(value); return this; } + + @Override + public void close() { + // TODO assert that we close the test blocks + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AbstractBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AbstractBlockBuilder.java index bd13a9045a28a..d6d6584e1b534 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AbstractBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AbstractBlockBuilder.java @@ -140,8 +140,9 @@ protected final void ensureCapacity() { return; } int newSize = calculateNewArraySize(valuesLength); - adjustBreaker((long) (newSize - valuesLength) * elementSize()); + adjustBreaker(newSize * elementSize()); growValuesArray(newSize); + adjustBreaker(-valuesLength * elementSize()); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java index d7c3a5cb9bfab..b9506153aac5a 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java @@ -25,16 +25,9 @@ * or dense data. A Block can represent either single or multi valued data. A Block that represents * dense single-valued data can be viewed as a {@link Vector}. * - * TODO: update comment - *

All Blocks share the same set of data retrieval methods, but actual concrete implementations - * effectively support a subset of these, throwing {@code UnsupportedOperationException} where a - * particular data retrieval method is not supported. For example, a Block of primitive longs may - * not support retrieval as an integer, {code getInt}. This greatly simplifies Block usage and - * avoids cumbersome use-site casting. - * *

Block are immutable and can be passed between threads. */ -public interface Block extends Accountable, NamedWriteable, Releasable { +public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, Releasable { /** * {@return an efficient dense single-value view of this block}. diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java index 56a4dc249388f..89b40d6e46a14 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java @@ -161,6 +161,7 @@ public static Block[] fromList(BlockFactory blockFactory, List> lis public static Block deepCopyOf(Block block, BlockFactory blockFactory) { try (Block.Builder builder = block.elementType().newBlockBuilder(block.getPositionCount(), blockFactory)) { builder.copyFrom(block, 0, block.getPositionCount()); + builder.mvOrdering(block.mvOrdering()); return builder.build(); } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java index dba0ced86e60e..00b93d08a36ff 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java @@ -182,7 +182,12 @@ public Block.Builder appendAllValuesToCurrentPosition(Block block) { @Override public Block.Builder mvOrdering(MvOrdering mvOrdering) { - throw new UnsupportedOperationException(); + /* + * This is called when copying but otherwise doesn't do + * anything because there aren't multivalue fields in a + * block containing only nulls. + */ + return this; } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocBlock.java index ccd740bc91ba9..ed7e317bfc4c7 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocBlock.java @@ -145,7 +145,13 @@ public Block.Builder appendAllValuesToCurrentPosition(Block block) { @Override public Block.Builder mvOrdering(MvOrdering mvOrdering) { - throw new UnsupportedOperationException("doc blocks only contain one value per position"); + /* + * This is called when copying but otherwise doesn't do + * anything because there aren't multivalue fields in a + * block containing doc references. Every position can + * only reference one doc. + */ + return this; } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java index 8b1c4f78825ad..61c1bd9730e02 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java @@ -53,7 +53,7 @@ public record ValuesSourceReaderOperatorFactory(List factories, int docChannel, String field) { + public ValuesSourceReaderOperator( + BlockFactory blockFactory, + List factories, + int docChannel, + String field + ) { this.factories = factories; this.docChannel = docChannel; this.field = field; - this.blockFactory = new ComputeBlockLoaderFactory(BlockFactory.getNonBreakingInstance()); // TODO breaking! + this.blockFactory = new ComputeBlockLoaderFactory(blockFactory); } @Override @@ -106,7 +111,7 @@ protected Page process(Page page) { private Block loadFromSingleLeaf(DocVector docVector) throws IOException { setupReader(docVector.shards().getInt(0), docVector.segments().getInt(0), docVector.docs().getInt(0)); - return ((Block.Builder) lastReader.readValues(blockFactory, new BlockLoader.Docs() { + return ((Block) lastReader.readValues(blockFactory, new BlockLoader.Docs() { private final IntVector docs = docVector.docs(); @Override @@ -118,26 +123,28 @@ public int count() { public int get(int i) { return docs.getInt(i); } - })).build(); + })); } private Block loadFromManyLeaves(DocVector docVector) throws IOException { int[] forwards = docVector.shardSegmentDocMapForwards(); int doc = docVector.docs().getInt(forwards[0]); setupReader(docVector.shards().getInt(forwards[0]), docVector.segments().getInt(forwards[0]), doc); - BlockLoader.Builder builder = lastReader.builder(blockFactory, forwards.length); - lastReader.readValuesFromSingleDoc(doc, builder); - for (int i = 1; i < forwards.length; i++) { - int shard = docVector.shards().getInt(forwards[i]); - int segment = docVector.segments().getInt(forwards[i]); - doc = docVector.docs().getInt(forwards[i]); - if (segment != lastSegment || shard != lastShard) { - setupReader(shard, segment, doc); - } + try (BlockLoader.Builder builder = lastReader.builder(blockFactory, forwards.length)) { lastReader.readValuesFromSingleDoc(doc, builder); + for (int i = 1; i < forwards.length; i++) { + int shard = docVector.shards().getInt(forwards[i]); + int segment = docVector.segments().getInt(forwards[i]); + doc = docVector.docs().getInt(forwards[i]); + if (segment != lastSegment || shard != lastShard) { + setupReader(shard, segment, doc); + } + lastReader.readValuesFromSingleDoc(doc, builder); + } + try (Block orig = ((Block.Builder) builder).build()) { + return orig.filter(docVector.shardSegmentDocMapBackwards()); + } } - // TODO maybe it's better for downstream consumers if we perform a copy here. - return ((Block.Builder) builder).build().filter(docVector.shardSegmentDocMapBackwards()); } private void setupReader(int shard, int segment, int doc) throws IOException { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java index 2c5eedb6d8bbe..07494f97cfd6d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java @@ -467,7 +467,7 @@ private static class ValuesAggregator implements Releasable { int maxPageSize, DriverContext driverContext ) { - this.extractor = new ValuesSourceReaderOperator(factories, docChannel, groupingField); + this.extractor = new ValuesSourceReaderOperator(BlockFactory.getNonBreakingInstance(), factories, docChannel, groupingField); this.aggregator = new HashAggregationOperator( aggregatorFactories, () -> BlockHash.build(List.of(new GroupSpec(channelIndex, groupingElementType)), driverContext, maxPageSize, false), diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java index 6951cdf4d56ca..bde2ae0a747e4 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java @@ -141,6 +141,7 @@ public void testQueryOperator() throws IOException { } } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/101413") public void testGroupingWithOrdinals() throws Exception { DriverContext driverContext = driverContext(); BlockFactory blockFactory = driverContext.blockFactory(); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/AggregatorFunctionTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/AggregatorFunctionTestCase.java index 6c1f088aeca7b..894b94476c08d 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/AggregatorFunctionTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/AggregatorFunctionTestCase.java @@ -43,12 +43,6 @@ import static org.hamcrest.Matchers.hasSize; public abstract class AggregatorFunctionTestCase extends ForkingOperatorTestCase { - - @Override - protected DriverContext driverContext() { - return breakingDriverContext(); - } - protected abstract AggregatorFunctionSupplier aggregatorFunction(BigArrays bigArrays, List inputChannels); protected final int aggregatorIntermediateBlockCount() { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java index ba930e943e79a..753b5878de2ae 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java @@ -48,12 +48,6 @@ import static org.hamcrest.Matchers.hasSize; public abstract class GroupingAggregatorFunctionTestCase extends ForkingOperatorTestCase { - - @Override - protected DriverContext driverContext() { - return breakingDriverContext(); - } - protected abstract AggregatorFunctionSupplier aggregatorFunction(BigArrays bigArrays, List inputChannels); protected final int aggregatorIntermediateBlockCount() { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java index ced1f6360bbbc..41fe1a93d9c8b 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java @@ -221,9 +221,4 @@ public static SearchContext mockSearchContext(IndexReader reader) { throw new UncheckedIOException(e); } } - - @Override - protected DriverContext driverContext() { - return breakingDriverContext(); - } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java index 4e604144927c0..d1b9e706750df 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java @@ -200,9 +200,4 @@ private void testSimple(DriverContext ctx, int size, int limit) { int pages = (int) Math.ceil((float) Math.min(size, limit) / factory.maxPageSize()); assertThat(results, hasSize(pages)); } - - @Override - protected DriverContext driverContext() { - return breakingDriverContext(); - } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java index a6e5e3bf4744d..269a478560bac 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java @@ -223,7 +223,6 @@ public void testLoadAllInOnePageShuffled() { } private void loadSimpleAndAssert(DriverContext driverContext, List input) { - List results = new ArrayList<>(); List operators = List.of( factory(reader, new NumberFieldMapper.NumberFieldType("key", NumberFieldMapper.NumberType.INTEGER)).get(driverContext), factory(reader, new NumberFieldMapper.NumberFieldType("long", NumberFieldMapper.NumberType.LONG)).get(driverContext), @@ -236,17 +235,7 @@ private void loadSimpleAndAssert(DriverContext driverContext, List input) factory(reader, new NumberFieldMapper.NumberFieldType("double", NumberFieldMapper.NumberType.DOUBLE)).get(driverContext), factory(reader, new NumberFieldMapper.NumberFieldType("mv_double", NumberFieldMapper.NumberType.DOUBLE)).get(driverContext) ); - try ( - Driver d = new Driver( - driverContext, - new CannedSourceOperator(input.iterator()), - operators, - new PageConsumerOperator(page -> results.add(page)), - () -> {} - ) - ) { - runDriver(d); - } + List results = drive(operators, input.iterator(), driverContext); assertThat(results, hasSize(input.size())); for (Page p : results) { assertThat(p.getBlockCount(), equalTo(11)); @@ -368,20 +357,24 @@ public void testValuesSourceReaderOperatorWithNulls() throws IOException { factory(reader, kwFt).get(driverContext) ), new PageConsumerOperator(page -> { - logger.debug("New page: {}", page); - IntBlock intValuesBlock = page.getBlock(1); - LongBlock longValuesBlock = page.getBlock(2); - DoubleBlock doubleValuesBlock = page.getBlock(3); - BytesRefBlock keywordValuesBlock = page.getBlock(4); + try { + logger.debug("New page: {}", page); + IntBlock intValuesBlock = page.getBlock(1); + LongBlock longValuesBlock = page.getBlock(2); + DoubleBlock doubleValuesBlock = page.getBlock(3); + BytesRefBlock keywordValuesBlock = page.getBlock(4); - for (int i = 0; i < page.getPositionCount(); i++) { - assertFalse(intValuesBlock.isNull(i)); - long j = intValuesBlock.getInt(i); - // Every 100 documents we set fields to null - boolean fieldIsEmpty = j % 100 == 0; - assertEquals(fieldIsEmpty, longValuesBlock.isNull(i)); - assertEquals(fieldIsEmpty, doubleValuesBlock.isNull(i)); - assertEquals(fieldIsEmpty, keywordValuesBlock.isNull(i)); + for (int i = 0; i < page.getPositionCount(); i++) { + assertFalse(intValuesBlock.isNull(i)); + long j = intValuesBlock.getInt(i); + // Every 100 documents we set fields to null + boolean fieldIsEmpty = j % 100 == 0; + assertEquals(fieldIsEmpty, longValuesBlock.isNull(i)); + assertEquals(fieldIsEmpty, doubleValuesBlock.isNull(i)); + assertEquals(fieldIsEmpty, keywordValuesBlock.isNull(i)); + } + } finally { + page.releaseBlocks(); } }), () -> {} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AnyOperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AnyOperatorTestCase.java index 984bc7527e59a..290756e81cfae 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AnyOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AnyOperatorTestCase.java @@ -101,17 +101,7 @@ protected final BigArrays nonBreakingBigArrays() { /** * A {@link DriverContext} with a nonBreakingBigArrays. */ - protected DriverContext driverContext() { // TODO make this final and return a breaking block factory - return new DriverContext(nonBreakingBigArrays(), BlockFactory.getNonBreakingInstance()); - } - - private final List breakers = new ArrayList<>(); - private final List blockFactories = new ArrayList<>(); - - /** - * A {@link DriverContext} with a breaking {@link BigArrays} and {@link BlockFactory}. - */ - protected DriverContext breakingDriverContext() { // TODO move this to driverContext once everyone supports breaking + protected DriverContext driverContext() { // TODO make this final once all operators support memory tracking BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking(); CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST); breakers.add(breaker); @@ -120,6 +110,13 @@ protected DriverContext breakingDriverContext() { // TODO move this to driverCon return new DriverContext(bigArrays, factory); } + protected final DriverContext nonBreakingDriverContext() { // TODO drop this once the driverContext method isn't overrideable + return new DriverContext(nonBreakingBigArrays(), BlockFactory.getNonBreakingInstance()); + } + + private final List breakers = new ArrayList<>(); + private final List blockFactories = new ArrayList<>(); + protected final DriverContext crankyDriverContext() { CrankyCircuitBreakerService cranky = new CrankyCircuitBreakerService(); BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, cranky).withCircuitBreaking(); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/CannedSourceOperator.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/CannedSourceOperator.java index f5dd2680e0ac7..47febc09e45f5 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/CannedSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/CannedSourceOperator.java @@ -7,8 +7,11 @@ package org.elasticsearch.compute.operator; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; import java.util.ArrayList; import java.util.Iterator; @@ -42,19 +45,32 @@ public static Page mergePages(List pages) { int totalPositions = pages.stream().mapToInt(Page::getPositionCount).sum(); Page first = pages.get(0); Block.Builder[] builders = new Block.Builder[first.getBlockCount()]; - for (int b = 0; b < builders.length; b++) { - builders[b] = first.getBlock(b).elementType().newBlockBuilder(totalPositions); - } - for (Page p : pages) { + try { for (int b = 0; b < builders.length; b++) { - builders[b].copyFrom(p.getBlock(b), 0, p.getPositionCount()); + builders[b] = first.getBlock(b).elementType().newBlockBuilder(totalPositions); } + for (Page p : pages) { + for (int b = 0; b < builders.length; b++) { + builders[b].copyFrom(p.getBlock(b), 0, p.getPositionCount()); + } + } + Block[] blocks = new Block[builders.length]; + Page result = null; + try { + for (int b = 0; b < blocks.length; b++) { + blocks[b] = builders[b].build(); + } + result = new Page(blocks); + } finally { + if (result == null) { + Releasables.close(blocks); + } + } + return result; + } finally { + Iterable releasePages = () -> Iterators.map(pages.iterator(), p -> p::releaseBlocks); + Releasables.closeExpectNoException(Releasables.wrap(builders), Releasables.wrap(releasePages)); } - Block[] blocks = new Block[builders.length]; - for (int b = 0; b < blocks.length; b++) { - blocks[b] = builders[b].build(); - } - return new Page(blocks); } /** diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/EvalOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/EvalOperatorTests.java index 95a5647717851..e7f5db7579869 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/EvalOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/EvalOperatorTests.java @@ -118,9 +118,4 @@ public void testReadFromBlock() { protected ByteSizeValue smallEnoughToCircuitBreak() { return ByteSizeValue.ofBytes(between(1, 8000)); } - - @Override - protected DriverContext driverContext() { // TODO remove this when the parent uses a breaking block factory - return breakingDriverContext(); - } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FilterOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FilterOperatorTests.java index fa4c7bea7c9cc..e16f643e1ca4d 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FilterOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FilterOperatorTests.java @@ -119,9 +119,4 @@ public void testReadFromBlock() { protected ByteSizeValue smallEnoughToCircuitBreak() { return ByteSizeValue.ofBytes(between(1, 600)); } - - @Override - protected DriverContext driverContext() { // TODO remove this when the parent uses a breaking block factory - return breakingDriverContext(); - } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java index afa307c494431..b1ef784ca339c 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java @@ -31,12 +31,6 @@ import static org.hamcrest.Matchers.hasSize; public class HashAggregationOperatorTests extends ForkingOperatorTestCase { - - @Override - protected DriverContext driverContext() { - return breakingDriverContext(); - } - @Override protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { long max = randomLongBetween(1, Long.MAX_VALUE / size); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java index 76f99389a697b..8c85f5927196f 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java @@ -23,11 +23,6 @@ import static org.hamcrest.Matchers.sameInstance; public class LimitOperatorTests extends OperatorTestCase { - @Override - protected DriverContext driverContext() { - return breakingDriverContext(); - } - @Override protected LimitOperator.Factory simple(BigArrays bigArrays) { return new LimitOperator.Factory(100); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java index a105818a2bf03..3572dc620287d 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java @@ -257,9 +257,4 @@ protected Page createPage(int positionOffset, int length) { List results = drive(new MvExpandOperator(0, randomIntBetween(1, 1000)), input.iterator(), context); assertSimpleOutput(origInput, results); } - - @Override - protected DriverContext driverContext() { - return breakingDriverContext(); - } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ProjectOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ProjectOperatorTests.java index 1acdbc4895c94..1aff6be7594aa 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ProjectOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ProjectOperatorTests.java @@ -28,11 +28,6 @@ import static org.mockito.Mockito.when; public class ProjectOperatorTests extends OperatorTestCase { - @Override - protected DriverContext driverContext() { - return breakingDriverContext(); - } - public void testProjectionOnEmptyPage() { var page = new Page(0); var projection = new ProjectOperator(randomProjection(10)); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/StringExtractOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/StringExtractOperatorTests.java index 5dff00ec930c4..55470d1dcae12 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/StringExtractOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/StringExtractOperatorTests.java @@ -126,4 +126,9 @@ public void close() {} assertThat(brb.getBytesRef(idx + 1, spare).utf8ToString(), equalTo("foo4")); assertThat(brb.getBytesRef(idx + 2, spare).utf8ToString(), equalTo("foo5")); } + + @Override + protected DriverContext driverContext() { + return nonBreakingDriverContext(); + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java index 049bc449069b8..6c5bab9b8f784 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java @@ -1359,11 +1359,6 @@ public void testCloseWithoutCompleting() { } } - @Override - protected DriverContext driverContext() { // TODO remove this when the parent uses a breaking block factory - return breakingDriverContext(); - } - @SuppressWarnings({ "unchecked", "rawtypes" }) private static void readAsRows(List>> values, Page page) { if (page.getBlockCount() == 0) { diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/HeapAttackIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/HeapAttackIT.java index eed96a007a1b9..31d0a7646e1b7 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/HeapAttackIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/HeapAttackIT.java @@ -290,7 +290,6 @@ public void testFetchManyBigFields() throws IOException { fetchManyBigFields(100); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/100528") public void testFetchTooManyBigFields() throws IOException { initManyBigFieldsIndex(500); assertCircuitBreaks(() -> fetchManyBigFields(500)); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java index bf246b5ac02d4..98c1397d97860 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java @@ -255,7 +255,9 @@ private void doLookup( extractField instanceof Alias a ? ((NamedExpression) a.child()).name() : extractField.name(), EsqlDataTypes.isUnsupported(extractField.dataType()) ); - intermediateOperators.add(new ValuesSourceReaderOperator(sources, 0, extractField.name())); + intermediateOperators.add( + new ValuesSourceReaderOperator(BlockFactory.getNonBreakingInstance(), sources, 0, extractField.name()) + ); } // drop docs block intermediateOperators.add(droppingBlockOperator(extractFields.size() + 2, 0)); diff --git a/x-pack/plugin/mapper-constant-keyword/src/main/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapper.java b/x-pack/plugin/mapper-constant-keyword/src/main/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapper.java index 75202749d8dca..61db9dcd54a02 100644 --- a/x-pack/plugin/mapper-constant-keyword/src/main/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapper.java +++ b/x-pack/plugin/mapper-constant-keyword/src/main/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapper.java @@ -153,12 +153,13 @@ public BlockLoader.BytesRefBuilder builder(BlockLoader.BuilderFactory factory, i } @Override - public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) { - BlockLoader.BytesRefBuilder builder = builder(factory, docs.count()); - for (int i = 0; i < docs.count(); i++) { - builder.appendBytesRef(bytes); + public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) { + try (BlockLoader.BytesRefBuilder builder = builder(factory, docs.count())) { + for (int i = 0; i < docs.count(); i++) { + builder.appendBytesRef(bytes); + } + return builder.build(); } - return builder; } @Override