Skip to content

Commit

Permalink
ARROW-17269: [Java] implemented TransferPair methods in MapVector to …
Browse files Browse the repository at this point in the history
…get correct valuevector as mapvector instead of listvector (#13776)

Implemented TransferPair methods in MapVector to get the correct ValueVector as a MapVector instead of a ListVector.

Authored-by: ankitgehlot <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
Ankit01Gehlot authored Aug 2, 2022
1 parent d3a0ab9 commit a9dcaff
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ public static ListVector empty(String name, BufferAllocator allocator) {
protected ArrowBuf validityBuffer;
protected UnionListReader reader;
private CallBack callBack;
private final FieldType fieldType;
private int validityAllocationSizeInBytes;
protected final FieldType fieldType;
protected int validityAllocationSizeInBytes;

/**
* The maximum index that is actually set.
*/
private int lastSet;
protected int lastSet;

/**
* Constructs a new instance.
Expand Down Expand Up @@ -276,7 +276,7 @@ public boolean allocateNewSafe() {
return true;
}

private void allocateValidityBuffer(final long size) {
protected void allocateValidityBuffer(final long size) {
final int curSize = (int) size;
validityBuffer = allocator.buffer(curSize);
validityBuffer.readerIndex(0);
Expand All @@ -296,7 +296,7 @@ public void reAlloc() {
super.reAlloc();
}

private void reallocValidityAndOffsetBuffers() {
/**
 * Invokes {@code reallocOffsetBuffer()} followed by {@code reallocValidityBuffer()},
 * in that order.
 */
protected void reallocValidityAndOffsetBuffers() {
reallocOffsetBuffer();
reallocValidityBuffer();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.AddOrGetResult;
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.ZeroVector;
import org.apache.arrow.vector.complex.impl.UnionMapReader;
import org.apache.arrow.vector.complex.impl.UnionMapWriter;
import org.apache.arrow.vector.types.Types;
Expand All @@ -32,6 +36,7 @@
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.util.CallBack;
import org.apache.arrow.vector.util.TransferPair;

/**
* A MapVector is used to store entries of key/value pairs. It is a container vector that is
Expand Down Expand Up @@ -119,4 +124,151 @@ public UnionMapReader getReader() {
public MinorType getMinorType() {
return MinorType.MAP;
}

/**
 * Creates a transfer pair for this vector by delegating to
 * {@link #getTransferPair(String, BufferAllocator, CallBack)} with a {@code null} call back.
 *
 * @param ref name passed through to the three-argument overload
 * @param allocator allocator passed through to the three-argument overload
 * @return the transfer pair produced by the three-argument overload
 */
@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
  final CallBack noCallBack = null;
  return getTransferPair(ref, allocator, noCallBack);
}

/**
 * Creates a {@code TransferImpl} from the given name, allocator and call back.
 *
 * @param ref name handed to the {@code TransferImpl} constructor
 * @param allocator allocator handed to the {@code TransferImpl} constructor
 * @param callBack call back handed to the {@code TransferImpl} constructor
 * @return the newly created transfer pair
 */
@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
  final TransferPair pair = new TransferImpl(ref, allocator, callBack);
  return pair;
}

/**
 * Creates a transfer pair from this vector to an existing target vector.
 *
 * @param target the destination vector; it is cast to {@link MapVector}, so any other
 *               type fails with a {@link ClassCastException}
 * @return a transfer pair whose target is {@code target}
 */
@Override
public TransferPair makeTransferPair(ValueVector target) {
  final MapVector mapTarget = (MapVector) target;
  return new MapVector.TransferImpl(mapTarget);
}

/**
 * {@link TransferPair} whose source is the enclosing {@link MapVector} and whose target is
 * another {@link MapVector}. It moves or slices the validity buffer, the offset buffer and
 * (through a nested transfer pair) the data vector.
 */
private class TransferImpl implements TransferPair {

  /** Target vector that receives the data. */
  MapVector to;
  /** Transfer pair between the source's data vector and the target's data vector. */
  TransferPair dataTransferPair;

  /**
   * Creates a transfer pair whose target is a newly constructed {@link MapVector} that uses
   * the enclosing vector's {@code fieldType}.
   *
   * @param name name of the new target vector
   * @param allocator allocator for the new target vector
   * @param callBack call back handed to the new target vector
   */
  public TransferImpl(String name, BufferAllocator allocator, CallBack callBack) {
    this(new MapVector(name, allocator, fieldType, callBack));
  }

  /**
   * Creates a transfer pair to an existing target vector.
   *
   * @param to target vector
   */
  public TransferImpl(MapVector to) {
    this.to = to;
    to.addOrGetVector(vector.getField().getFieldType());
    // NOTE(review): this second call repeats the one above when the target still reports a
    // ZeroVector data vector; kept as in the original -- confirm whether it is still needed.
    if (to.getDataVector() instanceof ZeroVector) {
      to.addOrGetVector(vector.getField().getFieldType());
    }
    dataTransferPair = getDataVector().makeTransferPair(to.getDataVector());
  }

  /**
   * Transfer this vector's data to another vector. The memory associated
   * with this vector is transferred to the allocator of target vector
   * for accounting and management purposes.
   */
  @Override
  public void transfer() {
    to.clear();
    dataTransferPair.transfer();
    to.validityBuffer = transferBuffer(validityBuffer, to.allocator);
    to.offsetBuffer = transferBuffer(offsetBuffer, to.allocator);
    to.lastSet = lastSet;
    if (valueCount > 0) {
      to.setValueCount(valueCount);
    }
    // The source no longer holds the data once the buffers have been handed over.
    clear();
  }

  /**
   * Slice this vector at desired index and length and transfer the
   * corresponding data to the target vector.
   *
   * @param startIndex start position of the split in source vector.
   * @param length length of the split.
   * @throws IllegalArgumentException if {@code startIndex} or {@code length} is negative, or
   *         the requested range extends past {@code valueCount}
   */
  @Override
  public void splitAndTransfer(int startIndex, int length) {
    // Compare against (valueCount - length) instead of summing startIndex + length: the sum
    // can overflow int and wrap negative, which would let an out-of-range request through.
    Preconditions.checkArgument(startIndex >= 0 && length >= 0 && startIndex <= valueCount - length,
        "Invalid parameters startIndex: %s, length: %s for valueCount: %s", startIndex, length, valueCount);
    // Byte positions in the offset buffer are computed as long so that large element
    // indices do not overflow when multiplied by OFFSET_WIDTH.
    final int startPoint = offsetBuffer.getInt((long) startIndex * OFFSET_WIDTH);
    final int sliceLength = offsetBuffer.getInt((long) (startIndex + length) * OFFSET_WIDTH) - startPoint;
    to.clear();
    to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
    /* splitAndTransfer offset buffer: rebase every offset so that the slice starts at 0 */
    for (int i = 0; i < length + 1; i++) {
      final int relativeOffset = offsetBuffer.getInt((long) (startIndex + i) * OFFSET_WIDTH) - startPoint;
      to.offsetBuffer.setInt((long) i * OFFSET_WIDTH, relativeOffset);
    }
    /* splitAndTransfer validity buffer */
    splitAndTransferValidityBuffer(startIndex, length, to);
    /* splitAndTransfer data buffer */
    dataTransferPair.splitAndTransfer(startPoint, sliceLength);
    to.lastSet = length - 1;
    to.setValueCount(length);
  }

  /*
   * transfer the validity.
   * Does nothing when length is 0. When startIndex is byte aligned the source validity
   * buffer is sliced and retained; otherwise a new buffer is allocated in the target and
   * filled byte by byte from shifted source bits.
   */
  private void splitAndTransferValidityBuffer(int startIndex, int length, MapVector target) {
    int firstByteSource = BitVectorHelper.byteIndex(startIndex);
    int lastByteSource = BitVectorHelper.byteIndex(valueCount - 1);
    int byteSizeTarget = getValidityBufferSizeFromCount(length);
    int offset = startIndex % 8;

    if (length > 0) {
      if (offset == 0) {
        // slice
        if (target.validityBuffer != null) {
          target.validityBuffer.getReferenceManager().release();
        }
        target.validityBuffer = validityBuffer.slice(firstByteSource, byteSizeTarget);
        target.validityBuffer.getReferenceManager().retain(1);
      } else {
        /* Copy data
         * When the first bit starts from the middle of a byte (offset != 0),
         * copy data from src BitVector.
         * Each byte in the target is composed by a part in i-th byte,
         * another part in (i+1)-th byte.
         */
        target.allocateValidityBuffer(byteSizeTarget);

        for (int i = 0; i < byteSizeTarget - 1; i++) {
          byte b1 = BitVectorHelper.getBitsFromCurrentByte(validityBuffer, firstByteSource + i, offset);
          byte b2 = BitVectorHelper.getBitsFromNextByte(validityBuffer, firstByteSource + i + 1, offset);

          target.validityBuffer.setByte(i, (b1 + b2));
        }

        /* Copying the last piece is done in the following manner:
         * if the source vector has 1 or more bytes remaining, we copy
         * the last piece as a byte formed by shifting data
         * from the current byte and the next byte.
         *
         * if the source vector has no more bytes remaining
         * (we are at the last byte), we copy the last piece as a byte
         * by shifting data from the current byte.
         */
        if ((firstByteSource + byteSizeTarget - 1) < lastByteSource) {
          byte b1 = BitVectorHelper.getBitsFromCurrentByte(validityBuffer,
              firstByteSource + byteSizeTarget - 1, offset);
          byte b2 = BitVectorHelper.getBitsFromNextByte(validityBuffer,
              firstByteSource + byteSizeTarget, offset);

          target.validityBuffer.setByte(byteSizeTarget - 1, b1 + b2);
        } else {
          byte b1 = BitVectorHelper.getBitsFromCurrentByte(validityBuffer,
              firstByteSource + byteSizeTarget - 1, offset);
          target.validityBuffer.setByte(byteSizeTarget - 1, b1);
        }
      }
    }
  }

  /**
   * Returns the target vector of this transfer pair.
   */
  @Override
  public ValueVector getTo() {
    return to;
  }

  /**
   * Copies a single entry from the enclosing source vector into the target vector by
   * calling the target's {@code copyFrom}.
   *
   * @param from index in the source vector
   * @param to index in the target vector
   */
  @Override
  public void copyValueSafe(int from, int to) {
    this.to.copyFrom(from, to, MapVector.this);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
Expand Down Expand Up @@ -1110,4 +1111,26 @@ public void testClearAndReuse() {
assertEquals(55, getResultValue(resultStruct));
}
}

/**
 * Verifies that a transfer pair obtained from a {@link MapVector} targets a vector of the
 * same class as the source.
 */
@Test
public void testGetTransferPair() {
  try (MapVector mapVector = MapVector.empty("mapVector", allocator, false)) {

    FieldType type = new FieldType(false, ArrowType.Struct.INSTANCE, null, null);
    AddOrGetResult<StructVector> addResult = mapVector.addOrGetVector(type);
    FieldType keyType = new FieldType(false, MinorType.BIGINT.getType(), null, null);
    FieldType valueType = FieldType.nullable(MinorType.FLOAT8.getType());
    addResult.getVector().addOrGet(MapVector.KEY_NAME, keyType, BigIntVector.class);
    addResult.getVector().addOrGet(MapVector.VALUE_NAME, valueType, Float8Vector.class);
    mapVector.allocateNew();
    mapVector.setValueCount(0);

    assertEquals(-1, mapVector.getLastSet());
    TransferPair tp = mapVector.getTransferPair(mapVector.getName(), allocator, null);
    // Manage the target with try-with-resources so it is closed even when the assertion
    // below fails; a trailing clear() would be skipped in that case.
    try (ValueVector vector = tp.getTo()) {
      tp.transfer();
      // Expected value first, actual value second.
      assertSame(mapVector.getClass(), vector.getClass());
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.complex.FixedSizeListVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.complex.UnionVector;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.ArrowType.Struct;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.util.TransferPair;
Expand Down Expand Up @@ -406,5 +408,25 @@ public void testStructVectorZeroStartIndexAndLength() {
}
}

/**
 * Splitting an empty {@link MapVector} at index 0 with length 0 must leave the target
 * vector with a value count of 0.
 */
@Test
public void testMapVectorZeroStartIndexAndLength() {
  final Map<String, String> fieldMetadata = new HashMap<>();
  fieldMetadata.put("k1", "v1");
  final FieldType mapType = new FieldType(true, new ArrowType.Map(false), null, fieldMetadata);

  try (final MapVector source = new MapVector("mapVec", allocator, mapType, null);
      final MapVector target = new MapVector("newMapVec", allocator, mapType, null)) {

    // Source is allocated but holds no values.
    final int expectedCount = 0;
    source.allocateNew();
    source.setValueCount(expectedCount);

    final TransferPair pair = source.makeTransferPair(target);
    pair.splitAndTransfer(0, 0);

    assertEquals(expectedCount, target.getValueCount());

    target.clear();
  }
}

}

0 comments on commit a9dcaff

Please sign in to comment.