diff --git a/docs/changelog/105893.yaml b/docs/changelog/105893.yaml new file mode 100644 index 0000000000000..c88736f5dda3d --- /dev/null +++ b/docs/changelog/105893.yaml @@ -0,0 +1,5 @@ +pr: 105893 +summary: Specialize serialization for `ArrayVectors` +area: ES|QL +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 29dec80875787..bc27ab8265b26 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -138,6 +138,7 @@ static TransportVersion def(int id) { public static final TransportVersion DATA_STREAM_AUTO_SHARDING_EVENT = def(8_598_00_0); public static final TransportVersion ADD_FAILURE_STORE_INDICES_OPTIONS = def(8_599_00_0); public static final TransportVersion ESQL_ENRICH_OPERATOR_STATUS = def(8_600_00_0); + public static final TransportVersion ESQL_SERIALIZE_ARRAY_VECTOR = def(8_601_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayVector.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayVector.java index 1599061d04ce8..63f02b14d9481 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayVector.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayVector.java @@ -8,7 +8,10 @@ package org.elasticsearch.compute.data; import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import java.io.IOException; import java.util.Arrays; /** @@ -28,6 +31,33 @@ final class BooleanArrayVector extends AbstractVector implements BooleanVector { this.values = values; } + static BooleanArrayVector readArrayVector(int positions, StreamInput in, BlockFactory blockFactory) throws IOException { + final long preAdjustedBytes = RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + (long) positions * Byte.BYTES; + blockFactory.adjustBreaker(preAdjustedBytes); + boolean success = false; + try { + boolean[] values = new boolean[positions]; + for (int i = 0; i < positions; i++) { + values[i] = in.readBoolean(); + } + final var block = new BooleanArrayVector(values, positions, blockFactory); + blockFactory.adjustBreaker(block.ramBytesUsed() - preAdjustedBytes); + success = true; + return block; + } finally { + if (success == false) { + blockFactory.adjustBreaker(-preAdjustedBytes); + } + } + } + + void writeArrayVector(int positions, StreamOutput out) throws IOException { + // TODO: One bit for each boolean + for (int i = 0; i < positions; i++) { + out.writeBoolean(values[i]); + } + } + @Override public BooleanBlock asBlock() { return new BooleanVectorBlock(this); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanVector.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanVector.java index 7c86f40981ec7..2f50b45fbfc9d 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanVector.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanVector.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.data; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -74,30 +75,47 @@ static int hash(BooleanVector vector) { /** Deserializes a Vector from the given stream input. */ static BooleanVector readFrom(BlockFactory blockFactory, StreamInput in) throws IOException { final int positions = in.readVInt(); - final boolean constant = in.readBoolean(); - if (constant && positions > 0) { - return blockFactory.newConstantBooleanVector(in.readBoolean(), positions); - } else { - try (var builder = blockFactory.newBooleanVectorFixedBuilder(positions)) { - for (int i = 0; i < positions; i++) { - builder.appendBoolean(in.readBoolean()); - } - return builder.build(); + final byte serializationType = in.readByte(); + return switch (serializationType) { + case SERIALIZE_VECTOR_VALUES -> readValues(positions, in, blockFactory); + case SERIALIZE_VECTOR_CONSTANT -> blockFactory.newConstantBooleanVector(in.readBoolean(), positions); + case SERIALIZE_VECTOR_ARRAY -> BooleanArrayVector.readArrayVector(positions, in, blockFactory); + default -> { + assert false : "invalid vector serialization type [" + serializationType + "]"; + throw new IllegalStateException("invalid vector serialization type [" + serializationType + "]"); } - } + }; } /** Serializes this Vector to the given stream output. */ default void writeTo(StreamOutput out) throws IOException { final int positions = getPositionCount(); + final var version = out.getTransportVersion(); out.writeVInt(positions); - out.writeBoolean(isConstant()); if (isConstant() && positions > 0) { + out.writeByte(SERIALIZE_VECTOR_CONSTANT); out.writeBoolean(getBoolean(0)); + } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_VECTOR) && this instanceof BooleanArrayVector v) { + out.writeByte(SERIALIZE_VECTOR_ARRAY); + v.writeArrayVector(positions, out); } else { + out.writeByte(SERIALIZE_VECTOR_VALUES); + writeValues(this, positions, out); + } + } + + private static BooleanVector readValues(int positions, StreamInput in, BlockFactory blockFactory) throws IOException { + try (var builder = blockFactory.newBooleanVectorFixedBuilder(positions)) { for (int i = 0; i < positions; i++) { - out.writeBoolean(getBoolean(i)); + builder.appendBoolean(in.readBoolean()); } + return builder.build(); + } + } + + private static void writeValues(BooleanVector v, int positions, StreamOutput out) throws IOException { + for (int i = 0; i < positions; i++) { + out.writeBoolean(v.getBoolean(i)); } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayVector.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayVector.java index 5d47802bebabe..d0b600d0f0be2 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayVector.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayVector.java @@ -9,9 +9,13 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.BytesRefArray; import org.elasticsearch.core.Releasables; +import java.io.IOException; + /** * Vector implementation that stores an array of BytesRef values. * Does not take ownership of the given {@link BytesRefArray} and does not adjust circuit breakers to account for it. @@ -30,6 +34,25 @@ final class BytesRefArrayVector extends AbstractVector implements BytesRefVector this.values = values; } + static BytesRefArrayVector readArrayVector(int positions, StreamInput in, BlockFactory blockFactory) throws IOException { + final BytesRefArray values = new BytesRefArray(in, blockFactory.bigArrays()); + boolean success = false; + try { + final var block = new BytesRefArrayVector(values, positions, blockFactory); + blockFactory.adjustBreaker(block.ramBytesUsed() - values.bigArraysRamBytesUsed()); + success = true; + return block; + } finally { + if (success == false) { + values.close(); + } + } + } + + void writeArrayVector(int positions, StreamOutput out) throws IOException { + values.writeTo(out); + } + @Override public BytesRefBlock asBlock() { return new BytesRefVectorBlock(this); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefVector.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefVector.java index 5c56ece72c298..c0b107065f43c 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefVector.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefVector.java @@ -8,6 +8,7 @@ package org.elasticsearch.compute.data; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -74,30 +75,48 @@ static int hash(BytesRefVector vector) { /** Deserializes a Vector from the given stream input. */ static BytesRefVector readFrom(BlockFactory blockFactory, StreamInput in) throws IOException { final int positions = in.readVInt(); - final boolean constant = in.readBoolean(); - if (constant && positions > 0) { - return blockFactory.newConstantBytesRefVector(in.readBytesRef(), positions); - } else { - try (var builder = blockFactory.newBytesRefVectorBuilder(positions)) { - for (int i = 0; i < positions; i++) { - builder.appendBytesRef(in.readBytesRef()); - } - return builder.build(); + final byte serializationType = in.readByte(); + return switch (serializationType) { + case SERIALIZE_VECTOR_VALUES -> readValues(positions, in, blockFactory); + case SERIALIZE_VECTOR_CONSTANT -> blockFactory.newConstantBytesRefVector(in.readBytesRef(), positions); + case SERIALIZE_VECTOR_ARRAY -> BytesRefArrayVector.readArrayVector(positions, in, blockFactory); + default -> { + assert false : "invalid vector serialization type [" + serializationType + "]"; + throw new IllegalStateException("invalid vector serialization type [" + serializationType + "]"); } - } + }; } /** Serializes this Vector to the given stream output. */ default void writeTo(StreamOutput out) throws IOException { final int positions = getPositionCount(); + final var version = out.getTransportVersion(); out.writeVInt(positions); - out.writeBoolean(isConstant()); if (isConstant() && positions > 0) { + out.writeByte(SERIALIZE_VECTOR_CONSTANT); out.writeBytesRef(getBytesRef(0, new BytesRef())); + } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_VECTOR) && this instanceof BytesRefArrayVector v) { + out.writeByte(SERIALIZE_VECTOR_ARRAY); + v.writeArrayVector(positions, out); } else { + out.writeByte(SERIALIZE_VECTOR_VALUES); + writeValues(this, positions, out); + } + } + + private static BytesRefVector readValues(int positions, StreamInput in, BlockFactory blockFactory) throws IOException { + try (var builder = blockFactory.newBytesRefVectorBuilder(positions)) { for (int i = 0; i < positions; i++) { - out.writeBytesRef(getBytesRef(i, new BytesRef())); + builder.appendBytesRef(in.readBytesRef()); } + return builder.build(); + } + } + + private static void writeValues(BytesRefVector v, int positions, StreamOutput out) throws IOException { + var scratch = new BytesRef(); + for (int i = 0; i < positions; i++) { + out.writeBytesRef(v.getBytesRef(i, scratch)); } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayVector.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayVector.java index 9a9fedb95a1b6..a7868beaf5db8 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayVector.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayVector.java @@ -8,7 +8,10 @@ package org.elasticsearch.compute.data; import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import java.io.IOException; import java.util.Arrays; /** @@ -28,6 +31,32 @@ final class DoubleArrayVector extends AbstractVector implements DoubleVector { this.values = values; } + static DoubleArrayVector readArrayVector(int positions, StreamInput in, BlockFactory blockFactory) throws IOException { + final long preAdjustedBytes = RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + (long) positions * Double.BYTES; + blockFactory.adjustBreaker(preAdjustedBytes); + boolean success = false; + try { + double[] values = new double[positions]; + for (int i = 0; i < positions; i++) { + values[i] = in.readDouble(); + } + final var block = new DoubleArrayVector(values, positions, blockFactory); + blockFactory.adjustBreaker(block.ramBytesUsed() - preAdjustedBytes); + success = true; + return block; + } finally { + if (success == false) { + blockFactory.adjustBreaker(-preAdjustedBytes); + } + } + } + + void writeArrayVector(int positions, StreamOutput out) throws IOException { + for (int i = 0; i < positions; i++) { + out.writeDouble(values[i]); + } + } + @Override public DoubleBlock asBlock() { return new DoubleVectorBlock(this); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleVector.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleVector.java index f54044874acdd..c5553f6a102f9 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleVector.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleVector.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.data; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -75,30 +76,47 @@ static int hash(DoubleVector vector) { /** Deserializes a Vector from the given stream input. */ static DoubleVector readFrom(BlockFactory blockFactory, StreamInput in) throws IOException { final int positions = in.readVInt(); - final boolean constant = in.readBoolean(); - if (constant && positions > 0) { - return blockFactory.newConstantDoubleVector(in.readDouble(), positions); - } else { - try (var builder = blockFactory.newDoubleVectorFixedBuilder(positions)) { - for (int i = 0; i < positions; i++) { - builder.appendDouble(in.readDouble()); - } - return builder.build(); + final byte serializationType = in.readByte(); + return switch (serializationType) { + case SERIALIZE_VECTOR_VALUES -> readValues(positions, in, blockFactory); + case SERIALIZE_VECTOR_CONSTANT -> blockFactory.newConstantDoubleVector(in.readDouble(), positions); + case SERIALIZE_VECTOR_ARRAY -> DoubleArrayVector.readArrayVector(positions, in, blockFactory); + default -> { + assert false : "invalid vector serialization type [" + serializationType + "]"; + throw new IllegalStateException("invalid vector serialization type [" + serializationType + "]"); } - } + }; } /** Serializes this Vector to the given stream output. */ default void writeTo(StreamOutput out) throws IOException { final int positions = getPositionCount(); + final var version = out.getTransportVersion(); out.writeVInt(positions); - out.writeBoolean(isConstant()); if (isConstant() && positions > 0) { + out.writeByte(SERIALIZE_VECTOR_CONSTANT); out.writeDouble(getDouble(0)); + } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_VECTOR) && this instanceof DoubleArrayVector v) { + out.writeByte(SERIALIZE_VECTOR_ARRAY); + v.writeArrayVector(positions, out); } else { + out.writeByte(SERIALIZE_VECTOR_VALUES); + writeValues(this, positions, out); + } + } + + private static DoubleVector readValues(int positions, StreamInput in, BlockFactory blockFactory) throws IOException { + try (var builder = blockFactory.newDoubleVectorFixedBuilder(positions)) { for (int i = 0; i < positions; i++) { - out.writeDouble(getDouble(i)); + builder.appendDouble(in.readDouble()); } + return builder.build(); + } + } + + private static void writeValues(DoubleVector v, int positions, StreamOutput out) throws IOException { + for (int i = 0; i < positions; i++) { + out.writeDouble(v.getDouble(i)); } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayVector.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayVector.java index 9374a4db4b4c4..644af9ae512a8 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayVector.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayVector.java @@ -8,7 +8,10 @@ package org.elasticsearch.compute.data; import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import java.io.IOException; import java.util.Arrays; /** @@ -28,6 +31,32 @@ final class IntArrayVector extends AbstractVector implements IntVector { this.values = values; } + static IntArrayVector readArrayVector(int positions, StreamInput in, BlockFactory blockFactory) throws IOException { + final long preAdjustedBytes = RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + (long) positions * Integer.BYTES; + blockFactory.adjustBreaker(preAdjustedBytes); + boolean success = false; + try { + int[] values = new int[positions]; + for (int i = 0; i < positions; i++) { + values[i] = in.readInt(); + } + final var block = new IntArrayVector(values, positions, blockFactory); + blockFactory.adjustBreaker(block.ramBytesUsed() - preAdjustedBytes); + success = true; + return block; + } finally { + if (success == false) { + blockFactory.adjustBreaker(-preAdjustedBytes); + } + } + } + + void writeArrayVector(int positions, StreamOutput out) throws IOException { + for (int i = 0; i < positions; i++) { + out.writeInt(values[i]); + } + } + @Override public IntBlock asBlock() { return new IntVectorBlock(this); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntVector.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntVector.java index bc7e3c87ec33d..1d4fb0741cab0 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntVector.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntVector.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.data; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -74,30 +75,47 @@ static int hash(IntVector vector) { /** Deserializes a Vector from the given stream input. */ static IntVector readFrom(BlockFactory blockFactory, StreamInput in) throws IOException { final int positions = in.readVInt(); - final boolean constant = in.readBoolean(); - if (constant && positions > 0) { - return blockFactory.newConstantIntVector(in.readInt(), positions); - } else { - try (var builder = blockFactory.newIntVectorFixedBuilder(positions)) { - for (int i = 0; i < positions; i++) { - builder.appendInt(in.readInt()); - } - return builder.build(); + final byte serializationType = in.readByte(); + return switch (serializationType) { + case SERIALIZE_VECTOR_VALUES -> readValues(positions, in, blockFactory); + case SERIALIZE_VECTOR_CONSTANT -> blockFactory.newConstantIntVector(in.readInt(), positions); + case SERIALIZE_VECTOR_ARRAY -> IntArrayVector.readArrayVector(positions, in, blockFactory); + default -> { + assert false : "invalid vector serialization type [" + serializationType + "]"; + throw new IllegalStateException("invalid vector serialization type [" + serializationType + "]"); } - } + }; } /** Serializes this Vector to the given stream output. */ default void writeTo(StreamOutput out) throws IOException { final int positions = getPositionCount(); + final var version = out.getTransportVersion(); out.writeVInt(positions); - out.writeBoolean(isConstant()); if (isConstant() && positions > 0) { + out.writeByte(SERIALIZE_VECTOR_CONSTANT); out.writeInt(getInt(0)); + } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_VECTOR) && this instanceof IntArrayVector v) { + out.writeByte(SERIALIZE_VECTOR_ARRAY); + v.writeArrayVector(positions, out); } else { + out.writeByte(SERIALIZE_VECTOR_VALUES); + writeValues(this, positions, out); + } + } + + private static IntVector readValues(int positions, StreamInput in, BlockFactory blockFactory) throws IOException { + try (var builder = blockFactory.newIntVectorFixedBuilder(positions)) { for (int i = 0; i < positions; i++) { - out.writeInt(getInt(i)); + builder.appendInt(in.readInt()); } + return builder.build(); + } + } + + private static void writeValues(IntVector v, int positions, StreamOutput out) throws IOException { + for (int i = 0; i < positions; i++) { + out.writeInt(v.getInt(i)); } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayVector.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayVector.java index a50987f1d6959..b3cee58356d70 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayVector.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayVector.java @@ -8,7 +8,10 @@ package org.elasticsearch.compute.data; import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import java.io.IOException; import java.util.Arrays; /** @@ -28,6 +31,32 @@ final class LongArrayVector extends AbstractVector implements LongVector { this.values = values; } + static LongArrayVector readArrayVector(int positions, StreamInput in, BlockFactory blockFactory) throws IOException { + final long preAdjustedBytes = RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + (long) positions * Long.BYTES; + blockFactory.adjustBreaker(preAdjustedBytes); + boolean success = false; + try { + long[] values = new long[positions]; + for (int i = 0; i < positions; i++) { + values[i] = in.readLong(); + } + final var block = new LongArrayVector(values, positions, blockFactory); + blockFactory.adjustBreaker(block.ramBytesUsed() - preAdjustedBytes); + success = true; + return block; + } finally { + if (success == false) { + blockFactory.adjustBreaker(-preAdjustedBytes); + } + } + } + + void writeArrayVector(int positions, StreamOutput out) throws IOException { + for (int i = 0; i < positions; i++) { + out.writeLong(values[i]); + } + } + @Override public LongBlock asBlock() { return new LongVectorBlock(this); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongVector.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongVector.java index 358f5b32366cb..60592469f0ea1 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongVector.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongVector.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.data; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -75,30 +76,47 @@ static int hash(LongVector vector) { /** Deserializes a Vector from the given stream input. */ static LongVector readFrom(BlockFactory blockFactory, StreamInput in) throws IOException { final int positions = in.readVInt(); - final boolean constant = in.readBoolean(); - if (constant && positions > 0) { - return blockFactory.newConstantLongVector(in.readLong(), positions); - } else { - try (var builder = blockFactory.newLongVectorFixedBuilder(positions)) { - for (int i = 0; i < positions; i++) { - builder.appendLong(in.readLong()); - } - return builder.build(); + final byte serializationType = in.readByte(); + return switch (serializationType) { + case SERIALIZE_VECTOR_VALUES -> readValues(positions, in, blockFactory); + case SERIALIZE_VECTOR_CONSTANT -> blockFactory.newConstantLongVector(in.readLong(), positions); + case SERIALIZE_VECTOR_ARRAY -> LongArrayVector.readArrayVector(positions, in, blockFactory); + default -> { + assert false : "invalid vector serialization type [" + serializationType + "]"; + throw new IllegalStateException("invalid vector serialization type [" + serializationType + "]"); } - } + }; } /** Serializes this Vector to the given stream output. */ default void writeTo(StreamOutput out) throws IOException { final int positions = getPositionCount(); + final var version = out.getTransportVersion(); out.writeVInt(positions); - out.writeBoolean(isConstant()); if (isConstant() && positions > 0) { + out.writeByte(SERIALIZE_VECTOR_CONSTANT); out.writeLong(getLong(0)); + } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_VECTOR) && this instanceof LongArrayVector v) { + out.writeByte(SERIALIZE_VECTOR_ARRAY); + v.writeArrayVector(positions, out); } else { + out.writeByte(SERIALIZE_VECTOR_VALUES); + writeValues(this, positions, out); + } + } + + private static LongVector readValues(int positions, StreamInput in, BlockFactory blockFactory) throws IOException { + try (var builder = blockFactory.newLongVectorFixedBuilder(positions)) { for (int i = 0; i < positions; i++) { - out.writeLong(getLong(i)); + builder.appendLong(in.readLong()); } + return builder.build(); + } + } + + private static void writeValues(LongVector v, int positions, StreamOutput out) throws IOException { + for (int i = 0; i < positions; i++) { + out.writeLong(v.getLong(i)); } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Vector.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Vector.java index fc09f636ac700..c309a7a0b8827 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Vector.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Vector.java @@ -75,4 +75,11 @@ interface Builder extends Releasable { * Whether this vector was released */ boolean isReleased(); + + /** + * The serialization type of vectors: 0 and 1 replaces the boolean false/true in pre-8.14. + */ + byte SERIALIZE_VECTOR_VALUES = 0; + byte SERIALIZE_VECTOR_CONSTANT = 1; + byte SERIALIZE_VECTOR_ARRAY = 2; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayVector.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayVector.java.st index a02656f72e54c..b5ecb2cad4a56 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayVector.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayVector.java.st @@ -10,12 +10,19 @@ package org.elasticsearch.compute.data; $if(BytesRef)$ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.BytesRefArray; import org.elasticsearch.core.Releasables; +import java.io.IOException; + $else$ import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import java.io.IOException; import java.util.Arrays; $endif$ @@ -44,6 +51,56 @@ $endif$ this.values = values; } + static $Type$ArrayVector readArrayVector(int positions, StreamInput in, BlockFactory blockFactory) throws IOException { +$if(BytesRef)$ + final BytesRefArray values = new BytesRefArray(in, blockFactory.bigArrays()); + boolean success = false; + try { + final var block = new BytesRefArrayVector(values, positions, blockFactory); + blockFactory.adjustBreaker(block.ramBytesUsed() - values.bigArraysRamBytesUsed()); + success = true; + return block; + } finally { + if (success == false) { + values.close(); + } + } +$else$ + final long preAdjustedBytes = RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + (long) positions * $BYTES$; + blockFactory.adjustBreaker(preAdjustedBytes); + boolean success = false; + try { + $type$[] values = new $type$[positions]; + for (int i = 0; i < positions; i++) { + values[i] = in.read$Type$(); + } + final var block = new $Type$ArrayVector(values, positions, blockFactory); + blockFactory.adjustBreaker(block.ramBytesUsed() - preAdjustedBytes); + success = true; + return block; + } finally { + if (success == false) { + blockFactory.adjustBreaker(-preAdjustedBytes); + } + } +$endif$ + } + + void writeArrayVector(int positions, StreamOutput out) throws IOException { +$if(BytesRef)$ + values.writeTo(out); +$elseif(boolean)$ + // TODO: One bit for each boolean + for (int i = 0; i < positions; i++) { + out.writeBoolean(values[i]); + } +$else$ + for (int i = 0; i < positions; i++) { + out.write$Type$(values[i]); + } +$endif$ + } + @Override public $Type$Block asBlock() { return new $Type$VectorBlock(this); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Vector.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Vector.java.st index c303a8391ad18..0796801c55d40 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Vector.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Vector.java.st @@ -10,6 +10,7 @@ package org.elasticsearch.compute.data; $if(BytesRef)$ import org.apache.lucene.util.BytesRef; $endif$ +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -108,38 +109,58 @@ $endif$ /** Deserializes a Vector from the given stream input. */ static $Type$Vector readFrom(BlockFactory blockFactory, StreamInput in) throws IOException { final int positions = in.readVInt(); - final boolean constant = in.readBoolean(); - if (constant && positions > 0) { - return blockFactory.newConstant$Type$Vector(in.read$Type$(), positions); - } else { - try (var builder = blockFactory.new$Type$Vector$if(BytesRef)$$else$Fixed$endif$Builder(positions)) { - for (int i = 0; i < positions; i++) { - builder.append$Type$(in.read$Type$()); - } - return builder.build(); + final byte serializationType = in.readByte(); + return switch (serializationType) { + case SERIALIZE_VECTOR_VALUES -> readValues(positions, in, blockFactory); + case SERIALIZE_VECTOR_CONSTANT -> blockFactory.newConstant$Type$Vector(in.read$Type$(), positions); + case SERIALIZE_VECTOR_ARRAY -> $Type$ArrayVector.readArrayVector(positions, in, blockFactory); + default -> { + assert false : "invalid vector serialization type [" + serializationType + "]"; + throw new IllegalStateException("invalid vector serialization type [" + serializationType + "]"); } - } + }; } /** Serializes this Vector to the given stream output. */ default void writeTo(StreamOutput out) throws IOException { final int positions = getPositionCount(); + final var version = out.getTransportVersion(); out.writeVInt(positions); - out.writeBoolean(isConstant()); if (isConstant() && positions > 0) { + out.writeByte(SERIALIZE_VECTOR_CONSTANT); $if(BytesRef)$ out.write$Type$(get$Type$(0, new BytesRef())); $else$ out.write$Type$(get$Type$(0)); $endif$ + } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_VECTOR) && this instanceof $Type$ArrayVector v) { + out.writeByte(SERIALIZE_VECTOR_ARRAY); + v.writeArrayVector(positions, out); } else { + out.writeByte(SERIALIZE_VECTOR_VALUES); + writeValues(this, positions, out); + } + } + + private static $Type$Vector readValues(int positions, StreamInput in, BlockFactory blockFactory) throws IOException { + try (var builder = blockFactory.new$Type$Vector$if(BytesRef)$$else$Fixed$endif$Builder(positions)) { for (int i = 0; i < positions; i++) { + builder.append$Type$(in.read$Type$()); + } + return builder.build(); + } + } + + private static void writeValues($Type$Vector v, int positions, StreamOutput out) throws IOException { $if(BytesRef)$ - out.write$Type$(get$Type$(i, new BytesRef())); + var scratch = new BytesRef(); +$endif$ + for (int i = 0; i < positions; i++) { +$if(BytesRef)$ + out.write$Type$(v.get$Type$(i, scratch)); $else$ - out.write$Type$(get$Type$(i)); + out.write$Type$(v.get$Type$(i)); $endif$ - } } }