From cb57d48d77bba4100448c4620d34752b34f0d296 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 5 Oct 2023 09:53:32 -0400 Subject: [PATCH] ESQL: Read from the `BlockFactory` (#100231) (#100310) This links the `BlockFactory` into the `Block` serialization code. With this blocks that are deserialized from over the wire are tracked. Co-authored-by: Nhat Nguyen --- .../elasticsearch/indices/IndicesService.java | 5 + .../test/AbstractSerializationTestCase.java | 71 +++++----- .../test/AbstractWireTestCase.java | 131 ++++++++++++------ .../test/AbstractXContentTestCase.java | 49 ++++--- .../test/EqualsHashCodeTestUtils.java | 67 +++++++-- .../compute/data/BooleanBlock.java | 31 +++-- .../compute/data/BooleanVector.java | 13 +- .../compute/data/BytesRefBlock.java | 31 +++-- .../compute/data/BytesRefVector.java | 13 +- .../compute/data/ConstantBytesRefVector.java | 6 +- .../compute/data/DoubleBlock.java | 31 +++-- .../compute/data/DoubleVector.java | 13 +- .../elasticsearch/compute/data/IntBlock.java | 31 +++-- .../elasticsearch/compute/data/IntVector.java | 13 +- .../elasticsearch/compute/data/LongBlock.java | 31 +++-- .../compute/data/LongVector.java | 13 +- .../compute/data/BlockFactory.java | 31 ++++- .../compute/data/BlockStreamInput.java | 24 ++++ .../org/elasticsearch/compute/data/Page.java | 16 --- .../compute/data/X-Block.java.st | 31 +++-- .../compute/data/X-ConstantVector.java.st | 14 +- .../compute/data/X-Vector.java.st | 13 +- .../compute/operator/LimitOperator.java | 2 +- .../operator/exchange/ExchangeResponse.java | 4 +- .../operator/exchange/ExchangeService.java | 15 +- .../compute/data/BasicPageTests.java | 79 ++++++----- .../compute/data/BigArrayVectorTests.java | 11 +- .../compute/data/BlockSerializationTests.java | 70 ++++++---- .../compute/data/MultiValueBlockTests.java | 62 +++++---- .../compute/data/SerializationTestCase.java | 41 +++++- .../compute/operator/LimitOperatorTests.java | 35 +++-- .../exchange/ExchangeServiceTests.java | 77 ++++++---- .../xpack/esql/action/EsqlActionIT.java | 1 + .../xpack/esql/plugin/CanMatchIT.java | 66 +++++---- .../xpack/esql/action/EsqlQueryAction.java | 2 +- .../xpack/esql/action/EsqlQueryResponse.java | 13 +- .../esql/enrich/EnrichLookupService.java | 7 +- .../xpack/esql/plugin/EsqlPlugin.java | 9 +- .../esql/plugin/TransportEsqlQueryAction.java | 12 +- .../esql/action/EsqlQueryResponseTests.java | 78 ++++++++--- 40 files changed, 795 insertions(+), 467 deletions(-) create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockStreamInput.java diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 36ebaa96b0bb2..fd0e19295e2e0 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -1844,4 +1844,9 @@ public DateFieldMapper.DateFieldType getTimestampFieldType(Index index) { public IndexScopedSettings getIndexScopedSettings() { return indexScopedSettings; } + + // TODO move this? + public BigArrays getBigArrays() { + return bigArrays; + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/AbstractSerializationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/AbstractSerializationTestCase.java index 5abe6bc2fc640..238f523872f83 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/AbstractSerializationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/AbstractSerializationTestCase.java @@ -39,6 +39,7 @@ public final void testFromXContent() throws IOException { .randomFieldsExcludeFilter(getRandomFieldsExcludeFilter()) .assertEqualsConsumer(this::assertEqualInstances) .assertToXContentEquivalence(assertToXContentEquivalence()) + .dispose(this::dispose) .test(); } @@ -61,41 +62,45 @@ public final void testConcurrentToXContent() throws IOException, InterruptedExce () -> randomFrom(XContentType.values()) ); T testInstance = createXContextTestInstance(xContentType); - ToXContent.Params params = new ToXContent.DelegatingMapParams( - singletonMap(RestSearchAction.TYPED_KEYS_PARAM, "true"), - getToXContentParams() - ); - boolean humanReadable = randomBoolean(); - BytesRef firstTimeBytes = toXContent(asXContent(testInstance), xContentType, params, humanReadable).toBytesRef(); - - /* - * 500 rounds seems to consistently reproduce the issue on Nik's - * laptop. Larger numbers are going to be slower but more likely - * to reproduce the issue. - */ - int rounds = scaledRandomIntBetween(300, 5000); - concurrentTest(() -> { - try { - for (int r = 0; r < rounds; r++) { - BytesRef thisRoundBytes = toXContent(asXContent(testInstance), xContentType, params, humanReadable).toBytesRef(); - if (firstTimeBytes.bytesEquals(thisRoundBytes)) { - continue; - } - StringBuilder error = new StringBuilder("Failed to round trip over "); - if (humanReadable) { - error.append("human readable "); + try { + ToXContent.Params params = new ToXContent.DelegatingMapParams( + singletonMap(RestSearchAction.TYPED_KEYS_PARAM, "true"), + getToXContentParams() + ); + boolean humanReadable = randomBoolean(); + BytesRef firstTimeBytes = toXContent(asXContent(testInstance), xContentType, params, humanReadable).toBytesRef(); + + /* + * 500 rounds seems to consistently reproduce the issue on Nik's + * laptop. Larger numbers are going to be slower but more likely + * to reproduce the issue. + */ + int rounds = scaledRandomIntBetween(300, 5000); + concurrentTest(() -> { + try { + for (int r = 0; r < rounds; r++) { + BytesRef thisRoundBytes = toXContent(asXContent(testInstance), xContentType, params, humanReadable).toBytesRef(); + if (firstTimeBytes.bytesEquals(thisRoundBytes)) { + continue; + } + StringBuilder error = new StringBuilder("Failed to round trip over "); + if (humanReadable) { + error.append("human readable "); + } + error.append(xContentType); + error.append("\nCanonical is:\n").append(Strings.toString(asXContent(testInstance), true, true)); + boolean showBytes = xContentType.xContent() == CborXContent.cborXContent; + error.append("\nWanted : ").append(showBytes ? firstTimeBytes : firstTimeBytes.utf8ToString()); + error.append("\nBut got: ").append(showBytes ? thisRoundBytes : thisRoundBytes.utf8ToString()); + fail(error.toString()); } - error.append(xContentType); - error.append("\nCanonical is:\n").append(Strings.toString(asXContent(testInstance), true, true)); - boolean showBytes = xContentType.xContent() == CborXContent.cborXContent; - error.append("\nWanted : ").append(showBytes ? firstTimeBytes : firstTimeBytes.utf8ToString()); - error.append("\nBut got: ").append(showBytes ? thisRoundBytes : thisRoundBytes.utf8ToString()); - fail(error.toString()); + } catch (IOException e) { + throw new AssertionError(e); } - } catch (IOException e) { - throw new AssertionError(e); - } - }); + }); + } finally { + dispose(testInstance); + } } protected abstract ToXContent asXContent(T instance); diff --git a/test/framework/src/main/java/org/elasticsearch/test/AbstractWireTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/AbstractWireTestCase.java index 0f2a64920fcbd..8d4085623d156 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/AbstractWireTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/AbstractWireTestCase.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.Releasable; import org.elasticsearch.xcontent.ToXContent; import java.io.IOException; @@ -54,10 +55,20 @@ public abstract class AbstractWireTestCase extends ESTestCase { */ public final void testEqualsAndHashcode() { for (int runs = 0; runs < NUMBER_OF_TEST_RUNS; runs++) { - EqualsHashCodeTestUtils.checkEqualsAndHashCode(createTestInstance(), this::copyInstance, this::mutateInstance); + T testInstance = createTestInstance(); + try { + EqualsHashCodeTestUtils.checkEqualsAndHashCode(testInstance, this::copyInstance, this::mutateInstance, this::dispose); + } finally { + dispose(testInstance); + } } } + /** + * Dispose of the copy, usually {@link Releasable#close} or a noop. + */ + protected void dispose(T t) {} + /** * Calls {@link Object#equals} on equal objects on many threads and verifies * they all return true. Folks tend to assume this is true about @@ -67,19 +78,27 @@ public final void testEqualsAndHashcode() { */ public final void testConcurrentEquals() throws IOException, InterruptedException, ExecutionException { T testInstance = createTestInstance(); - T copy = copyInstance(testInstance); - - /* - * 500 rounds seems to consistently reproduce the issue on Nik's - * laptop. Larger numbers are going to be slower but more likely - * to reproduce the issue. - */ - int rounds = scaledRandomIntBetween(300, 5000); - concurrentTest(() -> { - for (int r = 0; r < rounds; r++) { - assertEquals(testInstance, copy); + try { + T copy = copyInstance(testInstance); + try { + + /* + * 500 rounds seems to consistently reproduce the issue on Nik's + * laptop. Larger numbers are going to be slower but more likely + * to reproduce the issue. + */ + int rounds = scaledRandomIntBetween(300, 5000); + concurrentTest(() -> { + for (int r = 0; r < rounds; r++) { + assertEquals(testInstance, copy); + } + }); + } finally { + dispose(copy); } - }); + } finally { + dispose(testInstance); + } } /** @@ -111,25 +130,34 @@ protected void concurrentTest(Runnable r) throws InterruptedException, Execution */ public final void testConcurrentHashCode() throws InterruptedException, ExecutionException { T testInstance = createTestInstance(); - int firstHashCode = testInstance.hashCode(); - - /* - * 500 rounds seems to consistently reproduce the issue on Nik's - * laptop. Larger numbers are going to be slower but more likely - * to reproduce the issue. - */ - int rounds = scaledRandomIntBetween(300, 5000); - concurrentTest(() -> { - for (int r = 0; r < rounds; r++) { - assertEquals(firstHashCode, testInstance.hashCode()); - } - }); + try { + int firstHashCode = testInstance.hashCode(); + + /* + * 500 rounds seems to consistently reproduce the issue on Nik's + * laptop. Larger numbers are going to be slower but more likely + * to reproduce the issue. + */ + int rounds = scaledRandomIntBetween(300, 5000); + concurrentTest(() -> { + for (int r = 0; r < rounds; r++) { + assertEquals(firstHashCode, testInstance.hashCode()); + } + }); + } finally { + dispose(testInstance); + } } public void testToString() throws Exception { - final String toString = createTestInstance().toString(); - assertNotNull(toString); - assertThat(toString, not(emptyString())); + T testInstance = createTestInstance(); + try { + final String toString = testInstance.toString(); + assertNotNull(toString); + assertThat(toString, not(emptyString())); + } finally { + dispose(testInstance); + } } /** @@ -138,7 +166,11 @@ public void testToString() throws Exception { public final void testSerialization() throws IOException { for (int runs = 0; runs < NUMBER_OF_TEST_RUNS; runs++) { T testInstance = createTestInstance(); - assertSerialization(testInstance); + try { + assertSerialization(testInstance); + } finally { + dispose(testInstance); + } } } @@ -155,22 +187,25 @@ public final void testSerialization() throws IOException { */ public final void testConcurrentSerialization() throws InterruptedException, ExecutionException { T testInstance = createTestInstance(); - - /* - * 500 rounds seems to consistently reproduce the issue on Nik's - * laptop. Larger numbers are going to be slower but more likely - * to reproduce the issue. - */ - int rounds = scaledRandomIntBetween(300, 2000); - concurrentTest(() -> { - try { - for (int r = 0; r < rounds; r++) { - assertSerialization(testInstance); + try { + /* + * 500 rounds seems to consistently reproduce the issue on Nik's + * laptop. Larger numbers are going to be slower but more likely + * to reproduce the issue. + */ + int rounds = scaledRandomIntBetween(300, 2000); + concurrentTest(() -> { + try { + for (int r = 0; r < rounds; r++) { + assertSerialization(testInstance); + } + } catch (IOException e) { + throw new AssertionError("error serializing", e); } - } catch (IOException e) { - throw new AssertionError("error serializing", e); - } - }); + }); + } finally { + dispose(testInstance); + } } /** @@ -187,7 +222,11 @@ protected final void assertSerialization(T testInstance) throws IOException { */ protected final void assertSerialization(T testInstance, TransportVersion version) throws IOException { T deserializedInstance = copyInstance(testInstance, version); - assertEqualInstances(testInstance, deserializedInstance); + try { + assertEqualInstances(testInstance, deserializedInstance); + } finally { + dispose(deserializedInstance); + } } /** diff --git a/test/framework/src/main/java/org/elasticsearch/test/AbstractXContentTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/AbstractXContentTestCase.java index 5e9ed3e26d970..fa4d196ceaeda 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/AbstractXContentTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/AbstractXContentTestCase.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -125,6 +126,7 @@ public static class XContentTester { assertEquals(expectedInstance.hashCode(), newInstance.hashCode()); }; private boolean assertToXContentEquivalence = true; + private Consumer dispose = t -> {}; private XContentTester( CheckedBiFunction createParser, @@ -142,24 +144,32 @@ public void test() throws IOException { for (int runs = 0; runs < numberOfTestRuns; runs++) { XContentType xContentType = randomFrom(XContentType.values()).canonical(); T testInstance = instanceSupplier.apply(xContentType); - BytesReference originalXContent = toXContent.apply(testInstance, xContentType); - BytesReference shuffledContent = insertRandomFieldsAndShuffle( - originalXContent, - xContentType, - supportsUnknownFields, - shuffleFieldsExceptions, - randomFieldsExcludeFilter, - createParser - ); - XContentParser parser = createParser.apply(XContentFactory.xContent(xContentType), shuffledContent); - T parsed = fromXContent.apply(parser); - assertEqualsConsumer.accept(testInstance, parsed); - if (assertToXContentEquivalence) { - assertToXContentEquivalent( - toXContent.apply(testInstance, xContentType), - toXContent.apply(parsed, xContentType), - xContentType + try { + BytesReference originalXContent = toXContent.apply(testInstance, xContentType); + BytesReference shuffledContent = insertRandomFieldsAndShuffle( + originalXContent, + xContentType, + supportsUnknownFields, + shuffleFieldsExceptions, + randomFieldsExcludeFilter, + createParser ); + XContentParser parser = createParser.apply(XContentFactory.xContent(xContentType), shuffledContent); + T parsed = fromXContent.apply(parser); + try { + assertEqualsConsumer.accept(testInstance, parsed); + if (assertToXContentEquivalence) { + assertToXContentEquivalent( + toXContent.apply(testInstance, xContentType), + toXContent.apply(parsed, xContentType), + xContentType + ); + } + } finally { + dispose.accept(parsed); + } + } finally { + dispose.accept(testInstance); } } } @@ -193,6 +203,11 @@ public XContentTester assertToXContentEquivalence(boolean assertToXContentEqu this.assertToXContentEquivalence = assertToXContentEquivalence; return this; } + + public XContentTester dispose(Consumer dispose) { + this.dispose = dispose; + return this; + } } public static void testFromXContent( diff --git a/test/framework/src/main/java/org/elasticsearch/test/EqualsHashCodeTestUtils.java b/test/framework/src/main/java/org/elasticsearch/test/EqualsHashCodeTestUtils.java index 143ab9011c85d..b295dad6a6b16 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/EqualsHashCodeTestUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/EqualsHashCodeTestUtils.java @@ -8,7 +8,10 @@ package org.elasticsearch.test; +import org.elasticsearch.core.Releasable; + import java.io.IOException; +import java.util.function.Consumer; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; @@ -57,6 +60,24 @@ public static void checkEqualsAndHashCode(T original, CopyFunction copyFu * to the input object */ public static void checkEqualsAndHashCode(T original, CopyFunction copyFunction, MutateFunction mutationFunction) { + checkEqualsAndHashCode(original, copyFunction, mutationFunction, unused -> {}); + } + + /** + * Perform common equality and hashCode checks on the input object + * @param original the object under test + * @param copyFunction a function that creates a deep copy of the input object + * @param mutationFunction a function that creates a copy of the input object that is different + * @param dispose dispose of the copy, usually {@link Releasable#close} or a noop + * from the input in one aspect. The output of this call is used to check that it is not equal() + * to the input object + */ + public static void checkEqualsAndHashCode( + T original, + CopyFunction copyFunction, + MutateFunction mutationFunction, + Consumer dispose + ) { try { String objectName = original.getClass().getSimpleName(); assertFalse(objectName + " is equal to null", original.equals(null)); @@ -70,25 +91,41 @@ public static void checkEqualsAndHashCode(T original, CopyFunction copyFu ); if (mutationFunction != null) { T mutation = mutationFunction.mutate(original); - assertThat(objectName + " mutation should not be equal to original", mutation, not(equalTo(original))); - // equals is symmetric: for any non-null reference values x and y, x.equals(y) should return true if and only - // if y.equals(x) returns true. Conversely, y.equals(x) should return true if and only if x.equals(y) - assertThat("original should not be equal to mutation" + objectName, original, not(equalTo(mutation))); + try { + assertThat(objectName + " mutation should not be equal to original", mutation, not(equalTo(original))); + // equals is symmetric: for any non-null reference values x and y, x.equals(y) should return true if and only + // if y.equals(x) returns true. Conversely, y.equals(x) should return true if and only if x.equals(y) + assertThat("original should not be equal to mutation" + objectName, original, not(equalTo(mutation))); + } finally { + dispose.accept(mutation); + } } T copy = copyFunction.copy(original); - assertTrue(objectName + " copy is not equal to self", copy.equals(copy)); - assertTrue(objectName + " is not equal to its copy", original.equals(copy)); - assertTrue("equals is not symmetric", copy.equals(original)); - assertThat(objectName + " hashcode is different from copies hashcode", copy.hashCode(), equalTo(original.hashCode())); + try { + assertTrue(objectName + " copy is not equal to self", copy.equals(copy)); + assertTrue(objectName + " is not equal to its copy", original.equals(copy)); + assertTrue("equals is not symmetric", copy.equals(original)); + assertThat(objectName + " hashcode is different from copies hashcode", copy.hashCode(), equalTo(original.hashCode())); - T secondCopy = copyFunction.copy(copy); - assertTrue("second copy is not equal to self", secondCopy.equals(secondCopy)); - assertTrue("copy is not equal to its second copy", copy.equals(secondCopy)); - assertThat("second copy's hashcode is different from original hashcode", copy.hashCode(), equalTo(secondCopy.hashCode())); - assertTrue("equals is not transitive", original.equals(secondCopy)); - assertTrue("equals is not symmetric", secondCopy.equals(copy)); - assertTrue("equals is not symmetric", secondCopy.equals(original)); + T secondCopy = copyFunction.copy(copy); + try { + assertTrue("second copy is not equal to self", secondCopy.equals(secondCopy)); + assertTrue("copy is not equal to its second copy", copy.equals(secondCopy)); + assertThat( + "second copy's hashcode is different from original hashcode", + copy.hashCode(), + equalTo(secondCopy.hashCode()) + ); + assertTrue("equals is not transitive", original.equals(secondCopy)); + assertTrue("equals is not symmetric", secondCopy.equals(copy)); + assertTrue("equals is not symmetric", secondCopy.equals(original)); + } finally { + dispose.accept(secondCopy); + } + } finally { + dispose.accept(copy); + } } catch (IOException e) { throw new RuntimeException(e); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlock.java index f2501d54a4ae3..80f396695fc2f 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlock.java @@ -36,33 +36,34 @@ public sealed interface BooleanBlock extends Block permits FilterBooleanBlock, B @Override BooleanBlock filter(int... positions); - NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Block.class, "BooleanBlock", BooleanBlock::of); - @Override default String getWriteableName() { return "BooleanBlock"; } - static BooleanBlock of(StreamInput in) throws IOException { + NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Block.class, "BooleanBlock", BooleanBlock::readFrom); + + private static BooleanBlock readFrom(StreamInput in) throws IOException { final boolean isVector = in.readBoolean(); if (isVector) { - return BooleanVector.of(in).asBlock(); + return BooleanVector.readFrom(((BlockStreamInput) in).blockFactory(), in).asBlock(); } final int positions = in.readVInt(); - var builder = newBlockBuilder(positions); - for (int i = 0; i < positions; i++) { - if (in.readBoolean()) { - builder.appendNull(); - } else { - final int valueCount = in.readVInt(); - builder.beginPositionEntry(); - for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) { - builder.appendBoolean(in.readBoolean()); + try (BooleanBlock.Builder builder = ((BlockStreamInput) in).blockFactory().newBooleanBlockBuilder(positions)) { + for (int i = 0; i < positions; i++) { + if (in.readBoolean()) { + builder.appendNull(); + } else { + final int valueCount = in.readVInt(); + builder.beginPositionEntry(); + for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) { + builder.appendBoolean(in.readBoolean()); + } + builder.endPositionEntry(); } - builder.endPositionEntry(); } + return builder.build(); } - return builder.build(); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanVector.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanVector.java index 79b5ec40b81e5..d5dc9c23d7eee 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanVector.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanVector.java @@ -72,17 +72,18 @@ static int hash(BooleanVector vector) { } /** Deserializes a Vector from the given stream input. */ - static BooleanVector of(StreamInput in) throws IOException { + static BooleanVector readFrom(BlockFactory blockFactory, StreamInput in) throws IOException { final int positions = in.readVInt(); final boolean constant = in.readBoolean(); if (constant && positions > 0) { - return new ConstantBooleanVector(in.readBoolean(), positions); + return blockFactory.newConstantBooleanVector(in.readBoolean(), positions); } else { - var builder = BooleanVector.newVectorBuilder(positions); - for (int i = 0; i < positions; i++) { - builder.appendBoolean(in.readBoolean()); + try (var builder = blockFactory.newBooleanVectorFixedBuilder(positions)) { + for (int i = 0; i < positions; i++) { + builder.appendBoolean(in.readBoolean()); + } + return builder.build(); } - return builder.build(); } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlock.java index e8cf8926d3cd2..9409212a9c998 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlock.java @@ -40,33 +40,34 @@ public sealed interface BytesRefBlock extends Block permits FilterBytesRefBlock, @Override BytesRefBlock filter(int... positions); - NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Block.class, "BytesRefBlock", BytesRefBlock::of); - @Override default String getWriteableName() { return "BytesRefBlock"; } - static BytesRefBlock of(StreamInput in) throws IOException { + NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Block.class, "BytesRefBlock", BytesRefBlock::readFrom); + + private static BytesRefBlock readFrom(StreamInput in) throws IOException { final boolean isVector = in.readBoolean(); if (isVector) { - return BytesRefVector.of(in).asBlock(); + return BytesRefVector.readFrom(((BlockStreamInput) in).blockFactory(), in).asBlock(); } final int positions = in.readVInt(); - var builder = newBlockBuilder(positions); - for (int i = 0; i < positions; i++) { - if (in.readBoolean()) { - builder.appendNull(); - } else { - final int valueCount = in.readVInt(); - builder.beginPositionEntry(); - for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) { - builder.appendBytesRef(in.readBytesRef()); + try (BytesRefBlock.Builder builder = ((BlockStreamInput) in).blockFactory().newBytesRefBlockBuilder(positions)) { + for (int i = 0; i < positions; i++) { + if (in.readBoolean()) { + builder.appendNull(); + } else { + final int valueCount = in.readVInt(); + builder.beginPositionEntry(); + for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) { + builder.appendBytesRef(in.readBytesRef()); + } + builder.endPositionEntry(); } - builder.endPositionEntry(); } + return builder.build(); } - return builder.build(); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefVector.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefVector.java index 84cb24f955618..3dd334a9fa71d 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefVector.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefVector.java @@ -72,17 +72,18 @@ static int hash(BytesRefVector vector) { } /** Deserializes a Vector from the given stream input. */ - static BytesRefVector of(StreamInput in) throws IOException { + static BytesRefVector readFrom(BlockFactory blockFactory, StreamInput in) throws IOException { final int positions = in.readVInt(); final boolean constant = in.readBoolean(); if (constant && positions > 0) { - return new ConstantBytesRefVector(in.readBytesRef(), positions); + return blockFactory.newConstantBytesRefVector(in.readBytesRef(), positions); } else { - var builder = BytesRefVector.newVectorBuilder(positions); - for (int i = 0; i < positions; i++) { - builder.appendBytesRef(in.readBytesRef()); + try (var builder = blockFactory.newBytesRefVectorBuilder(positions)) { + for (int i = 0; i < positions; i++) { + builder.appendBytesRef(in.readBytesRef()); + } + return builder.build(); } - return builder.build(); } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/ConstantBytesRefVector.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/ConstantBytesRefVector.java index 3f7ae8449425e..6fc64a6891c32 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/ConstantBytesRefVector.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/ConstantBytesRefVector.java @@ -57,9 +57,13 @@ public boolean isConstant() { return true; } + public static long ramBytesUsed(BytesRef value) { + return BASE_RAM_BYTES_USED + RamUsageEstimator.sizeOf(value.bytes); + } + @Override public long ramBytesUsed() { - return BASE_RAM_BYTES_USED + RamUsageEstimator.sizeOf(value.bytes); + return ramBytesUsed(value); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlock.java index 9edd887448938..806ee6d3680bc 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlock.java @@ -36,33 +36,34 @@ public sealed interface DoubleBlock extends Block permits FilterDoubleBlock, Dou @Override DoubleBlock filter(int... positions); - NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Block.class, "DoubleBlock", DoubleBlock::of); - @Override default String getWriteableName() { return "DoubleBlock"; } - static DoubleBlock of(StreamInput in) throws IOException { + NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Block.class, "DoubleBlock", DoubleBlock::readFrom); + + private static DoubleBlock readFrom(StreamInput in) throws IOException { final boolean isVector = in.readBoolean(); if (isVector) { - return DoubleVector.of(in).asBlock(); + return DoubleVector.readFrom(((BlockStreamInput) in).blockFactory(), in).asBlock(); } final int positions = in.readVInt(); - var builder = newBlockBuilder(positions); - for (int i = 0; i < positions; i++) { - if (in.readBoolean()) { - builder.appendNull(); - } else { - final int valueCount = in.readVInt(); - builder.beginPositionEntry(); - for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) { - builder.appendDouble(in.readDouble()); + try (DoubleBlock.Builder builder = ((BlockStreamInput) in).blockFactory().newDoubleBlockBuilder(positions)) { + for (int i = 0; i < positions; i++) { + if (in.readBoolean()) { + builder.appendNull(); + } else { + final int valueCount = in.readVInt(); + builder.beginPositionEntry(); + for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) { + builder.appendDouble(in.readDouble()); + } + builder.endPositionEntry(); } - builder.endPositionEntry(); } + return builder.build(); } - return builder.build(); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleVector.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleVector.java index ce3e1ffa291f4..e2aaeed94ba6d 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleVector.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleVector.java @@ -73,17 +73,18 @@ static int hash(DoubleVector vector) { } /** Deserializes a Vector from the given stream input. */ - static DoubleVector of(StreamInput in) throws IOException { + static DoubleVector readFrom(BlockFactory blockFactory, StreamInput in) throws IOException { final int positions = in.readVInt(); final boolean constant = in.readBoolean(); if (constant && positions > 0) { - return new ConstantDoubleVector(in.readDouble(), positions); + return blockFactory.newConstantDoubleVector(in.readDouble(), positions); } else { - var builder = DoubleVector.newVectorBuilder(positions); - for (int i = 0; i < positions; i++) { - builder.appendDouble(in.readDouble()); + try (var builder = blockFactory.newDoubleVectorFixedBuilder(positions)) { + for (int i = 0; i < positions; i++) { + builder.appendDouble(in.readDouble()); + } + return builder.build(); } - return builder.build(); } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlock.java index d6f39de6fc938..580da5e5a7415 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlock.java @@ -36,33 +36,34 @@ public sealed interface IntBlock extends Block permits FilterIntBlock, IntArrayB @Override IntBlock filter(int... positions); - NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Block.class, "IntBlock", IntBlock::of); - @Override default String getWriteableName() { return "IntBlock"; } - static IntBlock of(StreamInput in) throws IOException { + NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Block.class, "IntBlock", IntBlock::readFrom); + + private static IntBlock readFrom(StreamInput in) throws IOException { final boolean isVector = in.readBoolean(); if (isVector) { - return IntVector.of(in).asBlock(); + return IntVector.readFrom(((BlockStreamInput) in).blockFactory(), in).asBlock(); } final int positions = in.readVInt(); - var builder = newBlockBuilder(positions); - for (int i = 0; i < positions; i++) { - if (in.readBoolean()) { - builder.appendNull(); - } else { - final int valueCount = in.readVInt(); - builder.beginPositionEntry(); - for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) { - builder.appendInt(in.readInt()); + try (IntBlock.Builder builder = ((BlockStreamInput) in).blockFactory().newIntBlockBuilder(positions)) { + for (int i = 0; i < positions; i++) { + if (in.readBoolean()) { + builder.appendNull(); + } else { + final int valueCount = in.readVInt(); + builder.beginPositionEntry(); + for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) { + builder.appendInt(in.readInt()); + } + builder.endPositionEntry(); } - builder.endPositionEntry(); } + return builder.build(); } - return builder.build(); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntVector.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntVector.java index 79f4d5c31845d..157f7f1406072 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntVector.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntVector.java @@ -72,17 +72,18 @@ static int hash(IntVector vector) { } /** Deserializes a Vector from the given stream input. */ - static IntVector of(StreamInput in) throws IOException { + static IntVector readFrom(BlockFactory blockFactory, StreamInput in) throws IOException { final int positions = in.readVInt(); final boolean constant = in.readBoolean(); if (constant && positions > 0) { - return new ConstantIntVector(in.readInt(), positions); + return blockFactory.newConstantIntVector(in.readInt(), positions); } else { - var builder = IntVector.newVectorBuilder(positions); - for (int i = 0; i < positions; i++) { - builder.appendInt(in.readInt()); + try (var builder = blockFactory.newIntVectorFixedBuilder(positions)) { + for (int i = 0; i < positions; i++) { + builder.appendInt(in.readInt()); + } + return builder.build(); } - return builder.build(); } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlock.java index d3dc5928cb543..2db757efd7091 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlock.java @@ -36,33 +36,34 @@ public sealed interface LongBlock extends Block permits FilterLongBlock, LongArr @Override LongBlock filter(int... positions); - NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Block.class, "LongBlock", LongBlock::of); - @Override default String getWriteableName() { return "LongBlock"; } - static LongBlock of(StreamInput in) throws IOException { + NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Block.class, "LongBlock", LongBlock::readFrom); + + private static LongBlock readFrom(StreamInput in) throws IOException { final boolean isVector = in.readBoolean(); if (isVector) { - return LongVector.of(in).asBlock(); + return LongVector.readFrom(((BlockStreamInput) in).blockFactory(), in).asBlock(); } final int positions = in.readVInt(); - var builder = newBlockBuilder(positions); - for (int i = 0; i < positions; i++) { - if (in.readBoolean()) { - builder.appendNull(); - } else { - final int valueCount = in.readVInt(); - builder.beginPositionEntry(); - for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) { - builder.appendLong(in.readLong()); + try (LongBlock.Builder builder = ((BlockStreamInput) in).blockFactory().newLongBlockBuilder(positions)) { + for (int i = 0; i < positions; i++) { + if (in.readBoolean()) { + builder.appendNull(); + } else { + final int valueCount = in.readVInt(); + builder.beginPositionEntry(); + for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) { + builder.appendLong(in.readLong()); + } + builder.endPositionEntry(); } - builder.endPositionEntry(); } + return builder.build(); } - return builder.build(); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongVector.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongVector.java index 584e9ecfa9ce0..de2e51cfda4ea 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongVector.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongVector.java @@ -73,17 +73,18 @@ static int hash(LongVector vector) { } /** Deserializes a Vector from the given stream input. */ - static LongVector of(StreamInput in) throws IOException { + static LongVector readFrom(BlockFactory blockFactory, StreamInput in) throws IOException { final int positions = in.readVInt(); final boolean constant = in.readBoolean(); if (constant && positions > 0) { - return new ConstantLongVector(in.readLong(), positions); + return blockFactory.newConstantLongVector(in.readLong(), positions); } else { - var builder = LongVector.newVectorBuilder(positions); - for (int i = 0; i < positions; i++) { - builder.appendLong(in.readLong()); + try (var builder = blockFactory.newLongVectorFixedBuilder(positions)) { + for (int i = 0; i < positions; i++) { + builder.appendLong(in.readLong()); + } + return builder.build(); } - return builder.build(); } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java index f094f8462e673..0e93bc1ee5e90 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java @@ -147,6 +147,13 @@ public BooleanBlock newConstantBooleanBlockWith(boolean value, int positions, lo return b; } + public BooleanVector newConstantBooleanVector(boolean value, int positions) { + adjustBreaker(ConstantBooleanVector.RAM_BYTES_USED, false); + var v = new ConstantBooleanVector(value, positions, this); + assert v.ramBytesUsed() == ConstantBooleanVector.RAM_BYTES_USED; + return v; + } + public IntBlock.Builder newIntBlockBuilder(int estimatedSize) { return new IntBlockBuilder(estimatedSize, this); } @@ -207,7 +214,7 @@ public IntBlock newConstantIntBlockWith(int value, int positions, long preAdjust public IntVector newConstantIntVector(int value, int positions) { adjustBreaker(ConstantIntVector.RAM_BYTES_USED, false); var v = new ConstantIntVector(value, positions, this); - adjustBreaker(v.ramBytesUsed() - ConstantIntVector.RAM_BYTES_USED, true); + assert v.ramBytesUsed() == ConstantLongVector.RAM_BYTES_USED; return v; } @@ -253,6 +260,13 @@ public LongBlock newConstantLongBlockWith(long value, int positions, long preAdj return b; } + public LongVector newConstantLongVector(long value, int positions) { + adjustBreaker(ConstantLongVector.RAM_BYTES_USED, false); + var v = new ConstantLongVector(value, positions, this); + assert v.ramBytesUsed() == ConstantLongVector.RAM_BYTES_USED; + return v; + } + public DoubleBlock.Builder newDoubleBlockBuilder(int estimatedSize) { return new DoubleBlockBuilder(estimatedSize, this); } @@ -296,6 +310,13 @@ public DoubleBlock newConstantDoubleBlockWith(double value, int positions, long return b; } + public DoubleVector newConstantDoubleVector(double value, int positions) { + adjustBreaker(ConstantDoubleVector.RAM_BYTES_USED, false); + var v = new ConstantDoubleVector(value, positions, this); + assert v.ramBytesUsed() == ConstantDoubleVector.RAM_BYTES_USED; + return v; + } + public BytesRefBlock.Builder newBytesRefBlockBuilder(int estimatedSize) { return new BytesRefBlockBuilder(estimatedSize, bigArrays, this); } @@ -322,6 +343,14 @@ public BytesRefBlock newConstantBytesRefBlockWith(BytesRef value, int positions) return b; } + public BytesRefVector newConstantBytesRefVector(BytesRef value, int positions) { + long preadjusted = ConstantBytesRefVector.ramBytesUsed(value); + adjustBreaker(preadjusted, false); + var v = new ConstantBytesRefVector(value, positions, this); + assert v.ramBytesUsed() == preadjusted; + return v; + } + public Block newConstantNullBlock(int positions) { var b = new ConstantNullBlock(positions, this); adjustBreaker(b.ramBytesUsed(), true); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockStreamInput.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockStreamInput.java new file mode 100644 index 0000000000000..a5604935acb23 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockStreamInput.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.data; + +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.StreamInput; + +public class BlockStreamInput extends NamedWriteableAwareStreamInput { + private final BlockFactory blockFactory; + + public BlockStreamInput(StreamInput delegate, BlockFactory blockFactory) { + super(delegate, delegate.namedWriteableRegistry()); + this.blockFactory = blockFactory; + } + + BlockFactory blockFactory() { + return blockFactory; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java index d45555790c4cd..18f3ed7ba61bf 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java @@ -224,20 +224,4 @@ public void releaseBlocks() { blocksReleased = true; Releasables.closeExpectNoException(blocks); } - - public static class PageWriter implements Writeable.Writer { - - @Override - public void write(StreamOutput out, Page value) throws IOException { - value.writeTo(out); - } - } - - public static class PageReader implements Writeable.Reader { - - @Override - public Page read(StreamInput in) throws IOException { - return new Page(in); - } - } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Block.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Block.java.st index 3f626e463f428..81a0d3de7f8f7 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Block.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Block.java.st @@ -52,33 +52,34 @@ $endif$ @Override $Type$Block filter(int... positions); - NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Block.class, "$Type$Block", $Type$Block::of); - @Override default String getWriteableName() { return "$Type$Block"; } - static $Type$Block of(StreamInput in) throws IOException { + NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Block.class, "$Type$Block", $Type$Block::readFrom); + + private static $Type$Block readFrom(StreamInput in) throws IOException { final boolean isVector = in.readBoolean(); if (isVector) { - return $Type$Vector.of(in).asBlock(); + return $Type$Vector.readFrom(((BlockStreamInput) in).blockFactory(), in).asBlock(); } final int positions = in.readVInt(); - var builder = newBlockBuilder(positions); - for (int i = 0; i < positions; i++) { - if (in.readBoolean()) { - builder.appendNull(); - } else { - final int valueCount = in.readVInt(); - builder.beginPositionEntry(); - for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) { - builder.append$Type$(in.read$Type$()); + try ($Type$Block.Builder builder = ((BlockStreamInput) in).blockFactory().new$Type$BlockBuilder(positions)) { + for (int i = 0; i < positions; i++) { + if (in.readBoolean()) { + builder.appendNull(); + } else { + final int valueCount = in.readVInt(); + builder.beginPositionEntry(); + for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) { + builder.append$Type$(in.read$Type$()); + } + builder.endPositionEntry(); } - builder.endPositionEntry(); } + return builder.build(); } - return builder.build(); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ConstantVector.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ConstantVector.java.st index bfa33e54132a6..36384f3996f55 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ConstantVector.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ConstantVector.java.st @@ -68,15 +68,23 @@ $endif$ return true; } - @Override - public long ramBytesUsed() { $if(BytesRef)$ + public static long ramBytesUsed(BytesRef value) { return BASE_RAM_BYTES_USED + RamUsageEstimator.sizeOf(value.bytes); + } + + @Override + public long ramBytesUsed() { + return ramBytesUsed(value); + } + $else$ + @Override + public long ramBytesUsed() { return RAM_BYTES_USED; -$endif$ } +$endif$ @Override public boolean equals(Object obj) { if (obj instanceof $Type$Vector that) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Vector.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Vector.java.st index 9f29f3e25fe91..7e0c5b55fb2a6 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Vector.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Vector.java.st @@ -106,17 +106,18 @@ $endif$ } /** Deserializes a Vector from the given stream input. */ - static $Type$Vector of(StreamInput in) throws IOException { + static $Type$Vector readFrom(BlockFactory blockFactory, StreamInput in) throws IOException { final int positions = in.readVInt(); final boolean constant = in.readBoolean(); if (constant && positions > 0) { - return new Constant$Type$Vector(in.read$Type$(), positions); + return blockFactory.newConstant$Type$Vector(in.read$Type$(), positions); } else { - var builder = $Type$Vector.newVectorBuilder(positions); - for (int i = 0; i < positions; i++) { - builder.append$Type$(in.read$Type$()); + try (var builder = blockFactory.new$Type$Vector$if(BytesRef)$$else$Fixed$endif$Builder(positions)) { + for (int i = 0; i < positions; i++) { + builder.append$Type$(in.read$Type$()); + } + return builder.build(); } - return builder.build(); } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java index 34b9a92d10ef7..7507eb8f978c8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java @@ -45,7 +45,7 @@ public LimitOperator(int limit) { public record Factory(int limit) implements OperatorFactory { @Override - public Operator get(DriverContext driverContext) { + public LimitOperator get(DriverContext driverContext) { return new LimitOperator(limit); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeResponse.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeResponse.java index 9b3da39fe5c74..5904c03a01e44 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeResponse.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeResponse.java @@ -7,8 +7,8 @@ package org.elasticsearch.compute.operator.exchange; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.compute.data.BlockStreamInput; import org.elasticsearch.compute.data.Page; import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.Nullable; @@ -30,7 +30,7 @@ public ExchangeResponse(Page page, boolean finished) { this.finished = finished; } - public ExchangeResponse(StreamInput in) throws IOException { + public ExchangeResponse(BlockStreamInput in) throws IOException { super(in); this.page = in.readOptionalWriteable(Page::new); this.finished = in.readBoolean(); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java index f6762e33bbc18..ab9582b20d4aa 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java @@ -22,6 +22,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractAsyncTask; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BlockStreamInput; import org.elasticsearch.core.TimeValue; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; @@ -61,15 +63,17 @@ public final class ExchangeService extends AbstractLifecycleComponent { private final ThreadPool threadPool; private final Executor executor; + private final BlockFactory blockFactory; private final Map sinks = ConcurrentCollections.newConcurrentMap(); private final Map sources = ConcurrentCollections.newConcurrentMap(); private final InactiveSinksReaper inactiveSinksReaper; - public ExchangeService(Settings settings, ThreadPool threadPool, String executorName) { + public ExchangeService(Settings settings, ThreadPool threadPool, String executorName, BlockFactory blockFactory) { this.threadPool = threadPool; this.executor = threadPool.executor(executorName); + this.blockFactory = blockFactory; final var inactiveInterval = settings.getAsTime(INACTIVE_SINKS_INTERVAL_SETTING, TimeValue.timeValueMinutes(5)); this.inactiveSinksReaper = new InactiveSinksReaper(LOGGER, threadPool, this.executor, inactiveInterval); } @@ -250,11 +254,12 @@ protected void runInternal() { * @param remoteNode the node where the remote exchange sink is located */ public RemoteSink newRemoteSink(Task parentTask, String exchangeId, TransportService transportService, DiscoveryNode remoteNode) { - return new TransportRemoteSink(transportService, remoteNode, parentTask, exchangeId, executor); + return new TransportRemoteSink(transportService, blockFactory, remoteNode, parentTask, exchangeId, executor); } record TransportRemoteSink( TransportService transportService, + BlockFactory blockFactory, DiscoveryNode node, Task parentTask, String exchangeId, @@ -269,7 +274,11 @@ public void fetchPageAsync(boolean allSourcesFinished, ActionListener(listener, ExchangeResponse::new, responseExecutor) + new ActionListenerResponseHandler<>( + listener, + in -> new ExchangeResponse(new BlockStreamInput(in, blockFactory)), + responseExecutor + ) ); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BasicPageTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BasicPageTests.java index ca7ef54f7f321..25aa957e90cff 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BasicPageTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BasicPageTests.java @@ -8,16 +8,10 @@ package org.elasticsearch.compute.data; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.ByteBufferStreamInput; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BytesRefArray; -import org.elasticsearch.common.util.MockBigArrays; -import org.elasticsearch.common.util.PageCacheRecycler; -import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.core.Releasables; import org.elasticsearch.test.EqualsHashCodeTestUtils; import java.io.IOException; @@ -120,9 +114,13 @@ public void testEqualityAndHashCode() throws IOException { }; } Page page = new Page(positions, blocks); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(page, copyPageFunction, mutatePageFunction); + try { + EqualsHashCodeTestUtils.checkEqualsAndHashCode(page, copyPageFunction, mutatePageFunction); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(page, unused -> serializeDeserializePage(page)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(page, this::serializeDeserializePage, null, Page::releaseBlocks); + } finally { + page.releaseBlocks(); + } } public void testBasic() { @@ -146,28 +144,33 @@ public void testAppend() { } public void testPageSerializationSimple() throws IOException { - try (var bytesRefArray = bytesRefArrayOf("0a", "1b", "2c", "3d", "4e", "5f", "6g", "7h", "8i", "9j")) { - final BytesStreamOutput out = new BytesStreamOutput(); - Page origPage = new Page( - new IntArrayVector(IntStream.range(0, 10).toArray(), 10).asBlock(), - new LongArrayVector(LongStream.range(10, 20).toArray(), 10).asBlock(), - new DoubleArrayVector(LongStream.range(30, 40).mapToDouble(i -> i).toArray(), 10).asBlock(), - new BytesRefArrayVector(bytesRefArray, 10).asBlock(), - IntBlock.newConstantBlockWith(randomInt(), 10), - LongBlock.newConstantBlockWith(randomInt(), 10), - DoubleBlock.newConstantBlockWith(randomInt(), 10), - BytesRefBlock.newConstantBlockWith(new BytesRef(Integer.toHexString(randomInt())), 10), - new IntArrayVector(IntStream.range(0, 20).toArray(), 20).filter(5, 6, 7, 8, 9, 10, 11, 12, 13, 14).asBlock() - ); + Page origPage = new Page( + new IntArrayVector(IntStream.range(0, 10).toArray(), 10).asBlock(), + new LongArrayVector(LongStream.range(10, 20).toArray(), 10).asBlock(), + new DoubleArrayVector(LongStream.range(30, 40).mapToDouble(i -> i).toArray(), 10).asBlock(), + new BytesRefArrayVector(bytesRefArrayOf("0a", "1b", "2c", "3d", "4e", "5f", "6g", "7h", "8i", "9j"), 10).asBlock(), + IntBlock.newConstantBlockWith(randomInt(), 10), + LongBlock.newConstantBlockWith(randomInt(), 10), + DoubleBlock.newConstantBlockWith(randomInt(), 10), + BytesRefBlock.newConstantBlockWith(new BytesRef(Integer.toHexString(randomInt())), 10), + new IntArrayVector(IntStream.range(0, 20).toArray(), 20).filter(5, 6, 7, 8, 9, 10, 11, 12, 13, 14).asBlock() + ); + try { Page deserPage = serializeDeserializePage(origPage); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(origPage, unused -> deserPage); - - for (int i = 0; i < origPage.getBlockCount(); i++) { - Vector vector = origPage.getBlock(i).asVector(); - if (vector != null) { - assertEquals(vector.isConstant(), deserPage.getBlock(i).asVector().isConstant()); + try { + EqualsHashCodeTestUtils.checkEqualsAndHashCode(origPage, unused -> deserPage); + + for (int i = 0; i < origPage.getBlockCount(); i++) { + Vector vector = origPage.getBlock(i).asVector(); + if (vector != null) { + assertEquals(vector.isConstant(), deserPage.getBlock(i).asVector().isConstant()); + } } + } finally { + deserPage.releaseBlocks(); } + } finally { + origPage.releaseBlocks(); } } @@ -181,16 +184,18 @@ public void testSerializationListPages() throws IOException { ), new Page(BytesRefBlock.newConstantBlockWith(new BytesRef("Hello World"), positions)) ); - final BytesStreamOutput out = new BytesStreamOutput(); - out.writeCollection(origPages); - StreamInput in = new NamedWriteableAwareStreamInput(ByteBufferStreamInput.wrap(BytesReference.toBytes(out.bytes())), registry); - - List deserPages = in.readCollectionAsList(new Page.PageReader()); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(origPages, unused -> deserPages); + try { + EqualsHashCodeTestUtils.checkEqualsAndHashCode(origPages, page -> { + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.writeCollection(origPages); + return blockStreamInput(out).readCollectionAsList(Page::new); + } + }, null, pages -> Releasables.close(() -> Iterators.map(pages.iterator(), p -> p::releaseBlocks))); + } finally { + Releasables.close(() -> Iterators.map(origPages.iterator(), p -> p::releaseBlocks)); + } } - final BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService()); - BytesRefArray bytesRefArrayOf(String... values) { var array = new BytesRefArray(values.length, bigArrays); Arrays.stream(values).map(BytesRef::new).forEach(array::append); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BigArrayVectorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BigArrayVectorTests.java index 66f62a2052689..3033f672f897f 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BigArrayVectorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BigArrayVectorTests.java @@ -140,10 +140,11 @@ public void testDouble() throws IOException { } void assertSerialization(Block origBlock) throws IOException { - Block deserBlock = serializeDeserializeBlock(origBlock); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(origBlock, unused -> deserBlock); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(origBlock.asVector(), unused -> deserBlock.asVector()); - assertThat(deserBlock.asVector(), is(origBlock.asVector())); - assertThat(deserBlock.asVector().isConstant(), is(origBlock.asVector().isConstant())); + try (Block deserBlock = serializeDeserializeBlock(origBlock)) { + EqualsHashCodeTestUtils.checkEqualsAndHashCode(origBlock, unused -> deserBlock); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(origBlock.asVector(), unused -> deserBlock.asVector()); + assertThat(deserBlock.asVector(), is(origBlock.asVector())); + assertThat(deserBlock.asVector().isConstant(), is(origBlock.asVector().isConstant())); + } } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockSerializationTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockSerializationTests.java index cf3f275db53e9..8b958f7bafb8f 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockSerializationTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockSerializationTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.aggregation.SumLongAggregatorFunction; import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasables; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.EqualsHashCodeTestUtils; @@ -47,42 +48,45 @@ public void testConstantBytesRefBlock() throws IOException { private void assertConstantBlockImpl(Block origBlock) throws IOException { assertThat(origBlock.asVector().isConstant(), is(true)); - Block deserBlock = serializeDeserializeBlock(origBlock); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(origBlock, unused -> deserBlock); - assertThat(deserBlock.asVector().isConstant(), is(true)); + try (Block deserBlock = serializeDeserializeBlock(origBlock)) { + EqualsHashCodeTestUtils.checkEqualsAndHashCode(origBlock, unused -> deserBlock); + assertThat(deserBlock.asVector().isConstant(), is(true)); + } } - public void testEmptyIntBlock() { + public void testEmptyIntBlock() throws IOException { assertEmptyBlock(IntBlock.newBlockBuilder(0).build()); assertEmptyBlock(IntBlock.newBlockBuilder(0).appendNull().build().filter()); assertEmptyBlock(IntVector.newVectorBuilder(0).build().asBlock()); assertEmptyBlock(IntVector.newVectorBuilder(0).appendInt(randomInt()).build().filter().asBlock()); } - public void testEmptyLongBlock() { + public void testEmptyLongBlock() throws IOException { assertEmptyBlock(LongBlock.newBlockBuilder(0).build()); assertEmptyBlock(LongBlock.newBlockBuilder(0).appendNull().build().filter()); assertEmptyBlock(LongVector.newVectorBuilder(0).build().asBlock()); assertEmptyBlock(LongVector.newVectorBuilder(0).appendLong(randomLong()).build().filter().asBlock()); } - public void testEmptyDoubleBlock() { + public void testEmptyDoubleBlock() throws IOException { assertEmptyBlock(DoubleBlock.newBlockBuilder(0).build()); assertEmptyBlock(DoubleBlock.newBlockBuilder(0).appendNull().build().filter()); assertEmptyBlock(DoubleVector.newVectorBuilder(0).build().asBlock()); assertEmptyBlock(DoubleVector.newVectorBuilder(0).appendDouble(randomDouble()).build().filter().asBlock()); } - public void testEmptyBytesRefBlock() { + public void testEmptyBytesRefBlock() throws IOException { assertEmptyBlock(BytesRefBlock.newBlockBuilder(0).build()); assertEmptyBlock(BytesRefBlock.newBlockBuilder(0).appendNull().build().filter()); assertEmptyBlock(BytesRefVector.newVectorBuilder(0).build().asBlock()); assertEmptyBlock(BytesRefVector.newVectorBuilder(0).appendBytesRef(randomBytesRef()).build().filter().asBlock()); } - private void assertEmptyBlock(Block origBlock) { + private void assertEmptyBlock(Block origBlock) throws IOException { assertThat(origBlock.getPositionCount(), is(0)); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(origBlock, block -> serializeDeserializeBlock(block)); + try (Block deserBlock = serializeDeserializeBlock(origBlock)) { + EqualsHashCodeTestUtils.checkEqualsAndHashCode(origBlock, unused -> deserBlock); + } } public void testFilterIntBlock() throws IOException { @@ -132,19 +136,21 @@ public void testFilterBytesRefBlock() throws IOException { private void assertFilterBlock(Block origBlock) throws IOException { assertThat(origBlock.getPositionCount(), is(1)); - Block deserBlock = serializeDeserializeBlock(origBlock); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(origBlock, unused -> deserBlock); - assertThat(deserBlock.getPositionCount(), is(1)); + try (Block deserBlock = serializeDeserializeBlock(origBlock)) { + EqualsHashCodeTestUtils.checkEqualsAndHashCode(origBlock, unused -> deserBlock); + assertThat(deserBlock.getPositionCount(), is(1)); + } } public void testConstantNullBlock() throws IOException { Block origBlock = new ConstantNullBlock(randomIntBetween(1, 8192)); - Block deserBlock = serializeDeserializeBlock(origBlock); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(origBlock, unused -> deserBlock); + try (Block deserBlock = serializeDeserializeBlock(origBlock)) { + EqualsHashCodeTestUtils.checkEqualsAndHashCode(origBlock, unused -> deserBlock); + } } // TODO: more types, grouping, etc... - public void testAggregatorStateBlock() throws IOException { + public void testSimulateAggs() { DriverContext driverCtx = driverContext(); Page page = new Page(new LongArrayVector(new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }, 10).asBlock()); var bigArrays = BigArrays.NON_RECYCLING_INSTANCE; @@ -152,18 +158,28 @@ public void testAggregatorStateBlock() throws IOException { var function = SumLongAggregatorFunction.create(driverCtx, List.of(0)); function.addRawInput(page); Block[] blocks = new Block[function.intermediateBlockCount()]; - function.evaluateIntermediate(blocks, 0); - - Block[] deserBlocks = Arrays.stream(blocks).map(this::uncheckedSerializeDeserializeBlock).toArray(Block[]::new); - IntStream.range(0, blocks.length).forEach(i -> EqualsHashCodeTestUtils.checkEqualsAndHashCode(blocks[i], unused -> deserBlocks[i])); - - var inputChannels = IntStream.range(0, SumLongAggregatorFunction.intermediateStateDesc().size()).boxed().toList(); - var finalAggregator = SumLongAggregatorFunction.create(driverCtx, inputChannels); - finalAggregator.addIntermediateInput(new Page(deserBlocks)); - Block[] finalBlocks = new Block[1]; - finalAggregator.evaluateFinal(finalBlocks, 0, driverCtx); - var finalBlock = (LongBlock) finalBlocks[0]; - assertThat(finalBlock.getLong(0), is(55L)); + try { + function.evaluateIntermediate(blocks, 0); + + Block[] deserBlocks = Arrays.stream(blocks).map(this::uncheckedSerializeDeserializeBlock).toArray(Block[]::new); + try { + IntStream.range(0, blocks.length) + .forEach(i -> EqualsHashCodeTestUtils.checkEqualsAndHashCode(blocks[i], unused -> deserBlocks[i])); + + var inputChannels = IntStream.range(0, SumLongAggregatorFunction.intermediateStateDesc().size()).boxed().toList(); + var finalAggregator = SumLongAggregatorFunction.create(driverCtx, inputChannels); + finalAggregator.addIntermediateInput(new Page(deserBlocks)); + Block[] finalBlocks = new Block[1]; + finalAggregator.evaluateFinal(finalBlocks, 0, driverCtx); + try (var finalBlock = (LongBlock) finalBlocks[0]) { + assertThat(finalBlock.getLong(0), is(55L)); + } + } finally { + Releasables.close(deserBlocks); + } + } finally { + Releasables.close(blocks); + } } static BytesRef randomBytesRef() { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/MultiValueBlockTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/MultiValueBlockTests.java index 482a61a329a94..f067999a04ff1 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/MultiValueBlockTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/MultiValueBlockTests.java @@ -7,6 +7,8 @@ package org.elasticsearch.compute.data; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; import org.elasticsearch.test.EqualsHashCodeTestUtils; import java.io.IOException; @@ -51,7 +53,7 @@ public void testIntBlockTrivial1() { // cannot get a Vector view assertNull(block.asVector()); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(block, b -> serializeDeserializeBlock(b)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(block, this::serializeDeserializeBlock, null, Releasable::close); } public void testIntBlockTrivial() { @@ -76,7 +78,7 @@ public void testIntBlockTrivial() { assertThat(block.getValueCount(0), is(1)); assertThat(block.getInt(block.getFirstValueIndex(0)), is(1)); assertNull(block.asVector()); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(block, b -> serializeDeserializeBlock(b)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(block, this::serializeDeserializeBlock, null, Releasable::close); } public void testEmpty() { @@ -84,22 +86,22 @@ public void testEmpty() { IntBlock intBlock = IntBlock.newBlockBuilder(initialSize).build(); assertThat(intBlock.getPositionCount(), is(0)); assertThat(intBlock.asVector(), is(notNullValue())); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, block -> serializeDeserializeBlock(block)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, this::serializeDeserializeBlock, null, Releasable::close); LongBlock longBlock = LongBlock.newBlockBuilder(initialSize).build(); assertThat(longBlock.getPositionCount(), is(0)); assertThat(longBlock.asVector(), is(notNullValue())); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(longBlock, block -> serializeDeserializeBlock(block)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(longBlock, this::serializeDeserializeBlock, null, Releasable::close); DoubleBlock doubleBlock = DoubleBlock.newBlockBuilder(initialSize).build(); assertThat(doubleBlock.getPositionCount(), is(0)); assertThat(doubleBlock.asVector(), is(notNullValue())); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(doubleBlock, block -> serializeDeserializeBlock(block)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(doubleBlock, this::serializeDeserializeBlock, null, Releasable::close); BytesRefBlock bytesRefBlock = BytesRefBlock.newBlockBuilder(initialSize).build(); assertThat(bytesRefBlock.getPositionCount(), is(0)); assertThat(bytesRefBlock.asVector(), is(notNullValue())); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(bytesRefBlock, block -> serializeDeserializeBlock(block)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(bytesRefBlock, this::serializeDeserializeBlock, null, Releasable::close); } } @@ -109,25 +111,25 @@ public void testNullOnly() throws IOException { assertThat(intBlock.getPositionCount(), is(1)); assertThat(intBlock.getValueCount(0), is(0)); assertNull(intBlock.asVector()); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, block -> serializeDeserializeBlock(block)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, this::serializeDeserializeBlock, null, Releasable::close); LongBlock longBlock = LongBlock.newBlockBuilder(initialSize).appendNull().build(); assertThat(longBlock.getPositionCount(), is(1)); assertThat(longBlock.getValueCount(0), is(0)); assertNull(longBlock.asVector()); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(longBlock, block -> serializeDeserializeBlock(block)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(longBlock, this::serializeDeserializeBlock, null, Releasable::close); DoubleBlock doubleBlock = DoubleBlock.newBlockBuilder(initialSize).appendNull().build(); assertThat(doubleBlock.getPositionCount(), is(1)); assertThat(doubleBlock.getValueCount(0), is(0)); assertNull(doubleBlock.asVector()); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(doubleBlock, block -> serializeDeserializeBlock(block)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(doubleBlock, this::serializeDeserializeBlock, null, Releasable::close); BytesRefBlock bytesRefBlock = BytesRefBlock.newBlockBuilder(initialSize).appendNull().build(); assertThat(bytesRefBlock.getPositionCount(), is(1)); assertThat(bytesRefBlock.getValueCount(0), is(0)); assertNull(bytesRefBlock.asVector()); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(bytesRefBlock, block -> serializeDeserializeBlock(block)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(bytesRefBlock, this::serializeDeserializeBlock, null, Releasable::close); } } @@ -149,22 +151,22 @@ public void testNullsFollowedByValues() { Block intBlock = TestBlockBuilder.blockFromValues(blockValues, ElementType.INT); assertThat(intBlock.elementType(), is(equalTo(ElementType.INT))); BlockValueAsserter.assertBlockValues(intBlock, blockValues); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, block -> serializeDeserializeBlock(block)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, this::serializeDeserializeBlock, null, Releasable::close); Block longBlock = TestBlockBuilder.blockFromValues(blockValues, ElementType.LONG); assertThat(longBlock.elementType(), is(equalTo(ElementType.LONG))); BlockValueAsserter.assertBlockValues(longBlock, blockValues); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, block -> serializeDeserializeBlock(block)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, this::serializeDeserializeBlock, null, Releasable::close); Block doubleBlock = TestBlockBuilder.blockFromValues(blockValues, ElementType.DOUBLE); assertThat(doubleBlock.elementType(), is(equalTo(ElementType.DOUBLE))); BlockValueAsserter.assertBlockValues(doubleBlock, blockValues); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, block -> serializeDeserializeBlock(block)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, this::serializeDeserializeBlock, null, Releasable::close); Block bytesRefBlock = TestBlockBuilder.blockFromValues(blockValues, ElementType.BYTES_REF); assertThat(bytesRefBlock.elementType(), is(equalTo(ElementType.BYTES_REF))); BlockValueAsserter.assertBlockValues(bytesRefBlock, blockValues); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, block -> serializeDeserializeBlock(block)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, this::serializeDeserializeBlock, null, Releasable::close); } public void testMultiValuesAndNullsSmall() { @@ -181,22 +183,22 @@ public void testMultiValuesAndNullsSmall() { Block intBlock = TestBlockBuilder.blockFromValues(blockValues, ElementType.INT); assertThat(intBlock.elementType(), is(equalTo(ElementType.INT))); BlockValueAsserter.assertBlockValues(intBlock, blockValues); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, block -> serializeDeserializeBlock(block)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, this::serializeDeserializeBlock, null, Releasable::close); Block longBlock = TestBlockBuilder.blockFromValues(blockValues, ElementType.LONG); assertThat(longBlock.elementType(), is(equalTo(ElementType.LONG))); BlockValueAsserter.assertBlockValues(longBlock, blockValues); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, block -> serializeDeserializeBlock(block)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, this::serializeDeserializeBlock, null, Releasable::close); Block doubleBlock = TestBlockBuilder.blockFromValues(blockValues, ElementType.DOUBLE); assertThat(doubleBlock.elementType(), is(equalTo(ElementType.DOUBLE))); BlockValueAsserter.assertBlockValues(doubleBlock, blockValues); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, block -> serializeDeserializeBlock(block)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, this::serializeDeserializeBlock, null, Releasable::close); Block bytesRefBlock = TestBlockBuilder.blockFromValues(blockValues, ElementType.BYTES_REF); assertThat(bytesRefBlock.elementType(), is(equalTo(ElementType.BYTES_REF))); BlockValueAsserter.assertBlockValues(bytesRefBlock, blockValues); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, block -> serializeDeserializeBlock(block)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, this::serializeDeserializeBlock, null, Releasable::close); } public void testMultiValuesAndNulls() { @@ -217,27 +219,27 @@ public void testMultiValuesAndNulls() { Block intBlock = TestBlockBuilder.blockFromValues(blockValues, ElementType.INT); assertThat(intBlock.elementType(), is(equalTo(ElementType.INT))); BlockValueAsserter.assertBlockValues(intBlock, blockValues); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, block -> serializeDeserializeBlock(block)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, this::serializeDeserializeBlock, null, Releasable::close); Block longBlock = TestBlockBuilder.blockFromValues(blockValues, ElementType.LONG); assertThat(longBlock.elementType(), is(equalTo(ElementType.LONG))); BlockValueAsserter.assertBlockValues(longBlock, blockValues); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, block -> serializeDeserializeBlock(block)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, this::serializeDeserializeBlock, null, Releasable::close); Block doubleBlock = TestBlockBuilder.blockFromValues(blockValues, ElementType.DOUBLE); assertThat(doubleBlock.elementType(), is(equalTo(ElementType.DOUBLE))); BlockValueAsserter.assertBlockValues(doubleBlock, blockValues); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, block -> serializeDeserializeBlock(block)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, this::serializeDeserializeBlock, null, Releasable::close); Block bytesRefBlock = TestBlockBuilder.blockFromValues(blockValues, ElementType.BYTES_REF); assertThat(bytesRefBlock.elementType(), is(equalTo(ElementType.BYTES_REF))); BlockValueAsserter.assertBlockValues(bytesRefBlock, blockValues); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, block -> serializeDeserializeBlock(block)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(intBlock, this::serializeDeserializeBlock, null, Releasable::close); } // Tests that the use of Block builder beginPositionEntry (or not) with just a single value, // and no nulls, builds a block backed by a vector. - public void testSingleNonNullValues() { + public void testSingleNonNullValues() throws IOException { List blockValues = new ArrayList<>(); int positions = randomInt(512); for (int i = 0; i < positions; i++) { @@ -256,10 +258,14 @@ public void testSingleNonNullValues() { TestBlockBuilder.blockFromSingleValues(blockValues, ElementType.BYTES_REF), TestBlockBuilder.blockFromValues(blockValues.stream().map(List::of).toList(), ElementType.BYTES_REF) ); - for (Block block : blocks) { - assertThat(block.asVector(), is(notNullValue())); - BlockValueAsserter.assertBlockValues(block, blockValues.stream().map(List::of).toList()); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(block, unused -> serializeDeserializeBlock(block)); + try { + for (Block block : blocks) { + assertThat(block.asVector(), is(notNullValue())); + BlockValueAsserter.assertBlockValues(block, blockValues.stream().map(List::of).toList()); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(block, this::serializeDeserializeBlock, null, Releasable::close); + } + } finally { + Releasables.close(blocks); } } @@ -302,7 +308,7 @@ public void testSingleWithNullValues() { for (Block block : blocks) { assertThat(block.asVector(), is(nullValue())); BlockValueAsserter.assertBlockValues(block, blockValues.stream().map(MultiValueBlockTests::mapToList).toList()); - EqualsHashCodeTestUtils.checkEqualsAndHashCode(block, unused -> serializeDeserializeBlock(block)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(block, this::serializeDeserializeBlock, null, Releasable::close); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/SerializationTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/SerializationTestCase.java index 62b754d76fe49..b0666e89cf79e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/SerializationTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/SerializationTestCase.java @@ -7,35 +7,62 @@ package org.elasticsearch.compute.data; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.ByteBufferStreamInput; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.test.ESTestCase; +import org.junit.After; +import org.junit.Before; import java.io.IOException; import java.io.UncheckedIOException; -abstract class SerializationTestCase extends ESTestCase { +import static org.hamcrest.Matchers.equalTo; - final NamedWriteableRegistry registry = new NamedWriteableRegistry(Block.getNamedWriteables()); +public abstract class SerializationTestCase extends ESTestCase { + BigArrays bigArrays; + private BlockFactory blockFactory; + NamedWriteableRegistry registry = new NamedWriteableRegistry(Block.getNamedWriteables()); + + @Before + public final void newBlockFactory() { + bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking(); + blockFactory = new BlockFactory(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST), bigArrays); + } + + @After + public final void blockFactoryEmpty() { + assertThat(blockFactory.breaker().getUsed(), equalTo(0L)); + blockFactory = null; + registry = null; + } Page serializeDeserializePage(Page origPage) throws IOException { try (BytesStreamOutput out = new BytesStreamOutput()) { origPage.writeTo(out); - StreamInput in = new NamedWriteableAwareStreamInput(ByteBufferStreamInput.wrap(BytesReference.toBytes(out.bytes())), registry); - return new Page(in); + return new Page(blockStreamInput(out)); } } + BlockStreamInput blockStreamInput(BytesStreamOutput out) { + return new BlockStreamInput( + new NamedWriteableAwareStreamInput(ByteBufferStreamInput.wrap(BytesReference.toBytes(out.bytes())), registry), + blockFactory + ); + } + @SuppressWarnings("unchecked") T serializeDeserializeBlock(T origBlock) throws IOException { try (BytesStreamOutput out = new BytesStreamOutput()) { out.writeNamedWriteable(origBlock); - StreamInput in = new NamedWriteableAwareStreamInput(ByteBufferStreamInput.wrap(BytesReference.toBytes(out.bytes())), registry); - return (T) in.readNamedWriteable(Block.class); + return (T) blockStreamInput(out).readNamedWriteable(Block.class); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java index 993b809c909b2..b5d078754b26d 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java @@ -29,7 +29,7 @@ protected DriverContext driverContext() { } @Override - protected Operator.OperatorFactory simple(BigArrays bigArrays) { + protected LimitOperator.Factory simple(BigArrays bigArrays) { return new LimitOperator.Factory(100); } @@ -62,16 +62,21 @@ protected ByteSizeValue smallEnoughToCircuitBreak() { } public void testStatus() { - LimitOperator op = (LimitOperator) simple(BigArrays.NON_RECYCLING_INSTANCE).get(driverContext()); + BlockFactory blockFactory = driverContext().blockFactory(); + LimitOperator op = simple(BigArrays.NON_RECYCLING_INSTANCE).get(driverContext()); LimitOperator.Status status = op.status(); assertThat(status.limit(), equalTo(100)); assertThat(status.limitRemaining(), equalTo(100)); assertThat(status.pagesProcessed(), equalTo(0)); - Page p = new Page(Block.constantNullBlock(10)); - op.addInput(p); - assertSame(p, op.getOutput()); + Page p = new Page(blockFactory.newConstantNullBlock(10)); + try { + op.addInput(p); + assertSame(p, op.getOutput()); + } finally { + p.releaseBlocks(); + } status = op.status(); assertThat(status.limit(), equalTo(100)); assertThat(status.limitRemaining(), equalTo(90)); @@ -79,15 +84,17 @@ public void testStatus() { } public void testNeedInput() { - LimitOperator op = (LimitOperator) simple(BigArrays.NON_RECYCLING_INSTANCE).get(driverContext()); - assertTrue(op.needsInput()); - Page p = new Page(Block.constantNullBlock(10)); - op.addInput(p); - assertFalse(op.needsInput()); - op.getOutput(); - assertTrue(op.needsInput()); - op.finish(); - assertFalse(op.needsInput()); + BlockFactory blockFactory = driverContext().blockFactory(); + try (LimitOperator op = simple(BigArrays.NON_RECYCLING_INSTANCE).get(driverContext())) { + assertTrue(op.needsInput()); + Page p = new Page(blockFactory.newConstantNullBlock(10)); + op.addInput(p); + assertFalse(op.needsInput()); + op.getOutput().releaseBlocks(); + assertTrue(op.needsInput()); + op.finish(); + assertFalse(op.needsInput()); + } } // TODO: remove this once possible diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java index 114d4caed8a0f..78042a8587350 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java @@ -14,8 +14,10 @@ import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.node.VersionInformation; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -31,7 +33,6 @@ import org.elasticsearch.compute.operator.SinkOperator; import org.elasticsearch.compute.operator.SourceOperator; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancellationService; import org.elasticsearch.test.ESTestCase; @@ -225,17 +226,21 @@ public boolean needsInput() { @Override public void addInput(Page page) { - assertFalse("already finished", finished); - IntBlock block = page.getBlock(0); - for (int i = 0; i < block.getPositionCount(); i++) { - int v = block.getInt(i); - if (v < maxOutputSeqNo) { - assertTrue(receivedSeqNos.add(v)); - // Early termination - if (receivedSeqNos.size() >= maxOutputSeqNo && randomBoolean()) { - finished = true; + try { + assertFalse("already finished", finished); + IntBlock block = page.getBlock(0); + for (int i = 0; i < block.getPositionCount(); i++) { + int v = block.getInt(i); + if (v < maxOutputSeqNo) { + assertTrue(receivedSeqNos.add(v)); + // Early termination + if (receivedSeqNos.size() >= maxOutputSeqNo && randomBoolean()) { + finished = true; + } } } + } finally { + page.releaseBlocks(); } } @@ -353,10 +358,10 @@ public void testEarlyTerminate() { public void testConcurrentWithTransportActions() throws Exception { MockTransportService node0 = newTransportService(); - ExchangeService exchange0 = new ExchangeService(Settings.EMPTY, threadPool, ESQL_TEST_EXECUTOR); + ExchangeService exchange0 = new ExchangeService(Settings.EMPTY, threadPool, ESQL_TEST_EXECUTOR, blockFactory()); exchange0.registerTransportHandler(node0); MockTransportService node1 = newTransportService(); - ExchangeService exchange1 = new ExchangeService(Settings.EMPTY, threadPool, ESQL_TEST_EXECUTOR); + ExchangeService exchange1 = new ExchangeService(Settings.EMPTY, threadPool, ESQL_TEST_EXECUTOR, blockFactory()); exchange1.registerTransportHandler(node1); AbstractSimpleTransportTestCase.connectToNode(node0, node1.getLocalNode()); @@ -372,13 +377,13 @@ public void testConcurrentWithTransportActions() throws Exception { } } - public void testFailToRespondPage() throws Exception { + public void testFailToRespondPage() { Settings settings = Settings.builder().build(); MockTransportService node0 = newTransportService(); - ExchangeService exchange0 = new ExchangeService(settings, threadPool, ESQL_TEST_EXECUTOR); + ExchangeService exchange0 = new ExchangeService(settings, threadPool, ESQL_TEST_EXECUTOR, blockFactory()); exchange0.registerTransportHandler(node0); MockTransportService node1 = newTransportService(); - ExchangeService exchange1 = new ExchangeService(settings, threadPool, ESQL_TEST_EXECUTOR); + ExchangeService exchange1 = new ExchangeService(settings, threadPool, ESQL_TEST_EXECUTOR, blockFactory()); exchange1.registerTransportHandler(node1); AbstractSimpleTransportTestCase.connectToNode(node0, node1.getLocalNode()); final int maxSeqNo = randomIntBetween(1000, 5000); @@ -393,18 +398,25 @@ public void messageReceived( ) throws Exception { FilterTransportChannel filterChannel = new FilterTransportChannel(channel) { @Override - public void sendResponse(TransportResponse response) throws IOException { - ExchangeResponse exchangeResponse = (ExchangeResponse) response; - Page page = exchangeResponse.takePage(); + public void sendResponse(TransportResponse transportResponse) throws IOException { + ExchangeResponse origResp = (ExchangeResponse) transportResponse; + Page page = origResp.takePage(); if (page != null) { IntBlock block = page.getBlock(0); for (int i = 0; i < block.getPositionCount(); i++) { if (block.getInt(i) == disconnectOnSeqNo) { + page.releaseBlocks(); throw new IOException("page is too large"); } } } - super.sendResponse(response); + ExchangeResponse newResp = new ExchangeResponse(page, origResp.finished()); + origResp.decRef(); + while (origResp.hasReferences()) { + newResp.incRef(); + origResp.decRef(); + } + super.sendResponse(newResp); } }; handler.messageReceived(request, filterChannel, task); @@ -476,13 +488,24 @@ public void sendResponse(Exception exception) throws IOException { } } - /** - * A {@link DriverContext} with a BigArrays that does not circuit break. - */ - DriverContext driverContext() { - return new DriverContext( - new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService()).withCircuitBreaking(), - BlockFactory.getNonBreakingInstance() - ); + private final List breakers = Collections.synchronizedList(new ArrayList<>()); + + private DriverContext driverContext() { + BlockFactory blockFactory = blockFactory(); + return new DriverContext(blockFactory.bigArrays(), blockFactory); + } + + private BlockFactory blockFactory() { + MockBigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)); + CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST); + breakers.add(breaker); + return new BlockFactory(breaker, bigArrays); + } + + @After + public void allMemoryReleased() { + for (CircuitBreaker breaker : breakers) { + assertThat(breaker.getUsed(), equalTo(0L)); + } } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java index adaaeae399cb9..fd4fe13b9c1b1 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java @@ -862,6 +862,7 @@ public void testFromStatsLimit() { } } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99826") public void testFromLimit() { try (EsqlQueryResponse results = run("from test | keep data | limit 2")) { logger.info(results); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java index 5d9600addb403..3969190630fd3 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java @@ -78,26 +78,41 @@ public void testCanMatch() { handler.messageReceived(request, channel, task); }); } - EsqlQueryResponse resp = run("from events_*", randomPragmas(), new RangeQueryBuilder("@timestamp").gte("2023-01-01")); - assertThat(getValuesList(resp), hasSize(4)); - assertThat(queriedIndices, equalTo(Set.of("events_2023"))); - queriedIndices.clear(); - - resp = run("from events_*", randomPragmas(), new RangeQueryBuilder("@timestamp").lt("2023-01-01")); - assertThat(getValuesList(resp), hasSize(3)); - assertThat(queriedIndices, equalTo(Set.of("events_2022"))); - queriedIndices.clear(); - - resp = run("from events_*", randomPragmas(), new RangeQueryBuilder("@timestamp").gt("2022-01-01").lt("2023-12-31")); - assertThat(getValuesList(resp), hasSize(7)); - assertThat(queriedIndices, equalTo(Set.of("events_2022", "events_2023"))); - queriedIndices.clear(); - - resp = run("from events_*", randomPragmas(), new RangeQueryBuilder("@timestamp").gt("2021-01-01").lt("2021-12-31")); - assertThat(getValuesList(resp), hasSize(0)); - assertThat(queriedIndices, empty()); - queriedIndices.clear(); + try (EsqlQueryResponse resp = run("from events_*", randomPragmas(), new RangeQueryBuilder("@timestamp").gte("2023-01-01"))) { + assertThat(getValuesList(resp), hasSize(4)); + assertThat(queriedIndices, equalTo(Set.of("events_2023"))); + queriedIndices.clear(); + } + + try (EsqlQueryResponse resp = run("from events_*", randomPragmas(), new RangeQueryBuilder("@timestamp").lt("2023-01-01"))) { + assertThat(getValuesList(resp), hasSize(3)); + assertThat(queriedIndices, equalTo(Set.of("events_2022"))); + queriedIndices.clear(); + } + try ( + EsqlQueryResponse resp = run( + "from events_*", + randomPragmas(), + new RangeQueryBuilder("@timestamp").gt("2022-01-01").lt("2023-12-31") + ) + ) { + assertThat(getValuesList(resp), hasSize(7)); + assertThat(queriedIndices, equalTo(Set.of("events_2022", "events_2023"))); + queriedIndices.clear(); + } + + try ( + EsqlQueryResponse resp = run( + "from events_*", + randomPragmas(), + new RangeQueryBuilder("@timestamp").gt("2021-01-01").lt("2021-12-31") + ) + ) { + assertThat(getValuesList(resp), hasSize(0)); + assertThat(queriedIndices, empty()); + queriedIndices.clear(); + } } finally { for (TransportService ts : internalCluster().getInstances(TransportService.class)) { ((MockTransportService) ts).clearAllRules(); @@ -226,11 +241,12 @@ public void testFailOnUnavailableShards() throws Exception { .add(new IndexRequest().source("timestamp", 10, "message", "aa")) .add(new IndexRequest().source("timestamp", 11, "message", "bb")) .get(); - EsqlQueryResponse resp = run("from events,logs | KEEP timestamp,message"); - assertThat(getValuesList(resp), hasSize(5)); - internalCluster().stopNode(logsOnlyNode); - ensureClusterSizeConsistency(); - Exception error = expectThrows(Exception.class, () -> run("from events,logs | KEEP timestamp,message")); - assertThat(error.getMessage(), containsString("no shard copies found")); + try (EsqlQueryResponse resp = run("from events,logs | KEEP timestamp,message")) { + assertThat(getValuesList(resp), hasSize(5)); + internalCluster().stopNode(logsOnlyNode); + ensureClusterSizeConsistency(); + Exception error = expectThrows(Exception.class, () -> run("from events,logs | KEEP timestamp,message")); + assertThat(error.getMessage(), containsString("no shard copies found")); + } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryAction.java index b16b7b78f2eb0..13b5b067f5cc9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryAction.java @@ -15,6 +15,6 @@ public class EsqlQueryAction extends ActionType { public static final String NAME = "indices:data/read/esql"; private EsqlQueryAction() { - super(NAME, EsqlQueryResponse::new); + super(NAME, in -> { throw new IllegalArgumentException("can't transport EsqlQuery"); }); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java index 87d98b0bc61e9..a5194b1695c2c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java @@ -11,11 +11,13 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Iterators; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.common.xcontent.ChunkedToXContentHelper; import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BlockStreamInput; import org.elasticsearch.compute.data.BooleanBlock; import org.elasticsearch.compute.data.BytesRefBlock; import org.elasticsearch.compute.data.DoubleBlock; @@ -80,7 +82,14 @@ public EsqlQueryResponse(List columns, List> values) { this.columnar = false; } - public EsqlQueryResponse(StreamInput in) throws IOException { + /** + * Build a reader for the response. + */ + public static Writeable.Reader reader(BlockFactory blockFactory) { + return in -> new EsqlQueryResponse(new BlockStreamInput(in, blockFactory)); + } + + public EsqlQueryResponse(BlockStreamInput in) throws IOException { super(in); this.columns = in.readCollectionAsList(ColumnInfo::new); this.pages = in.readCollectionAsList(Page::new); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java index ca37b498f05ac..06b7874a8708f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BlockStreamInput; import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.lucene.ValueSources; @@ -294,7 +295,8 @@ private static class LookupRequest extends TransportRequest implements IndicesRe this.shardId = new ShardId(in); this.matchType = in.readString(); this.matchField = in.readString(); - this.inputPage = new Page(in); + // TODO real BlockFactory + this.inputPage = new Page(new BlockStreamInput(in, BlockFactory.getNonBreakingInstance())); PlanStreamInput planIn = new PlanStreamInput(in, PlanNameRegistry.INSTANCE, in.namedWriteableRegistry(), null); this.extractFields = planIn.readCollectionAsList(readerFromPlanReader(PlanStreamInput::readNamedExpression)); } @@ -364,7 +366,8 @@ private static class LookupResponse extends TransportResponse { } LookupResponse(StreamInput in) throws IOException { - this.page = new Page(in); + // TODO real BlockFactory + this.page = new Page(new BlockStreamInput(in, BlockFactory.getNonBreakingInstance())); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 62a74e5023773..2608d4525b153 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -21,6 +22,7 @@ import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; import org.elasticsearch.compute.operator.AbstractPageMappingOperator; @@ -58,6 +60,7 @@ import java.util.Collection; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.function.Supplier; import java.util.stream.Stream; @@ -100,9 +103,13 @@ public Collection createComponents( AllocationService allocationService, IndicesService indicesService ) { + CircuitBreaker circuitBreaker = indicesService.getBigArrays().breakerService().getBreaker("request"); + Objects.requireNonNull(circuitBreaker, "request circuit breaker wasn't set"); + BlockFactory blockFactory = new BlockFactory(circuitBreaker, indicesService.getBigArrays()); return List.of( new PlanExecutor(new IndexResolver(client, clusterService.getClusterName().value(), EsqlDataTypeRegistry.INSTANCE, Set::of)), - new ExchangeService(clusterService.getSettings(), threadPool, EsqlPlugin.ESQL_THREAD_POOL_NAME) + new ExchangeService(clusterService.getSettings(), threadPool, EsqlPlugin.ESQL_THREAD_POOL_NAME, blockFactory), + blockFactory ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index af515d0797202..2e51ae27f4851 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; @@ -38,7 +37,6 @@ import java.time.ZoneOffset; import java.util.List; import java.util.Locale; -import java.util.Objects; import java.util.concurrent.Executor; public class TransportEsqlQueryAction extends HandledTransportAction { @@ -62,7 +60,8 @@ public TransportEsqlQueryAction( ExchangeService exchangeService, ClusterService clusterService, ThreadPool threadPool, - BigArrays bigArrays + BigArrays bigArrays, + BlockFactory blockFactory ) { // TODO replace SAME when removing workaround for https://github.com/elastic/elasticsearch/issues/97916 super(EsqlQueryAction.NAME, transportService, actionFilters, EsqlQueryRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); @@ -71,7 +70,6 @@ public TransportEsqlQueryAction( this.requestExecutor = threadPool.executor(EsqlPlugin.ESQL_THREAD_POOL_NAME); exchangeService.registerTransportHandler(transportService); this.exchangeService = exchangeService; - var blockFactory = createBlockFactory(bigArrays); this.enrichPolicyResolver = new EnrichPolicyResolver(clusterService, transportService, planExecutor.indexResolver()); this.enrichLookupService = new EnrichLookupService(clusterService, searchService, transportService, bigArrays, blockFactory); this.computeService = new ComputeService( @@ -86,12 +84,6 @@ public TransportEsqlQueryAction( this.settings = settings; } - static BlockFactory createBlockFactory(BigArrays bigArrays) { - CircuitBreaker circuitBreaker = bigArrays.breakerService().getBreaker("request"); - Objects.requireNonNull(circuitBreaker, "request circuit breaker wasn't set"); - return new BlockFactory(circuitBreaker, bigArrays); - } - @Override protected void doExecute(Task task, EsqlQueryRequest request, ActionListener listener) { // workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java index 5bf8df1c3fd0b..7920e0575fd89 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java @@ -10,9 +10,16 @@ import org.apache.lucene.document.InetAddressPoint; import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BlockUtils; import org.elasticsearch.compute.data.BooleanBlock; import org.elasticsearch.compute.data.BytesRefBlock; import org.elasticsearch.compute.data.DoubleBlock; @@ -29,6 +36,8 @@ import org.elasticsearch.xpack.ql.type.DataType; import org.elasticsearch.xpack.ql.type.DataTypes; import org.elasticsearch.xpack.versionfield.Version; +import org.junit.After; +import org.junit.Before; import java.util.ArrayList; import java.util.List; @@ -36,6 +45,19 @@ import static org.hamcrest.Matchers.equalTo; public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase { + private BlockFactory blockFactory; + + @Before + public void newBlockFactory() { + BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking(); + blockFactory = new BlockFactory(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST), bigArrays); + } + + @After + public void blockFactoryEmpty() { + assertThat(blockFactory.breaker().getUsed(), equalTo(0L)); + } + @Override protected NamedWriteableRegistry getNamedWriteableRegistry() { return new NamedWriteableRegistry(Block.getNamedWriteables()); @@ -72,7 +94,7 @@ private ColumnInfo randomColumnInfo() { private Page randomPage(List columns) { return new Page(columns.stream().map(c -> { - Block.Builder builder = LocalExecutionPlanner.toElementType(EsqlDataTypes.fromName(c.type())).newBlockBuilder(1); + Block.Builder builder = LocalExecutionPlanner.toElementType(EsqlDataTypes.fromName(c.type())).newBlockBuilder(1, blockFactory); switch (c.type()) { case "unsigned_long", "long" -> ((LongBlock.Builder) builder).appendLong(randomLong()); case "integer" -> ((IntBlock.Builder) builder).appendInt(randomInt()); @@ -109,9 +131,9 @@ protected EsqlQueryResponse mutateInstance(EsqlQueryResponse instance) { List cols = new ArrayList<>(instance.columns()); // keep the type the same so the values are still valid but change the name cols.set(mutCol, new ColumnInfo(cols.get(mutCol).name() + "mut", cols.get(mutCol).type())); - yield new EsqlQueryResponse(cols, instance.pages(), instance.columnar()); + yield new EsqlQueryResponse(cols, deepCopyOfPages(instance), instance.columnar()); } - case 1 -> new EsqlQueryResponse(instance.columns(), instance.pages(), false == instance.columnar()); + case 1 -> new EsqlQueryResponse(instance.columns(), deepCopyOfPages(instance), false == instance.columnar()); case 2 -> { int noPages = instance.pages().size(); yield new EsqlQueryResponse( @@ -124,9 +146,22 @@ yield new EsqlQueryResponse( }; } + private List deepCopyOfPages(EsqlQueryResponse response) { + List deepCopiedPages = new ArrayList<>(response.pages().size()); + for (Page p : response.pages()) { + Block[] deepCopiedBlocks = new Block[p.getBlockCount()]; + for (int b = 0; b < p.getBlockCount(); b++) { + deepCopiedBlocks[b] = BlockUtils.deepCopyOf(p.getBlock(b), blockFactory); + } + deepCopiedPages.add(new Page(deepCopiedBlocks)); + } + assertThat(deepCopiedPages, equalTo(response.pages())); + return deepCopiedPages; + } + @Override protected Writeable.Reader instanceReader() { - return EsqlQueryResponse::new; + return EsqlQueryResponse.reader(blockFactory); } @Override @@ -135,28 +170,32 @@ protected EsqlQueryResponse doParseInstance(XContentParser parser) { } public void testChunkResponseSizeColumnar() { - EsqlQueryResponse resp = randomResponse(true); - int columnCount = resp.pages().get(0).getBlockCount(); - int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount() * p.getBlockCount()).sum() + columnCount * 2; - assertChunkCount(resp, r -> 5 + bodySize); + try (EsqlQueryResponse resp = randomResponse(true)) { + int columnCount = resp.pages().get(0).getBlockCount(); + int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount() * p.getBlockCount()).sum() + columnCount * 2; + assertChunkCount(resp, r -> 5 + bodySize); + } } public void testChunkResponseSizeRows() { - EsqlQueryResponse resp = randomResponse(false); - int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount()).sum(); - assertChunkCount(resp, r -> 5 + bodySize); + try (EsqlQueryResponse resp = randomResponse(false)) { + int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount()).sum(); + assertChunkCount(resp, r -> 5 + bodySize); + } } public void testSimpleXContentColumnar() { - EsqlQueryResponse response = simple(true); - assertThat(Strings.toString(response), equalTo(""" - {"columns":[{"name":"foo","type":"integer"}],"values":[[40,80]]}""")); + try (EsqlQueryResponse response = simple(true)) { + assertThat(Strings.toString(response), equalTo(""" + {"columns":[{"name":"foo","type":"integer"}],"values":[[40,80]]}""")); + } } public void testSimpleXContentRows() { - EsqlQueryResponse response = simple(false); - assertThat(Strings.toString(response), equalTo(""" - {"columns":[{"name":"foo","type":"integer"}],"values":[[40],[80]]}""")); + try (EsqlQueryResponse response = simple(false)) { + assertThat(Strings.toString(response), equalTo(""" + {"columns":[{"name":"foo","type":"integer"}],"values":[[40],[80]]}""")); + } } private EsqlQueryResponse simple(boolean columnar) { @@ -166,4 +205,9 @@ private EsqlQueryResponse simple(boolean columnar) { columnar ); } + + @Override + protected void dispose(EsqlQueryResponse esqlQueryResponse) { + esqlQueryResponse.close(); + } }