Skip to content

Commit

Permalink
Simplify Block/Vector Serialization (#98938)
Browse files Browse the repository at this point in the history
This commit reworks and simplifies the serialization of Block and Vectors. Rather than having a separate NamedWritable for block views of vectors, we instead just encode a boolean bit into the stream. This allows for different backing implementations to be used without triggering the assertion that the NamedWritable registry entry of the deserialised instance must be the same as that of the name in the stream.
  • Loading branch information
ChrisHegarty authored Aug 29, 2023
1 parent 9f108d8 commit 1be3434
Show file tree
Hide file tree
Showing 20 changed files with 386 additions and 364 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ default String getWriteableName() {
}

static BooleanBlock of(StreamInput in) throws IOException {
final boolean isVector = in.readBoolean();
if (isVector) {
return BooleanVector.of(in).asBlock();
}
final int positions = in.readVInt();
var builder = newBlockBuilder(positions);
for (int i = 0; i < positions; i++) {
Expand All @@ -63,17 +67,23 @@ static BooleanBlock of(StreamInput in) throws IOException {

@Override
default void writeTo(StreamOutput out) throws IOException {
final int positions = getPositionCount();
out.writeVInt(positions);
for (int pos = 0; pos < positions; pos++) {
if (isNull(pos)) {
out.writeBoolean(true);
} else {
out.writeBoolean(false);
final int valueCount = getValueCount(pos);
out.writeVInt(valueCount);
for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
out.writeBoolean(getBoolean(getFirstValueIndex(pos) + valueIndex));
BooleanVector vector = asVector();
out.writeBoolean(vector != null);
if (vector != null) {
vector.writeTo(out);
} else {
final int positions = getPositionCount();
out.writeVInt(positions);
for (int pos = 0; pos < positions; pos++) {
if (isNull(pos)) {
out.writeBoolean(true);
} else {
out.writeBoolean(false);
final int valueCount = getValueCount(pos);
out.writeVInt(valueCount);
for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
out.writeBoolean(getBoolean(getFirstValueIndex(pos) + valueIndex));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@

package org.elasticsearch.compute.data;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Vector that stores boolean values.
* This class is generated. Do not edit it.
Expand Down Expand Up @@ -66,6 +71,35 @@ static int hash(BooleanVector vector) {
return result;
}

/** Deserializes a Vector from the given stream input. */
static BooleanVector of(StreamInput in) throws IOException {
final int positions = in.readVInt();
final boolean constant = in.readBoolean();
if (constant && positions > 0) {
return new ConstantBooleanVector(in.readBoolean(), positions);
} else {
var builder = BooleanVector.newVectorBuilder(positions);
for (int i = 0; i < positions; i++) {
builder.appendBoolean(in.readBoolean());
}
return builder.build();
}
}

/** Serializes this Vector to the given stream output. */
default void writeTo(StreamOutput out) throws IOException {
final int positions = getPositionCount();
out.writeVInt(positions);
out.writeBoolean(isConstant());
if (isConstant() && positions > 0) {
out.writeBoolean(getBoolean(0));
} else {
for (int i = 0; i < positions; i++) {
out.writeBoolean(getBoolean(i));
}
}
}

static Builder newVectorBuilder(int estimatedSize) {
return new BooleanVectorBuilder(estimatedSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,6 @@

package org.elasticsearch.compute.data;

import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Block view of a BooleanVector.
* This class is generated. Do not edit it.
Expand Down Expand Up @@ -51,46 +45,6 @@ public BooleanBlock filter(int... positions) {
return new FilterBooleanVector(vector, positions).asBlock();
}

public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
Block.class,
"BooleanVectorBlock",
BooleanVectorBlock::of
);

@Override
public String getWriteableName() {
return "BooleanVectorBlock";
}

static BooleanVectorBlock of(StreamInput in) throws IOException {
final int positions = in.readVInt();
final boolean constant = in.readBoolean();
if (constant && positions > 0) {
return new BooleanVectorBlock(new ConstantBooleanVector(in.readBoolean(), positions));
} else {
var builder = BooleanVector.newVectorBuilder(positions);
for (int i = 0; i < positions; i++) {
builder.appendBoolean(in.readBoolean());
}
return new BooleanVectorBlock(builder.build());
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
final BooleanVector vector = this.vector;
final int positions = vector.getPositionCount();
out.writeVInt(positions);
out.writeBoolean(vector.isConstant());
if (vector.isConstant() && positions > 0) {
out.writeBoolean(getBoolean(0));
} else {
for (int i = 0; i < positions; i++) {
out.writeBoolean(getBoolean(i));
}
}
}

@Override
public boolean equals(Object obj) {
if (obj instanceof BooleanBlock that) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ default String getWriteableName() {
}

static BytesRefBlock of(StreamInput in) throws IOException {
final boolean isVector = in.readBoolean();
if (isVector) {
return BytesRefVector.of(in).asBlock();
}
final int positions = in.readVInt();
var builder = newBlockBuilder(positions);
for (int i = 0; i < positions; i++) {
Expand All @@ -65,17 +69,23 @@ static BytesRefBlock of(StreamInput in) throws IOException {

@Override
default void writeTo(StreamOutput out) throws IOException {
final int positions = getPositionCount();
out.writeVInt(positions);
for (int pos = 0; pos < positions; pos++) {
if (isNull(pos)) {
out.writeBoolean(true);
} else {
out.writeBoolean(false);
final int valueCount = getValueCount(pos);
out.writeVInt(valueCount);
for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
out.writeBytesRef(getBytesRef(getFirstValueIndex(pos) + valueIndex, new BytesRef()));
BytesRefVector vector = asVector();
out.writeBoolean(vector != null);
if (vector != null) {
vector.writeTo(out);
} else {
final int positions = getPositionCount();
out.writeVInt(positions);
for (int pos = 0; pos < positions; pos++) {
if (isNull(pos)) {
out.writeBoolean(true);
} else {
out.writeBoolean(false);
final int valueCount = getValueCount(pos);
out.writeVInt(valueCount);
for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
out.writeBytesRef(getBytesRef(getFirstValueIndex(pos) + valueIndex, new BytesRef()));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Vector that stores BytesRef values.
Expand Down Expand Up @@ -67,6 +71,35 @@ static int hash(BytesRefVector vector) {
return result;
}

/** Deserializes a Vector from the given stream input. */
static BytesRefVector of(StreamInput in) throws IOException {
final int positions = in.readVInt();
final boolean constant = in.readBoolean();
if (constant && positions > 0) {
return new ConstantBytesRefVector(in.readBytesRef(), positions);
} else {
var builder = BytesRefVector.newVectorBuilder(positions);
for (int i = 0; i < positions; i++) {
builder.appendBytesRef(in.readBytesRef());
}
return builder.build();
}
}

/** Serializes this Vector to the given stream output. */
default void writeTo(StreamOutput out) throws IOException {
final int positions = getPositionCount();
out.writeVInt(positions);
out.writeBoolean(isConstant());
if (isConstant() && positions > 0) {
out.writeBytesRef(getBytesRef(0, new BytesRef()));
} else {
for (int i = 0; i < positions; i++) {
out.writeBytesRef(getBytesRef(i, new BytesRef()));
}
}
}

static Builder newVectorBuilder(int estimatedSize) {
return new BytesRefVectorBuilder(estimatedSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Block view of a BytesRefVector.
Expand Down Expand Up @@ -52,46 +47,6 @@ public BytesRefBlock filter(int... positions) {
return new FilterBytesRefVector(vector, positions).asBlock();
}

public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
Block.class,
"BytesRefVectorBlock",
BytesRefVectorBlock::of
);

@Override
public String getWriteableName() {
return "BytesRefVectorBlock";
}

static BytesRefVectorBlock of(StreamInput in) throws IOException {
final int positions = in.readVInt();
final boolean constant = in.readBoolean();
if (constant && positions > 0) {
return new BytesRefVectorBlock(new ConstantBytesRefVector(in.readBytesRef(), positions));
} else {
var builder = BytesRefVector.newVectorBuilder(positions);
for (int i = 0; i < positions; i++) {
builder.appendBytesRef(in.readBytesRef());
}
return new BytesRefVectorBlock(builder.build());
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
final BytesRefVector vector = this.vector;
final int positions = vector.getPositionCount();
out.writeVInt(positions);
out.writeBoolean(vector.isConstant());
if (vector.isConstant() && positions > 0) {
out.writeBytesRef(getBytesRef(0, new BytesRef()));
} else {
for (int i = 0; i < positions; i++) {
out.writeBytesRef(getBytesRef(i, new BytesRef()));
}
}
}

@Override
public boolean equals(Object obj) {
if (obj instanceof BytesRefBlock that) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ default String getWriteableName() {
}

static DoubleBlock of(StreamInput in) throws IOException {
final boolean isVector = in.readBoolean();
if (isVector) {
return DoubleVector.of(in).asBlock();
}
final int positions = in.readVInt();
var builder = newBlockBuilder(positions);
for (int i = 0; i < positions; i++) {
Expand All @@ -63,17 +67,23 @@ static DoubleBlock of(StreamInput in) throws IOException {

@Override
default void writeTo(StreamOutput out) throws IOException {
final int positions = getPositionCount();
out.writeVInt(positions);
for (int pos = 0; pos < positions; pos++) {
if (isNull(pos)) {
out.writeBoolean(true);
} else {
out.writeBoolean(false);
final int valueCount = getValueCount(pos);
out.writeVInt(valueCount);
for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
out.writeDouble(getDouble(getFirstValueIndex(pos) + valueIndex));
DoubleVector vector = asVector();
out.writeBoolean(vector != null);
if (vector != null) {
vector.writeTo(out);
} else {
final int positions = getPositionCount();
out.writeVInt(positions);
for (int pos = 0; pos < positions; pos++) {
if (isNull(pos)) {
out.writeBoolean(true);
} else {
out.writeBoolean(false);
final int valueCount = getValueCount(pos);
out.writeVInt(valueCount);
for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
out.writeDouble(getDouble(getFirstValueIndex(pos) + valueIndex));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@

package org.elasticsearch.compute.data;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Vector that stores double values.
* This class is generated. Do not edit it.
Expand Down Expand Up @@ -67,6 +72,35 @@ static int hash(DoubleVector vector) {
return result;
}

/** Deserializes a Vector from the given stream input. */
static DoubleVector of(StreamInput in) throws IOException {
final int positions = in.readVInt();
final boolean constant = in.readBoolean();
if (constant && positions > 0) {
return new ConstantDoubleVector(in.readDouble(), positions);
} else {
var builder = DoubleVector.newVectorBuilder(positions);
for (int i = 0; i < positions; i++) {
builder.appendDouble(in.readDouble());
}
return builder.build();
}
}

/** Serializes this Vector to the given stream output. */
default void writeTo(StreamOutput out) throws IOException {
final int positions = getPositionCount();
out.writeVInt(positions);
out.writeBoolean(isConstant());
if (isConstant() && positions > 0) {
out.writeDouble(getDouble(0));
} else {
for (int i = 0; i < positions; i++) {
out.writeDouble(getDouble(i));
}
}
}

static Builder newVectorBuilder(int estimatedSize) {
return new DoubleVectorBuilder(estimatedSize);
}
Expand Down
Loading

0 comments on commit 1be3434

Please sign in to comment.