From 4fa40072a776662fdb48e99e7671784b1ee12545 Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Thu, 18 Aug 2022 02:31:22 +0800 Subject: [PATCH] ARROW-17338: [Java] The maximum request memory of BaseVariableWidthVector should limit to Integer.MAX_VALUE (#13815) We got a IndexOutOfBoundsException: ``` 2022-08-03 09:33:34,076 Error executing query, currentState RUNNING, java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3315 in stage 5.0 failed 4 times, most recent failure: Lost task 3315.3 in stage 5.0 (TID 3926) (30.97.116.209 executor 49): java.lang.IndexOutOfBoundsException: index: 2147312542, length: 777713 (expected: range(0, 2147483648)) at org.apache.iceberg.shaded.org.apache.arrow.memory.ArrowBuf.checkIndex(ArrowBuf.java:699) at org.apache.iceberg.shaded.org.apache.arrow.memory.ArrowBuf.setBytes(ArrowBuf.java:826) at org.apache.iceberg.arrow.vectorized.parquet.VectorizedParquetDefinitionLevelReader$VarWidthReader.nextVal(VectorizedParquetDefinitionLevelReader.java:418) at org.apache.iceberg.arrow.vectorized.parquet.VectorizedParquetDefinitionLevelReader$BaseReader.nextBatch(VectorizedParquetDefinitionLevelReader.java:235) at org.apache.iceberg.arrow.vectorized.parquet.VectorizedPageIterator$VarWidthTypePageReader.nextVal(VectorizedPageIterator.java:353) at org.apache.iceberg.arrow.vectorized.parquet.VectorizedPageIterator$BagePageReader.nextBatch(VectorizedPageIterator.java:161) at org.apache.iceberg.arrow.vectorized.parquet.VectorizedColumnIterator$VarWidthTypeBatchReader.nextBatchOf(VectorizedColumnIterator.java:191) at org.apache.iceberg.arrow.vectorized.parquet.VectorizedColumnIterator$BatchReader.nextBatch(VectorizedColumnIterator.java:74) at org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.read(VectorizedArrowReader.java:158) at org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader.read(ColumnarBatchReader.java:51) at org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader.read(ColumnarBatchReader.java:35) at org.apache.iceberg.parquet.VectorizedParquetReader$FileIterator.next(VectorizedParquetReader.java:134) at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:98) at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79) at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) ``` The root cause is the following code of `BaseVariableWidthVector.handleSafe` could fail to reallocate because of int overflow and then led to `IndexOutOfBoundsException` when we put the data into the vector. ```java protected final void handleSafe(int index, int dataLength) { while (index >= getValueCapacity()) { reallocValidityAndOffsetBuffers(); } final int startOffset = lastSet < 0 ? 0 : getStartOffset(lastSet + 1); // startOffset + dataLength could overflow while (valueBuffer.capacity() < (startOffset + dataLength)) { reallocDataBuffer(); } } ``` The offset width of `BaseVariableWidthVector` is 4, while the maximum memory allocation is Long.MAX_VALUE. This makes the memory allocation check invalid. Authored-by: xianyangliu Signed-off-by: Antoine Pitrou --- .../arrow/vector/BaseVariableWidthVector.java | 41 +++++++++++++++---- .../apache/arrow/vector/TestValueVector.java | 19 +++++++++ 2 files changed, 51 insertions(+), 9 deletions(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java index 866dd9e218fc1..2a89590bf8440 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java @@ -46,6 +46,7 @@ public abstract class BaseVariableWidthVector extends BaseValueVector implements VariableWidthVector, FieldVector, VectorDefinitionSetter { private static final int DEFAULT_RECORD_BYTE_COUNT = 8; private static final int INITIAL_BYTE_COUNT = INITIAL_VALUE_ALLOCATION * DEFAULT_RECORD_BYTE_COUNT; + private static final int MAX_BUFFER_SIZE = (int) Math.min(MAX_ALLOCATION_SIZE, Integer.MAX_VALUE); private int lastValueCapacity; private long lastValueAllocationSizeInBytes; @@ -430,9 +431,10 @@ public void allocateNew(int valueCount) { /* Check if the data buffer size is within bounds. */ private void checkDataBufferSize(long size) { - if (size > MAX_ALLOCATION_SIZE || size < 0) { + if (size > MAX_BUFFER_SIZE || size < 0) { throw new OversizedAllocationException("Memory required for vector " + - " is (" + size + "), which is more than max allowed (" + MAX_ALLOCATION_SIZE + ")"); + "is (" + size + "), which is overflow or more than max allowed (" + MAX_BUFFER_SIZE + "). " + + "You could consider using LargeVarCharVector/LargeVarBinaryVector for large strings/large bytes types"); } } @@ -445,10 +447,10 @@ private long computeAndCheckOffsetsBufferSize(int valueCount) { * an additional slot in offset buffer. */ final long size = computeCombinedBufferSize(valueCount + 1, OFFSET_WIDTH); - if (size > MAX_ALLOCATION_SIZE) { + if (size > MAX_BUFFER_SIZE) { throw new OversizedAllocationException("Memory required for vector capacity " + valueCount + - " is (" + size + "), which is more than max allowed (" + MAX_ALLOCATION_SIZE + ")"); + " is (" + size + "), which is more than max allowed (" + MAX_BUFFER_SIZE + ")"); } return size; } @@ -514,13 +516,33 @@ public void reallocDataBuffer() { newAllocationSize = INITIAL_BYTE_COUNT * 2L; } } - newAllocationSize = CommonUtil.nextPowerOfTwo(newAllocationSize); + + reallocDataBuffer(newAllocationSize); + } + + /** + * Reallocate the data buffer to given size. Data Buffer stores the actual data for + * VARCHAR or VARBINARY elements in the vector. The actual allocate size may be larger + * than the request one because it will round up the provided value to the nearest + * power of two. + * + * @param desiredAllocSize the desired new allocation size + * @throws OversizedAllocationException if the desired new size is more than + * max allowed + * @throws OutOfMemoryException if the internal memory allocation fails + */ + public void reallocDataBuffer(long desiredAllocSize) { + if (desiredAllocSize == 0) { + return; + } + + final long newAllocationSize = CommonUtil.nextPowerOfTwo(desiredAllocSize); assert newAllocationSize >= 1; checkDataBufferSize(newAllocationSize); final ArrowBuf newBuf = allocator.buffer(newAllocationSize); - newBuf.setBytes(0, valueBuffer, 0, currentBufferCapacity); + newBuf.setBytes(0, valueBuffer, 0, valueBuffer.capacity()); valueBuffer.getReferenceManager().release(); valueBuffer = newBuf; lastValueAllocationSizeInBytes = valueBuffer.capacity(); @@ -1250,9 +1272,10 @@ protected final void handleSafe(int index, int dataLength) { while (index >= getValueCapacity()) { reallocValidityAndOffsetBuffers(); } - final int startOffset = lastSet < 0 ? 0 : getStartOffset(lastSet + 1); - while (valueBuffer.capacity() < (startOffset + dataLength)) { - reallocDataBuffer(); + final long startOffset = lastSet < 0 ? 0 : getStartOffset(lastSet + 1); + final long targetCapacity = startOffset + dataLength; + if (valueBuffer.capacity() < targetCapacity) { + reallocDataBuffer(targetCapacity); } } diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java index 516daa2362280..0928d3eb03082 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java @@ -1137,6 +1137,25 @@ public void testNullableVarType2() { } } + @Test(expected = OversizedAllocationException.class) + public void testReallocateCheckSuccess() { + + // Create a new value vector for 1024 integers. + try (final VarBinaryVector vector = newVarBinaryVector(EMPTY_SCHEMA_PATH, allocator)) { + vector.allocateNew(1024 * 10, 1024); + + vector.set(0, STR1); + // Check the sample strings. + assertArrayEquals(STR1, vector.get(0)); + + // update the index offset to a larger one + ArrowBuf offsetBuf = vector.getOffsetBuffer(); + offsetBuf.setInt(VarBinaryVector.OFFSET_WIDTH, Integer.MAX_VALUE - 5); + + vector.setValueLengthSafe(1, 6); + } + } + /* * generic tests