Skip to content

Commit

Permalink
support big-endian in DecimalVector
Browse files Browse the repository at this point in the history
  • Loading branch information
kiszk committed Oct 6, 2020
1 parent c6e0fa4 commit 1eab081
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 60 deletions.
100 changes: 67 additions & 33 deletions java/vector/src/main/java/org/apache/arrow/vector/DecimalVector.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.arrow.vector.NullCheckingForGet.NULL_CHECKING_ENABLED;

import java.math.BigDecimal;
import java.nio.ByteOrder;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
Expand All @@ -43,6 +44,7 @@
*/
public final class DecimalVector extends BaseFixedWidthVector {
public static final byte TYPE_WIDTH = 16;
private static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
private final FieldReader reader;

private final int precision;
Expand Down Expand Up @@ -197,7 +199,7 @@ public void set(int index, ArrowBuf buffer) {

/**
* Set the decimal element at given index to the provided array of bytes.
* Decimal is now implemented as Little Endian. This API allows the user
* Decimal is now implemented as Native Endian. This API allows the user
* to pass a decimal value in the form of byte array in BE byte order.
*
* <p>Consumers of Arrow code can use this API instead of first swapping
Expand All @@ -218,25 +220,37 @@ public void setBigEndian(int index, byte[] value) {
valueBuffer.checkBytes((long) index * TYPE_WIDTH, (long) (index + 1) * TYPE_WIDTH);

long outAddress = valueBuffer.memoryAddress() + (long) index * TYPE_WIDTH;
// swap bytes to convert BE to LE
for (int byteIdx = 0; byteIdx < length; ++byteIdx) {
PlatformDependent.putByte(outAddress + byteIdx, value[length - 1 - byteIdx]);
}

if (length == TYPE_WIDTH) {
return;
}

if (length == 0) {
PlatformDependent.setMemory(outAddress, DecimalVector.TYPE_WIDTH, (byte) 0);
} else if (length < TYPE_WIDTH) {
// sign extend
final byte pad = (byte) (value[0] < 0 ? 0xFF : 0x00);
PlatformDependent.setMemory(outAddress + length, DecimalVector.TYPE_WIDTH - length, pad);
return;
}
if (LITTLE_ENDIAN) {
// swap bytes to convert BE to LE
for (int byteIdx = 0; byteIdx < length; ++byteIdx) {
PlatformDependent.putByte(outAddress + byteIdx, value[length - 1 - byteIdx]);
}

if (length == TYPE_WIDTH) {
return;
}

if (length < TYPE_WIDTH) {
// sign extend
final byte pad = (byte) (value[0] < 0 ? 0xFF : 0x00);
PlatformDependent.setMemory(outAddress + length, DecimalVector.TYPE_WIDTH - length, pad);
return;
}
} else {
throw new IllegalArgumentException(
"Invalid decimal value length. Valid length in [1 - 16], got " + length);
if (length <= TYPE_WIDTH) {
PlatformDependent.copyMemory(value, 0, outAddress + DecimalVector.TYPE_WIDTH - length, length);
// sign extend
final byte pad = (byte) (value[0] < 0 ? 0xFF : 0x00);
PlatformDependent.setMemory(outAddress, DecimalVector.TYPE_WIDTH - length, pad);
return;
}
}
throw new IllegalArgumentException(
"Invalid decimal value length. Valid length in [1 - 16], got " + length);
}

/**
Expand All @@ -255,7 +269,7 @@ public void set(int index, int start, ArrowBuf buffer) {
* Sets the element at the given index using a buffer whose size may be <= 16 bytes.
* @param index index to write the decimal to
* @param start start of value in the buffer
* @param buffer contains the decimal in little endian bytes
* @param buffer contains the decimal in native endian bytes
* @param length length of the value in the buffer
*/
public void setSafe(int index, int start, ArrowBuf buffer, int length) {
Expand All @@ -268,12 +282,22 @@ public void setSafe(int index, int start, ArrowBuf buffer, int length) {

long inAddress = buffer.memoryAddress() + start;
long outAddress = valueBuffer.memoryAddress() + (long) index * TYPE_WIDTH;
PlatformDependent.copyMemory(inAddress, outAddress, length);
// sign extend
if (length < 16) {
byte msb = PlatformDependent.getByte(inAddress + length - 1);
final byte pad = (byte) (msb < 0 ? 0xFF : 0x00);
PlatformDependent.setMemory(outAddress + length, DecimalVector.TYPE_WIDTH - length, pad);
if (LITTLE_ENDIAN) {
PlatformDependent.copyMemory(inAddress, outAddress, length);
// sign extend
if (length < TYPE_WIDTH) {
byte msb = PlatformDependent.getByte(inAddress + length - 1);
final byte pad = (byte) (msb < 0 ? 0xFF : 0x00);
PlatformDependent.setMemory(outAddress + length, DecimalVector.TYPE_WIDTH - length, pad);
}
} else {
PlatformDependent.copyMemory(inAddress, outAddress + DecimalVector.TYPE_WIDTH - length, length);
// sign extend
if (length < TYPE_WIDTH) {
byte msb = PlatformDependent.getByte(inAddress);
final byte pad = (byte) (msb < 0 ? 0xFF : 0x00);
PlatformDependent.setMemory(outAddress, DecimalVector.TYPE_WIDTH - length, pad);
}
}
}

Expand All @@ -296,16 +320,26 @@ public void setBigEndianSafe(int index, int start, ArrowBuf buffer, int length)
// not using buffer.getByte() to avoid boundary checks for every byte.
long inAddress = buffer.memoryAddress() + start;
long outAddress = valueBuffer.memoryAddress() + (long) index * TYPE_WIDTH;
// swap bytes to convert BE to LE
for (int byteIdx = 0; byteIdx < length; ++byteIdx) {
byte val = PlatformDependent.getByte((inAddress + length - 1) - byteIdx);
PlatformDependent.putByte(outAddress + byteIdx, val);
}
// sign extend
if (length < 16) {
byte msb = PlatformDependent.getByte(inAddress);
final byte pad = (byte) (msb < 0 ? 0xFF : 0x00);
PlatformDependent.setMemory(outAddress + length, DecimalVector.TYPE_WIDTH - length, pad);
if (LITTLE_ENDIAN) {
// swap bytes to convert BE to LE
for (int byteIdx = 0; byteIdx < length; ++byteIdx) {
byte val = PlatformDependent.getByte((inAddress + length - 1) - byteIdx);
PlatformDependent.putByte(outAddress + byteIdx, val);
}
// sign extend
if (length < TYPE_WIDTH) {
byte msb = PlatformDependent.getByte(inAddress);
final byte pad = (byte) (msb < 0 ? 0xFF : 0x00);
PlatformDependent.setMemory(outAddress + length, DecimalVector.TYPE_WIDTH - length, pad);
}
} else {
PlatformDependent.copyMemory(inAddress, outAddress + DecimalVector.TYPE_WIDTH - length, length);
// sign extend
if (length < TYPE_WIDTH) {
byte msb = PlatformDependent.getByte(inAddress);
final byte pad = (byte) (msb < 0 ? 0xFF : 0x00);
PlatformDependent.setMemory(outAddress, DecimalVector.TYPE_WIDTH - length, pad);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ public class MessageSerializer {
public static final int IPC_CONTINUATION_TOKEN = -1;

/**
* Convert an array of 4 bytes to a little endian i32 value.
* Convert an array of 4 bytes in little-endian order to a native-endian i32 value.
*
* @param bytes byte array with minimum length of 4
* @return converted little endian 32-bit integer
* @param bytes byte array with minimum length of 4 in little-endian
* @return the converted native-endian 32-bit integer
*/
public static int bytesToInt(byte[] bytes) {
return ((bytes[3] & 255) << 24) +
Expand All @@ -75,7 +75,7 @@ public static int bytesToInt(byte[] bytes) {
}

/**
* Convert an integer to a 4 byte array.
* Convert an integer to a 4 byte array in little-endian.
*
* @param value integer value input
* @param bytes existing byte array with minimum length of 4 to contain the conversion output
Expand All @@ -88,7 +88,7 @@ public static void intToBytes(int value, byte[] bytes) {
}

/**
* Convert a long to a 8 byte array.
* Convert a long to an 8-byte array in little-endian order.
*
* @param value long value input
* @param bytes existing byte array with minimum length of 8 to contain the conversion output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.apache.arrow.memory.ArrowBuf;

Expand All @@ -34,6 +35,7 @@ private DecimalUtility() {}
public static final int DECIMAL_BYTE_LENGTH = 16;
public static final byte [] zeroes = new byte[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
public static final byte [] minus_one = new byte[] {-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1};
private static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;

/**
* Read an ArrowType.Decimal at the given value index in the ArrowBuf and convert to a BigDecimal
Expand All @@ -44,14 +46,16 @@ public static BigDecimal getBigDecimalFromArrowBuf(ArrowBuf bytebuf, int index,
byte temp;
final int startIndex = index * DECIMAL_BYTE_LENGTH;

// Decimal stored as little endian, need to swap bytes to make BigDecimal
bytebuf.getBytes(startIndex, value, 0, DECIMAL_BYTE_LENGTH);
int stop = DECIMAL_BYTE_LENGTH / 2;
for (int i = 0, j; i < stop; i++) {
temp = value[i];
j = (DECIMAL_BYTE_LENGTH - 1) - i;
value[i] = value[j];
value[j] = temp;
if (LITTLE_ENDIAN) {
// Decimal stored as native endian, need to swap bytes to make BigDecimal if native endian is LE
int stop = DECIMAL_BYTE_LENGTH / 2;
for (int i = 0, j; i < stop; i++) {
temp = value[i];
j = (DECIMAL_BYTE_LENGTH - 1) - i;
value[i] = value[j];
value[j] = temp;
}
}
BigInteger unscaledValue = new BigInteger(value);
return new BigDecimal(unscaledValue, scale);
Expand Down Expand Up @@ -129,9 +133,14 @@ public static void writeBigDecimalToArrowBuf(BigDecimal value, ArrowBuf bytebuf,
*/
public static void writeLongToArrowBuf(long value, ArrowBuf bytebuf, int index) {
  // Each decimal occupies DECIMAL_BYTE_LENGTH bytes: one 8-byte half holds the long itself,
  // the other half holds the sign extension (all ones for negative values, all zeros otherwise).
  final long addressOfValue = bytebuf.memoryAddress() + (long) index * DECIMAL_BYTE_LENGTH;
  final long padValue = value < 0 ? -1L : 0L;
  // Write each half exactly once. Which half carries the value depends on the host byte order,
  // so nothing is written unconditionally before the endianness check.
  if (LITTLE_ENDIAN) {
    // Least-significant half comes first in memory.
    PlatformDependent.putLong(addressOfValue, value);
    PlatformDependent.putLong(addressOfValue + Long.BYTES, padValue);
  } else {
    // Most-significant (sign-extension) half comes first in memory.
    PlatformDependent.putLong(addressOfValue, padValue);
    PlatformDependent.putLong(addressOfValue + Long.BYTES, value);
  }
}

/**
Expand All @@ -149,15 +158,22 @@ private static void writeByteArrayToArrowBufHelper(byte[] bytes, ArrowBuf bytebu
throw new UnsupportedOperationException("Decimal size greater than 16 bytes");
}

// Decimal stored as little endian, need to swap data bytes before writing to ArrowBuf
byte[] bytesLE = new byte[bytes.length];
for (int i = 0; i < bytes.length; i++) {
bytesLE[i] = bytes[bytes.length - 1 - i];
if (LITTLE_ENDIAN) {
// Decimal stored as native-endian, need to swap data bytes before writing to ArrowBuf if LE
byte[] bytesLE = new byte[bytes.length];
for (int i = 0; i < bytes.length; i++) {
bytesLE[i] = bytes[bytes.length - 1 - i];
}

// Write LE data
byte [] padByes = bytes[0] < 0 ? minus_one : zeroes;
bytebuf.setBytes(startIndex, bytesLE, 0, bytes.length);
bytebuf.setBytes(startIndex + bytes.length, padByes, 0, DECIMAL_BYTE_LENGTH - bytes.length);
} else {
// Write BE data
byte [] padByes = bytes[0] < 0 ? minus_one : zeroes;
bytebuf.setBytes(startIndex + DECIMAL_BYTE_LENGTH - bytes.length, bytes, 0, bytes.length);
bytebuf.setBytes(startIndex, padByes, 0, DECIMAL_BYTE_LENGTH - bytes.length);
}

// Write LE data
byte [] padByes = bytes[0] < 0 ? minus_one : zeroes;
bytebuf.setBytes(startIndex, bytesLE, 0, bytes.length);
bytebuf.setBytes(startIndex + bytes.length, padByes, 0, DECIMAL_BYTE_LENGTH - bytes.length);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,12 @@ public void testBigDecimalReadWrite() {
}

/**
* Test {@link DecimalVector#setBigEndian(int, byte[])} which takes BE layout input and stores in LE layout.
* Test {@link DecimalVector#setBigEndian(int, byte[])} which takes BE layout input and stores in native-endian (NE)
* layout.
* Cases to cover: input byte array in different lengths in range [1-16] and negative values.
*/
@Test
public void decimalBE2LE() {
public void decimalBE2NE() {
try (DecimalVector decimalVector = TestUtils.newVector(DecimalVector.class, "decimal",
new ArrowType.Decimal(21, 2), allocator)) {
decimalVector.allocateNew();
Expand Down Expand Up @@ -270,7 +271,7 @@ public void decimalBE2LE() {
}

@Test
public void setUsingArrowBufOfLEInts() {
public void setUsingArrowBufOfInts() {
try (DecimalVector decimalVector = TestUtils.newVector(DecimalVector.class, "decimal",
new ArrowType.Decimal(5, 2), allocator);
ArrowBuf buf = allocator.buffer(8);) {
Expand Down Expand Up @@ -299,7 +300,7 @@ public void setUsingArrowBufOfLEInts() {
}

@Test
public void setUsingArrowLongLEBytes() {
public void setUsingArrowLongBytes() {
try (DecimalVector decimalVector = TestUtils.newVector(DecimalVector.class, "decimal",
new ArrowType.Decimal(18, 0), allocator);
ArrowBuf buf = allocator.buffer(16);) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,31 @@ public class DecimalUtilityTest {
private static final BigInteger MIN_BIG_INT = MAX_BIG_INT.multiply(BigInteger.valueOf(-1));
private static final BigDecimal MIN_DECIMAL = new java.math.BigDecimal(MIN_BIG_INT, 0);

@Test
public void testSetLongInDecimalArrowBuf() {
  try (BufferAllocator allocator = new RootAllocator(128);
       ArrowBuf buf = allocator.buffer(16)) {
    // Boundary values of the int range, widened to long when written.
    final int[] intSamples = {Integer.MAX_VALUE, Integer.MIN_VALUE, 0};
    for (int i = 0; i < intSamples.length; i++) {
      assertLongRoundTrip(buf, intSamples[i]);
    }

    // Boundary values of the long range.
    final long[] longSamples = {Long.MIN_VALUE, 0, Long.MAX_VALUE};
    for (int i = 0; i < longSamples.length; i++) {
      assertLongRoundTrip(buf, longSamples[i]);
    }
  }
}

/**
 * Writes {@code sample} into slot 0 of {@code buf} via
 * {@link DecimalUtility#writeLongToArrowBuf} and asserts that reading the slot back as a
 * scale-0 decimal yields the same number.
 */
private static void assertLongRoundTrip(ArrowBuf buf, long sample) {
  buf.clear();
  DecimalUtility.writeLongToArrowBuf(sample, buf, 0);
  final BigDecimal readBack = DecimalUtility.getBigDecimalFromArrowBuf(buf, 0, 0);
  Assert.assertEquals(BigDecimal.valueOf(sample), readBack);
}

@Test
public void testSetByteArrayInDecimalArrowBuf() {
try (BufferAllocator allocator = new RootAllocator(128);
Expand Down

0 comments on commit 1eab081

Please sign in to comment.