Skip to content

Commit

Permalink
Specialize serialization of array blocks (elastic#106102)
Browse files Browse the repository at this point in the history
A follow-up of elastic#105893

Currently, we serialize blocks value by value, which is simple but 
effective. However, it would be more efficient to serialize the
underlying structures of array blocks instead.
  • Loading branch information
dnhatn authored Mar 9, 2024
1 parent 58477b5 commit 721d9fa
Show file tree
Hide file tree
Showing 16 changed files with 447 additions and 103 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/106102.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 106102
summary: Specialize serialization of array blocks
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ADD_FAILURE_STORE_INDICES_OPTIONS = def(8_599_00_0);
public static final TransportVersion ESQL_ENRICH_OPERATOR_STATUS = def(8_600_00_0);
public static final TransportVersion ESQL_SERIALIZE_ARRAY_VECTOR = def(8_601_00_0);
public static final TransportVersion ESQL_SERIALIZE_ARRAY_BLOCK = def(8_602_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
import java.util.BitSet;

/**
Expand Down Expand Up @@ -53,6 +55,29 @@ private BooleanArrayBlock(
: firstValueIndexes[getPositionCount()] == vector.getPositionCount();
}

static BooleanArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
final SubFields sub = new SubFields(blockFactory, in);
BooleanArrayVector vector = null;
boolean success = false;
try {
vector = BooleanArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
var block = new BooleanArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
success = true;
return block;
} finally {
if (success == false) {
Releasables.close(vector);
blockFactory.adjustBreaker(-sub.bytesReserved);
}
}
}

void writeArrayBlock(StreamOutput out) throws IOException {
writeSubFields(out);
vector.writeArrayVector(vector.getPositionCount(), out);
}

@Override
public BooleanVector asVector() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.compute.data;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -49,10 +50,19 @@ private static BooleanBlock readFrom(StreamInput in) throws IOException {
}

private static BooleanBlock readFrom(BlockStreamInput in) throws IOException {
final boolean isVector = in.readBoolean();
if (isVector) {
return BooleanVector.readFrom(in.blockFactory(), in).asBlock();
}
final byte serializationType = in.readByte();
return switch (serializationType) {
case SERIALIZE_BLOCK_VALUES -> BooleanBlock.readValues(in);
case SERIALIZE_BLOCK_VECTOR -> BooleanVector.readFrom(in.blockFactory(), in).asBlock();
case SERIALIZE_BLOCK_ARRAY -> BooleanArrayBlock.readArrayBlock(in.blockFactory(), in);
default -> {
assert false : "invalid block serialization type " + serializationType;
throw new IllegalStateException("invalid serialization type " + serializationType);
}
};
}

private static BooleanBlock readValues(BlockStreamInput in) throws IOException {
final int positions = in.readVInt();
try (BooleanBlock.Builder builder = in.blockFactory().newBooleanBlockBuilder(positions)) {
for (int i = 0; i < positions; i++) {
Expand All @@ -74,22 +84,31 @@ private static BooleanBlock readFrom(BlockStreamInput in) throws IOException {
@Override
default void writeTo(StreamOutput out) throws IOException {
BooleanVector vector = asVector();
out.writeBoolean(vector != null);
final var version = out.getTransportVersion();
if (vector != null) {
out.writeByte(SERIALIZE_BLOCK_VECTOR);
vector.writeTo(out);
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof BooleanArrayBlock b) {
out.writeByte(SERIALIZE_BLOCK_ARRAY);
b.writeArrayBlock(out);
} else {
final int positions = getPositionCount();
out.writeVInt(positions);
for (int pos = 0; pos < positions; pos++) {
if (isNull(pos)) {
out.writeBoolean(true);
} else {
out.writeBoolean(false);
final int valueCount = getValueCount(pos);
out.writeVInt(valueCount);
for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
out.writeBoolean(getBoolean(getFirstValueIndex(pos) + valueIndex));
}
out.writeByte(SERIALIZE_BLOCK_VALUES);
BooleanBlock.writeValues(this, out);
}
}

private static void writeValues(BooleanBlock block, StreamOutput out) throws IOException {
final int positions = block.getPositionCount();
out.writeVInt(positions);
for (int pos = 0; pos < positions; pos++) {
if (block.isNull(pos)) {
out.writeBoolean(true);
} else {
out.writeBoolean(false);
final int valueCount = block.getValueCount(pos);
out.writeVInt(valueCount);
for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
out.writeBoolean(block.getBoolean(block.getFirstValueIndex(pos) + valueIndex));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BytesRefArray;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
import java.util.BitSet;

/**
Expand Down Expand Up @@ -56,6 +58,29 @@ private BytesRefArrayBlock(
: firstValueIndexes[getPositionCount()] == vector.getPositionCount();
}

static BytesRefArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
final SubFields sub = new SubFields(blockFactory, in);
BytesRefArrayVector vector = null;
boolean success = false;
try {
vector = BytesRefArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
var block = new BytesRefArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
success = true;
return block;
} finally {
if (success == false) {
Releasables.close(vector);
blockFactory.adjustBreaker(-sub.bytesReserved);
}
}
}

void writeArrayBlock(StreamOutput out) throws IOException {
writeSubFields(out);
vector.writeArrayVector(vector.getPositionCount(), out);
}

@Override
public BytesRefVector asVector() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -53,10 +54,19 @@ private static BytesRefBlock readFrom(StreamInput in) throws IOException {
}

private static BytesRefBlock readFrom(BlockStreamInput in) throws IOException {
final boolean isVector = in.readBoolean();
if (isVector) {
return BytesRefVector.readFrom(in.blockFactory(), in).asBlock();
}
final byte serializationType = in.readByte();
return switch (serializationType) {
case SERIALIZE_BLOCK_VALUES -> BytesRefBlock.readValues(in);
case SERIALIZE_BLOCK_VECTOR -> BytesRefVector.readFrom(in.blockFactory(), in).asBlock();
case SERIALIZE_BLOCK_ARRAY -> BytesRefArrayBlock.readArrayBlock(in.blockFactory(), in);
default -> {
assert false : "invalid block serialization type " + serializationType;
throw new IllegalStateException("invalid serialization type " + serializationType);
}
};
}

private static BytesRefBlock readValues(BlockStreamInput in) throws IOException {
final int positions = in.readVInt();
try (BytesRefBlock.Builder builder = in.blockFactory().newBytesRefBlockBuilder(positions)) {
for (int i = 0; i < positions; i++) {
Expand All @@ -78,22 +88,32 @@ private static BytesRefBlock readFrom(BlockStreamInput in) throws IOException {
@Override
default void writeTo(StreamOutput out) throws IOException {
BytesRefVector vector = asVector();
out.writeBoolean(vector != null);
final var version = out.getTransportVersion();
if (vector != null) {
out.writeByte(SERIALIZE_BLOCK_VECTOR);
vector.writeTo(out);
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof BytesRefArrayBlock b) {
out.writeByte(SERIALIZE_BLOCK_ARRAY);
b.writeArrayBlock(out);
} else {
final int positions = getPositionCount();
out.writeVInt(positions);
for (int pos = 0; pos < positions; pos++) {
if (isNull(pos)) {
out.writeBoolean(true);
} else {
out.writeBoolean(false);
final int valueCount = getValueCount(pos);
out.writeVInt(valueCount);
for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
out.writeBytesRef(getBytesRef(getFirstValueIndex(pos) + valueIndex, new BytesRef()));
}
out.writeByte(SERIALIZE_BLOCK_VALUES);
BytesRefBlock.writeValues(this, out);
}
}

private static void writeValues(BytesRefBlock block, StreamOutput out) throws IOException {
final int positions = block.getPositionCount();
out.writeVInt(positions);
for (int pos = 0; pos < positions; pos++) {
if (block.isNull(pos)) {
out.writeBoolean(true);
} else {
out.writeBoolean(false);
final int valueCount = block.getValueCount(pos);
out.writeVInt(valueCount);
var scratch = new BytesRef();
for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
out.writeBytesRef(block.getBytesRef(block.getFirstValueIndex(pos) + valueIndex, scratch));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
import java.util.BitSet;

/**
Expand Down Expand Up @@ -53,6 +55,29 @@ private DoubleArrayBlock(
: firstValueIndexes[getPositionCount()] == vector.getPositionCount();
}

static DoubleArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
final SubFields sub = new SubFields(blockFactory, in);
DoubleArrayVector vector = null;
boolean success = false;
try {
vector = DoubleArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
var block = new DoubleArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
success = true;
return block;
} finally {
if (success == false) {
Releasables.close(vector);
blockFactory.adjustBreaker(-sub.bytesReserved);
}
}
}

void writeArrayBlock(StreamOutput out) throws IOException {
writeSubFields(out);
vector.writeArrayVector(vector.getPositionCount(), out);
}

@Override
public DoubleVector asVector() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.compute.data;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -49,10 +50,19 @@ private static DoubleBlock readFrom(StreamInput in) throws IOException {
}

private static DoubleBlock readFrom(BlockStreamInput in) throws IOException {
final boolean isVector = in.readBoolean();
if (isVector) {
return DoubleVector.readFrom(in.blockFactory(), in).asBlock();
}
final byte serializationType = in.readByte();
return switch (serializationType) {
case SERIALIZE_BLOCK_VALUES -> DoubleBlock.readValues(in);
case SERIALIZE_BLOCK_VECTOR -> DoubleVector.readFrom(in.blockFactory(), in).asBlock();
case SERIALIZE_BLOCK_ARRAY -> DoubleArrayBlock.readArrayBlock(in.blockFactory(), in);
default -> {
assert false : "invalid block serialization type " + serializationType;
throw new IllegalStateException("invalid serialization type " + serializationType);
}
};
}

private static DoubleBlock readValues(BlockStreamInput in) throws IOException {
final int positions = in.readVInt();
try (DoubleBlock.Builder builder = in.blockFactory().newDoubleBlockBuilder(positions)) {
for (int i = 0; i < positions; i++) {
Expand All @@ -74,22 +84,31 @@ private static DoubleBlock readFrom(BlockStreamInput in) throws IOException {
@Override
default void writeTo(StreamOutput out) throws IOException {
DoubleVector vector = asVector();
out.writeBoolean(vector != null);
final var version = out.getTransportVersion();
if (vector != null) {
out.writeByte(SERIALIZE_BLOCK_VECTOR);
vector.writeTo(out);
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof DoubleArrayBlock b) {
out.writeByte(SERIALIZE_BLOCK_ARRAY);
b.writeArrayBlock(out);
} else {
final int positions = getPositionCount();
out.writeVInt(positions);
for (int pos = 0; pos < positions; pos++) {
if (isNull(pos)) {
out.writeBoolean(true);
} else {
out.writeBoolean(false);
final int valueCount = getValueCount(pos);
out.writeVInt(valueCount);
for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
out.writeDouble(getDouble(getFirstValueIndex(pos) + valueIndex));
}
out.writeByte(SERIALIZE_BLOCK_VALUES);
DoubleBlock.writeValues(this, out);
}
}

private static void writeValues(DoubleBlock block, StreamOutput out) throws IOException {
final int positions = block.getPositionCount();
out.writeVInt(positions);
for (int pos = 0; pos < positions; pos++) {
if (block.isNull(pos)) {
out.writeBoolean(true);
} else {
out.writeBoolean(false);
final int valueCount = block.getValueCount(pos);
out.writeVInt(valueCount);
for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
out.writeDouble(block.getDouble(block.getFirstValueIndex(pos) + valueIndex));
}
}
}
Expand Down
Loading

0 comments on commit 721d9fa

Please sign in to comment.