Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-41730: [Java] Adding variadicBufferCounts to RecordBatch #41732

Merged
merged 16 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions java/c/src/main/java/org/apache/arrow/c/StructVectorLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,12 @@ public StructVector load(BufferAllocator allocator, ArrowRecordBatch recordBatch
.fromCompressionType(recordBatch.getBodyCompression().getCodec());
decompressionNeeded = codecType != CompressionUtil.CodecType.NO_COMPRESSION;
CompressionCodec codec = decompressionNeeded ? factory.createCodec(codecType) : NoCompressionCodec.INSTANCE;
Iterator<Long> variadicBufferCounts = null;
if (recordBatch.getVariadicBufferCounts() != null && !recordBatch.getVariadicBufferCounts().isEmpty()) {
variadicBufferCounts = recordBatch.getVariadicBufferCounts().iterator();
}
for (FieldVector fieldVector : result.getChildrenFromFields()) {
loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec);
loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec, variadicBufferCounts);
}
result.loadFieldBuffers(new ArrowFieldNode(recordBatch.getLength(), 0), Collections.singletonList(null));
if (nodes.hasNext() || buffers.hasNext()) {
Expand All @@ -102,10 +106,15 @@ public StructVector load(BufferAllocator allocator, ArrowRecordBatch recordBatch
}

private void loadBuffers(FieldVector vector, Field field, Iterator<ArrowBuf> buffers, Iterator<ArrowFieldNode> nodes,
CompressionCodec codec) {
CompressionCodec codec, Iterator<Long> variadicBufferCounts) {
checkArgument(nodes.hasNext(), "no more field nodes for field %s and vector %s", field, vector);
ArrowFieldNode fieldNode = nodes.next();
int bufferLayoutCount = TypeLayout.getTypeBufferCount(field.getType());
// variadicBufferLayoutCount will be 0 for vectors of type except BaseVariableWidthViewVector
long variadicBufferLayoutCount = 0;
if (variadicBufferCounts != null) {
variadicBufferLayoutCount = variadicBufferCounts.next();
}
int bufferLayoutCount = (int) (variadicBufferLayoutCount + TypeLayout.getTypeBufferCount(field.getType()));
List<ArrowBuf> ownBuffers = new ArrayList<>(bufferLayoutCount);
for (int j = 0; j < bufferLayoutCount; j++) {
ArrowBuf nextBuf = buffers.next();
Expand Down Expand Up @@ -138,7 +147,7 @@ private void loadBuffers(FieldVector vector, Field field, Iterator<ArrowBuf> buf
for (int i = 0; i < childrenFromFields.size(); i++) {
Field child = children.get(i);
FieldVector fieldVector = childrenFromFields.get(i);
loadBuffers(fieldVector, child, buffers, nodes, codec);
loadBuffers(fieldVector, child, buffers, nodes, codec, variadicBufferCounts);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.vector.BaseVariableWidthViewVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.TypeLayout;
import org.apache.arrow.vector.complex.StructVector;
Expand Down Expand Up @@ -87,17 +88,28 @@ public StructVectorUnloader(StructVector root, boolean includeNullCount, Compres
public ArrowRecordBatch getRecordBatch() {
List<ArrowFieldNode> nodes = new ArrayList<>();
List<ArrowBuf> buffers = new ArrayList<>();
List<Long> variadicBufferCounts = new ArrayList<>();
for (FieldVector vector : root.getChildrenFromFields()) {
appendNodes(vector, nodes, buffers);
appendNodes(vector, nodes, buffers, variadicBufferCounts);
}
return new ArrowRecordBatch(root.getValueCount(), nodes, buffers, CompressionUtil.createBodyCompression(codec),
alignBuffers);
variadicBufferCounts, alignBuffers);
}

private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers) {
private long getVariadicBufferCount(FieldVector vector) {
if (vector instanceof BaseVariableWidthViewVector) {
return ((BaseVariableWidthViewVector) vector).getDataBuffers().size();
}
return 0L;
}

private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers,
List<Long> variadicBufferCounts) {
nodes.add(new ArrowFieldNode(vector.getValueCount(), includeNullCount ? vector.getNullCount() : -1));
List<ArrowBuf> fieldBuffers = vector.getFieldBuffers();
int expectedBufferCount = TypeLayout.getTypeBufferCount(vector.getField().getType());
long variadicBufferCount = getVariadicBufferCount(vector);
int expectedBufferCount = (int) (TypeLayout.getTypeBufferCount(vector.getField().getType()) + variadicBufferCount);
variadicBufferCounts.add(variadicBufferCount);
if (fieldBuffers.size() != expectedBufferCount) {
throw new IllegalArgumentException(String.format("wrong number of buffers for field %s in vector %s. found: %s",
vector.getField(), vector.getClass().getSimpleName(), fieldBuffers));
Expand All @@ -106,7 +118,7 @@ private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<Ar
buffers.add(codec.compress(vector.getAllocator(), buf));
}
for (FieldVector child : vector.getChildrenFromFields()) {
appendNodes(child, nodes, buffers);
appendNodes(child, nodes, buffers, variadicBufferCounts);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public ArrowBuf getDataBuffer() {
/**
* Get the buffers that store the data for views in the vector.
*
* @return buffer
* @return list of ArrowBuf
*/
public List<ArrowBuf> getDataBuffers() {
return dataBuffers;
Expand Down Expand Up @@ -368,8 +368,21 @@ public List<FieldVector> getChildrenFromFields() {
*/
@Override
public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
// TODO: https://github.com/apache/arrow/issues/40931
throw new UnsupportedOperationException("loadFieldBuffers is not supported for BaseVariableWidthViewVector");
ArrowBuf bitBuf = ownBuffers.get(0);
ArrowBuf viewBuf = ownBuffers.get(1);
List<ArrowBuf> dataBufs = ownBuffers.subList(2, ownBuffers.size());

this.clear();

this.viewBuffer = viewBuf.getReferenceManager().retain(viewBuf, allocator);
this.validityBuffer = BitVectorHelper.loadValidityBuffer(fieldNode, bitBuf, allocator);

for (ArrowBuf dataBuf : dataBufs) {
this.dataBuffers.add(dataBuf.getReferenceManager().retain(dataBuf, allocator));
}

lastSet = fieldNode.getLength() - 1;
valueCount = fieldNode.getLength();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeVisitor;
import org.apache.arrow.vector.types.pojo.ArrowType.Binary;
import org.apache.arrow.vector.types.pojo.ArrowType.BinaryView;
import org.apache.arrow.vector.types.pojo.ArrowType.Bool;
import org.apache.arrow.vector.types.pojo.ArrowType.Date;
import org.apache.arrow.vector.types.pojo.ArrowType.Decimal;
Expand Down Expand Up @@ -186,8 +187,7 @@ public TypeLayout visit(Binary type) {

@Override
public TypeLayout visit(ArrowType.BinaryView type) {
// TODO: https://github.com/apache/arrow/issues/40934
throw new UnsupportedOperationException("BinaryView not supported");
return newVariableWidthViewTypeLayout();
}

@Override
Expand All @@ -197,8 +197,7 @@ public TypeLayout visit(Utf8 type) {

@Override
public TypeLayout visit(Utf8View type) {
// TODO: https://github.com/apache/arrow/issues/40934
throw new UnsupportedOperationException("Utf8View not supported");
return newVariableWidthViewTypeLayout();
}

@Override
Expand All @@ -216,7 +215,12 @@ private TypeLayout newVariableWidthTypeLayout() {
BufferLayout.byteVector());
}

private TypeLayout newVariableWidthViewTypeLayout() {
return newPrimitiveTypeLayout(BufferLayout.validityVector(), BufferLayout.byteVector());
}

private TypeLayout newLargeVariableWidthTypeLayout() {
// NOTE: only considers the non variadic buffers
return newPrimitiveTypeLayout(BufferLayout.validityVector(), BufferLayout.largeOffsetBuffer(),
BufferLayout.byteVector());
}
Expand Down Expand Up @@ -377,9 +381,9 @@ public Integer visit(Binary type) {
}

@Override
public Integer visit(ArrowType.BinaryView type) {
// TODO: https://github.com/apache/arrow/issues/40935
return VARIABLE_WIDTH_BUFFER_COUNT;
public Integer visit(BinaryView type) {
// NOTE: only consider the validity and view buffers
return 2;
}

@Override
Expand All @@ -389,8 +393,8 @@ public Integer visit(Utf8 type) {

@Override
public Integer visit(Utf8View type) {
// TODO: https://github.com/apache/arrow/issues/40935
return VARIABLE_WIDTH_BUFFER_COUNT;
// NOTE: only consider the validity and view buffers
return 2;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,13 @@ public void load(ArrowRecordBatch recordBatch) {
CompressionUtil.CodecType.fromCompressionType(recordBatch.getBodyCompression().getCodec());
decompressionNeeded = codecType != CompressionUtil.CodecType.NO_COMPRESSION;
CompressionCodec codec = decompressionNeeded ? factory.createCodec(codecType) : NoCompressionCodec.INSTANCE;
Iterator<Long> variadicBufferCounts = null;
if (recordBatch.getVariadicBufferCounts() != null && !recordBatch.getVariadicBufferCounts().isEmpty()) {
variadicBufferCounts = recordBatch.getVariadicBufferCounts().iterator();
}

for (FieldVector fieldVector : root.getFieldVectors()) {
loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec);
loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec, variadicBufferCounts);
}
root.setRowCount(recordBatch.getLength());
if (nodes.hasNext() || buffers.hasNext()) {
Expand All @@ -95,10 +100,16 @@ private void loadBuffers(
Field field,
Iterator<ArrowBuf> buffers,
Iterator<ArrowFieldNode> nodes,
CompressionCodec codec) {
CompressionCodec codec,
Iterator<Long> variadicBufferCounts) {
checkArgument(nodes.hasNext(), "no more field nodes for field %s and vector %s", field, vector);
ArrowFieldNode fieldNode = nodes.next();
int bufferLayoutCount = TypeLayout.getTypeBufferCount(field.getType());
// variadicBufferLayoutCount will be 0 for vectors of type except BaseVariableWidthViewVector
long variadicBufferLayoutCount = 0;
if (variadicBufferCounts != null) {
variadicBufferLayoutCount = variadicBufferCounts.next();
}
int bufferLayoutCount = (int) (variadicBufferLayoutCount + TypeLayout.getTypeBufferCount(field.getType()));
List<ArrowBuf> ownBuffers = new ArrayList<>(bufferLayoutCount);
for (int j = 0; j < bufferLayoutCount; j++) {
ArrowBuf nextBuf = buffers.next();
Expand Down Expand Up @@ -130,7 +141,7 @@ private void loadBuffers(
for (int i = 0; i < childrenFromFields.size(); i++) {
Field child = children.get(i);
FieldVector fieldVector = childrenFromFields.get(i);
loadBuffers(fieldVector, child, buffers, nodes, codec);
loadBuffers(fieldVector, child, buffers, nodes, codec, variadicBufferCounts);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,30 @@ public VectorUnloader(
public ArrowRecordBatch getRecordBatch() {
List<ArrowFieldNode> nodes = new ArrayList<>();
List<ArrowBuf> buffers = new ArrayList<>();
List<Long> variadicBufferCounts = new ArrayList<>();
for (FieldVector vector : root.getFieldVectors()) {
appendNodes(vector, nodes, buffers);
appendNodes(vector, nodes, buffers, variadicBufferCounts);
}
// Do NOT retain buffers in ArrowRecordBatch constructor since we have already retained them.
return new ArrowRecordBatch(
root.getRowCount(), nodes, buffers, CompressionUtil.createBodyCompression(codec), alignBuffers,
/*retainBuffers*/ false);
root.getRowCount(), nodes, buffers, CompressionUtil.createBodyCompression(codec),
variadicBufferCounts, alignBuffers, /*retainBuffers*/ false);
}

private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers) {
private long getVariadicBufferCount(FieldVector vector) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a slight issue in adding an interface for this. The VectorUnloader and StructVectorUnloader are in vector and c modules respectively. So we cannot have a common interface for both. So would it be fine if we add BaseVectorUnloader interface for both modules? I didn't add it yet.

if (vector instanceof BaseVariableWidthViewVector) {
return ((BaseVariableWidthViewVector) vector).getDataBuffers().size();
}
return 0L;
}

private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers,
List<Long> variadicBufferCounts) {
nodes.add(new ArrowFieldNode(vector.getValueCount(), includeNullCount ? vector.getNullCount() : -1));
List<ArrowBuf> fieldBuffers = vector.getFieldBuffers();
int expectedBufferCount = TypeLayout.getTypeBufferCount(vector.getField().getType());
long variadicBufferCount = getVariadicBufferCount(vector);
int expectedBufferCount = (int) (TypeLayout.getTypeBufferCount(vector.getField().getType()) + variadicBufferCount);
variadicBufferCounts.add(variadicBufferCount);
if (fieldBuffers.size() != expectedBufferCount) {
throw new IllegalArgumentException(String.format(
"wrong number of buffers for field %s in vector %s. found: %s",
Expand All @@ -107,7 +118,7 @@ private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<Ar
buffers.add(codec.compress(vector.getAllocator(), buf));
}
for (FieldVector child : vector.getChildrenFromFields()) {
appendNodes(child, nodes, buffers);
appendNodes(child, nodes, buffers, variadicBufferCounts);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public final class ViewVarCharVector extends BaseVariableWidthViewVector {
* @param allocator allocator for memory management.
*/
public ViewVarCharVector(String name, BufferAllocator allocator) {
this(name, FieldType.nullable(MinorType.VARCHAR.getType()), allocator);
this(name, FieldType.nullable(MinorType.VIEWVARCHAR.getType()), allocator);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,7 @@ private ArrowBuf readIntoBuffer(BufferAllocator allocator, BufferType bufferType

private void readFromJsonIntoVector(Field field, FieldVector vector) throws JsonParseException, IOException {
ArrowType type = field.getType();
// TODO: https://github.com/apache/arrow/issues/41733
TypeLayout typeLayout = TypeLayout.getTypeLayout(type);
List<BufferType> vectorTypes = typeLayout.getBufferTypes();
ArrowBuf[] vectorBuffers = new ArrowBuf[vectorTypes.size()];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ private void writeBatch(VectorSchemaRoot recordBatch) throws IOException {
}

private void writeFromVectorIntoJson(Field field, FieldVector vector) throws IOException {
// TODO: https://github.com/apache/arrow/issues/41733
List<BufferType> vectorTypes = TypeLayout.getTypeLayout(field.getType()).getBufferTypes();
List<ArrowBuf> vectorBuffers = vector.getFieldBuffers();
if (vectorTypes.size() != vectorBuffers.size()) {
Expand Down
Loading
Loading