Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specialize serialization for ArrayVectors #105893

Merged
merged 4 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/105893.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 105893
summary: Specialize serialization for `ArrayVectors`
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ static TransportVersion def(int id) {
public static final TransportVersion DATA_STREAM_AUTO_SHARDING_EVENT = def(8_598_00_0);
public static final TransportVersion ADD_FAILURE_STORE_INDICES_OPTIONS = def(8_599_00_0);
public static final TransportVersion ESQL_ENRICH_OPERATOR_STATUS = def(8_600_00_0);
public static final TransportVersion ESQL_SERIALIZE_ARRAY_VECTOR = def(8_601_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Arrays;

/**
Expand All @@ -28,6 +31,33 @@ final class BooleanArrayVector extends AbstractVector implements BooleanVector {
this.values = values;
}

static BooleanArrayVector readArrayVector(int positions, StreamInput in, BlockFactory blockFactory) throws IOException {
final long preAdjustedBytes = RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + (long) positions * Byte.BYTES;
blockFactory.adjustBreaker(preAdjustedBytes);
boolean success = false;
try {
boolean[] values = new boolean[positions];
for (int i = 0; i < positions; i++) {
values[i] = in.readBoolean();
}
final var block = new BooleanArrayVector(values, positions, blockFactory);
blockFactory.adjustBreaker(block.ramBytesUsed() - preAdjustedBytes);
success = true;
return block;
} finally {
if (success == false) {
blockFactory.adjustBreaker(-preAdjustedBytes);
}
}
}

void writeArrayVector(int positions, StreamOutput out) throws IOException {
// TODO: One bit for each boolean
for (int i = 0; i < positions; i++) {
out.writeBoolean(values[i]);
}
}

@Override
public BooleanBlock asBlock() {
return new BooleanVectorBlock(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.compute.data;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

Expand Down Expand Up @@ -74,30 +75,47 @@ static int hash(BooleanVector vector) {
/** Deserializes a Vector from the given stream input. */
static BooleanVector readFrom(BlockFactory blockFactory, StreamInput in) throws IOException {
final int positions = in.readVInt();
final boolean constant = in.readBoolean();
if (constant && positions > 0) {
return blockFactory.newConstantBooleanVector(in.readBoolean(), positions);
} else {
try (var builder = blockFactory.newBooleanVectorFixedBuilder(positions)) {
for (int i = 0; i < positions; i++) {
builder.appendBoolean(in.readBoolean());
}
return builder.build();
final byte serializationType = in.readByte();
return switch (serializationType) {
case SERIALIZE_VECTOR_VALUES -> readValues(positions, in, blockFactory);
case SERIALIZE_VECTOR_CONSTANT -> blockFactory.newConstantBooleanVector(in.readBoolean(), positions);
case SERIALIZE_VECTOR_ARRAY -> BooleanArrayVector.readArrayVector(positions, in, blockFactory);
default -> {
assert false : "invalid vector serialization type [" + serializationType + "]";
throw new IllegalStateException("invalid vector serialization type [" + serializationType + "]");
}
}
};
}

/** Serializes this Vector to the given stream output. */
default void writeTo(StreamOutput out) throws IOException {
final int positions = getPositionCount();
final var version = out.getTransportVersion();
out.writeVInt(positions);
out.writeBoolean(isConstant());
if (isConstant() && positions > 0) {
out.writeByte(SERIALIZE_VECTOR_CONSTANT);
out.writeBoolean(getBoolean(0));
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_VECTOR) && this instanceof BooleanArrayVector v) {
out.writeByte(SERIALIZE_VECTOR_ARRAY);
v.writeArrayVector(positions, out);
} else {
out.writeByte(SERIALIZE_VECTOR_VALUES);
writeValues(this, positions, out);
}
}

private static BooleanVector readValues(int positions, StreamInput in, BlockFactory blockFactory) throws IOException {
try (var builder = blockFactory.newBooleanVectorFixedBuilder(positions)) {
for (int i = 0; i < positions; i++) {
out.writeBoolean(getBoolean(i));
builder.appendBoolean(in.readBoolean());
}
return builder.build();
}
}

private static void writeValues(BooleanVector v, int positions, StreamOutput out) throws IOException {
for (int i = 0; i < positions; i++) {
out.writeBoolean(v.getBoolean(i));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BytesRefArray;
import org.elasticsearch.core.Releasables;

import java.io.IOException;

/**
* Vector implementation that stores an array of BytesRef values.
* Does not take ownership of the given {@link BytesRefArray} and does not adjust circuit breakers to account for it.
Expand All @@ -30,6 +34,25 @@ final class BytesRefArrayVector extends AbstractVector implements BytesRefVector
this.values = values;
}

static BytesRefArrayVector readArrayVector(int positions, StreamInput in, BlockFactory blockFactory) throws IOException {
final BytesRefArray values = new BytesRefArray(in, blockFactory.bigArrays());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main improvement in this PR. The rest is for a follow-up where we will implement specialized serialization for ArrayBlock.

boolean success = false;
try {
final var block = new BytesRefArrayVector(values, positions, blockFactory);
blockFactory.adjustBreaker(block.ramBytesUsed() - values.bigArraysRamBytesUsed());
success = true;
return block;
} finally {
if (success == false) {
values.close();
}
}
}

void writeArrayVector(int positions, StreamOutput out) throws IOException {
values.writeTo(out);
}

@Override
public BytesRefBlock asBlock() {
return new BytesRefVectorBlock(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

Expand Down Expand Up @@ -74,30 +75,48 @@ static int hash(BytesRefVector vector) {
/** Deserializes a Vector from the given stream input. */
static BytesRefVector readFrom(BlockFactory blockFactory, StreamInput in) throws IOException {
final int positions = in.readVInt();
final boolean constant = in.readBoolean();
if (constant && positions > 0) {
return blockFactory.newConstantBytesRefVector(in.readBytesRef(), positions);
} else {
try (var builder = blockFactory.newBytesRefVectorBuilder(positions)) {
for (int i = 0; i < positions; i++) {
builder.appendBytesRef(in.readBytesRef());
}
return builder.build();
final byte serializationType = in.readByte();
return switch (serializationType) {
case SERIALIZE_VECTOR_VALUES -> readValues(positions, in, blockFactory);
case SERIALIZE_VECTOR_CONSTANT -> blockFactory.newConstantBytesRefVector(in.readBytesRef(), positions);
case SERIALIZE_VECTOR_ARRAY -> BytesRefArrayVector.readArrayVector(positions, in, blockFactory);
default -> {
assert false : "invalid vector serialization type [" + serializationType + "]";
throw new IllegalStateException("invalid vector serialization type [" + serializationType + "]");
}
}
};
}

/** Serializes this Vector to the given stream output. */
default void writeTo(StreamOutput out) throws IOException {
final int positions = getPositionCount();
final var version = out.getTransportVersion();
out.writeVInt(positions);
out.writeBoolean(isConstant());
if (isConstant() && positions > 0) {
out.writeByte(SERIALIZE_VECTOR_CONSTANT);
out.writeBytesRef(getBytesRef(0, new BytesRef()));
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_VECTOR) && this instanceof BytesRefArrayVector v) {
out.writeByte(SERIALIZE_VECTOR_ARRAY);
v.writeArrayVector(positions, out);
} else {
out.writeByte(SERIALIZE_VECTOR_VALUES);
writeValues(this, positions, out);
}
}

private static BytesRefVector readValues(int positions, StreamInput in, BlockFactory blockFactory) throws IOException {
try (var builder = blockFactory.newBytesRefVectorBuilder(positions)) {
for (int i = 0; i < positions; i++) {
out.writeBytesRef(getBytesRef(i, new BytesRef()));
builder.appendBytesRef(in.readBytesRef());
}
return builder.build();
}
}

private static void writeValues(BytesRefVector v, int positions, StreamOutput out) throws IOException {
var scratch = new BytesRef();
for (int i = 0; i < positions; i++) {
out.writeBytesRef(v.getBytesRef(i, scratch));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Arrays;

/**
Expand All @@ -28,6 +31,32 @@ final class DoubleArrayVector extends AbstractVector implements DoubleVector {
this.values = values;
}

static DoubleArrayVector readArrayVector(int positions, StreamInput in, BlockFactory blockFactory) throws IOException {
final long preAdjustedBytes = RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + (long) positions * Double.BYTES;
blockFactory.adjustBreaker(preAdjustedBytes);
boolean success = false;
try {
double[] values = new double[positions];
for (int i = 0; i < positions; i++) {
values[i] = in.readDouble();
}
final var block = new DoubleArrayVector(values, positions, blockFactory);
blockFactory.adjustBreaker(block.ramBytesUsed() - preAdjustedBytes);
success = true;
return block;
} finally {
if (success == false) {
blockFactory.adjustBreaker(-preAdjustedBytes);
}
}
}

void writeArrayVector(int positions, StreamOutput out) throws IOException {
for (int i = 0; i < positions; i++) {
out.writeDouble(values[i]);
}
}

@Override
public DoubleBlock asBlock() {
return new DoubleVectorBlock(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.compute.data;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

Expand Down Expand Up @@ -75,30 +76,47 @@ static int hash(DoubleVector vector) {
/** Deserializes a Vector from the given stream input. */
static DoubleVector readFrom(BlockFactory blockFactory, StreamInput in) throws IOException {
final int positions = in.readVInt();
final boolean constant = in.readBoolean();
if (constant && positions > 0) {
return blockFactory.newConstantDoubleVector(in.readDouble(), positions);
} else {
try (var builder = blockFactory.newDoubleVectorFixedBuilder(positions)) {
for (int i = 0; i < positions; i++) {
builder.appendDouble(in.readDouble());
}
return builder.build();
final byte serializationType = in.readByte();
return switch (serializationType) {
case SERIALIZE_VECTOR_VALUES -> readValues(positions, in, blockFactory);
case SERIALIZE_VECTOR_CONSTANT -> blockFactory.newConstantDoubleVector(in.readDouble(), positions);
case SERIALIZE_VECTOR_ARRAY -> DoubleArrayVector.readArrayVector(positions, in, blockFactory);
default -> {
assert false : "invalid vector serialization type [" + serializationType + "]";
throw new IllegalStateException("invalid vector serialization type [" + serializationType + "]");
}
}
};
}

/** Serializes this Vector to the given stream output. */
default void writeTo(StreamOutput out) throws IOException {
final int positions = getPositionCount();
final var version = out.getTransportVersion();
out.writeVInt(positions);
out.writeBoolean(isConstant());
if (isConstant() && positions > 0) {
out.writeByte(SERIALIZE_VECTOR_CONSTANT);
out.writeDouble(getDouble(0));
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_VECTOR) && this instanceof DoubleArrayVector v) {
out.writeByte(SERIALIZE_VECTOR_ARRAY);
v.writeArrayVector(positions, out);
} else {
out.writeByte(SERIALIZE_VECTOR_VALUES);
writeValues(this, positions, out);
}
}

private static DoubleVector readValues(int positions, StreamInput in, BlockFactory blockFactory) throws IOException {
try (var builder = blockFactory.newDoubleVectorFixedBuilder(positions)) {
for (int i = 0; i < positions; i++) {
out.writeDouble(getDouble(i));
builder.appendDouble(in.readDouble());
}
return builder.build();
}
}

private static void writeValues(DoubleVector v, int positions, StreamOutput out) throws IOException {
for (int i = 0; i < positions; i++) {
out.writeDouble(v.getDouble(i));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Arrays;

/**
Expand All @@ -28,6 +31,32 @@ final class IntArrayVector extends AbstractVector implements IntVector {
this.values = values;
}

static IntArrayVector readArrayVector(int positions, StreamInput in, BlockFactory blockFactory) throws IOException {
final long preAdjustedBytes = RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + (long) positions * Integer.BYTES;
blockFactory.adjustBreaker(preAdjustedBytes);
boolean success = false;
try {
int[] values = new int[positions];
for (int i = 0; i < positions; i++) {
values[i] = in.readInt();
}
final var block = new IntArrayVector(values, positions, blockFactory);
blockFactory.adjustBreaker(block.ramBytesUsed() - preAdjustedBytes);
success = true;
return block;
} finally {
if (success == false) {
blockFactory.adjustBreaker(-preAdjustedBytes);
}
}
}

void writeArrayVector(int positions, StreamOutput out) throws IOException {
for (int i = 0; i < positions; i++) {
out.writeInt(values[i]);
}
}

@Override
public IntBlock asBlock() {
return new IntVectorBlock(this);
Expand Down
Loading