From fc0e6944a531c806cf4fd83141646f264fa19af3 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 13 Jun 2017 09:15:14 +0800
Subject: [PATCH] Revert "[SPARK-21046][SQL] simplify the array offset and
 length in ColumnVector"

This reverts commit 22dd65f58e12cb3a883d106fcccdff25a2a00fe8.
---
 .../execution/vectorized/ColumnVector.java    | 35 +++++++------
 .../vectorized/OffHeapColumnVector.java       | 47 ++++++++++++++++--
 .../vectorized/OnHeapColumnVector.java        | 49 ++++++++++++++++---
 .../vectorized/ColumnarBatchSuite.scala       | 17 +++----
 4 files changed, 110 insertions(+), 38 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index e50799eeb27ba..24260a60197f2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.spark.sql.execution.vectorized;
 
 import java.math.BigDecimal;
@@ -519,13 +518,19 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
   public abstract double getDouble(int rowId);
 
   /**
-   * After writing array elements to the child column vector, call this method to set the offset and
-   * size of the written array.
+   * Puts a byte array that already exists in this column.
    */
-  public void putArrayOffsetAndSize(int rowId, int offset, int size) {
-    long offsetAndSize = (((long) offset) << 32) | size;
-    putLong(rowId, offsetAndSize);
-  }
+  public abstract void putArray(int rowId, int offset, int length);
+
+  /**
+   * Returns the length of the array at rowid.
+   */
+  public abstract int getArrayLength(int rowId);
+
+  /**
+   * Returns the offset of the array at rowid.
+   */
+  public abstract int getArrayOffset(int rowId);
 
   /**
    * Returns a utility object to get structs.
@@ -548,9 +553,8 @@ public ColumnarBatch.Row getStruct(int rowId, int size) {
    * Returns the array at rowid.
    */
   public final Array getArray(int rowId) {
-    long offsetAndSize = getLong(rowId);
-    resultArray.offset = (int) (offsetAndSize >> 32);
-    resultArray.length = (int) offsetAndSize;
+    resultArray.length = getArrayLength(rowId);
+    resultArray.offset = getArrayOffset(rowId);
     return resultArray;
   }
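For readers weighing the two designs: the putArrayOffsetAndSize/getArray pair removed above packed an array's offset into the high 32 bits of a single long and its length into the low 32 bits, so one getLong read recovered both. The sketch below is illustrative only and not part of this patch (PackedOffsetDemo is an invented name); it shows the same shift-and-truncate round trip.

```java
// Standalone sketch of the packed offset/length encoding that this revert removes.
public class PackedOffsetDemo {
  static long pack(int offset, int size) {
    // Offset lands in the high 32 bits, size in the low 32. A negative size
    // would sign-extend and corrupt the offset, but lengths are always >= 0.
    return (((long) offset) << 32) | size;
  }

  static int offsetOf(long packed) {
    return (int) (packed >> 32);   // arithmetic shift is safe since offset >= 0
  }

  static int sizeOf(long packed) {
    return (int) packed;           // the cast truncates to the low 32 bits
  }

  public static void main(String[] args) {
    long packed = pack(17, 2);
    System.out.println(offsetOf(packed) + ", " + sizeOf(packed));  // prints: 17, 2
  }
}
```

The packed form costs one read per row instead of two; the code being restored keeps offsets and lengths in separate, independently addressable buffers, as the off-heap and on-heap implementations below show.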
@@ -562,12 +566,7 @@ public final Array getArray(int rowId) {
   /**
    * Sets the value at rowId to `value`.
    */
-  public int putByteArray(int rowId, byte[] value, int offset, int length) {
-    int result = arrayData().appendBytes(length, value, offset);
-    putArrayOffsetAndSize(rowId, result, length);
-    return result;
-  }
-
+  public abstract int putByteArray(int rowId, byte[] value, int offset, int count);
   public final int putByteArray(int rowId, byte[] value) {
     return putByteArray(rowId, value, 0, value.length);
   }
@@ -830,13 +829,13 @@ public final int appendDoubles(int length, double[] src, int offset) {
   public final int appendByteArray(byte[] value, int offset, int length) {
     int copiedOffset = arrayData().appendBytes(length, value, offset);
     reserve(elementsAppended + 1);
-    putArrayOffsetAndSize(elementsAppended, copiedOffset, length);
+    putArray(elementsAppended, copiedOffset, length);
     return elementsAppended++;
   }
 
   public final int appendArray(int length) {
     reserve(elementsAppended + 1);
-    putArrayOffsetAndSize(elementsAppended, arrayData().elementsAppended, length);
+    putArray(elementsAppended, arrayData().elementsAppended, length);
     return elementsAppended++;
   }
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 4dc4d34db37fb..a7d3744d00e91 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -34,15 +34,19 @@ public final class OffHeapColumnVector extends ColumnVector {
   // The data stored in these two allocations need to maintain binary compatible. We can
   // directly pass this buffer to external components.
   private long nulls;
-  // The actually data of this column vector will be stored here. If it's an array column vector,
-  // we will store the offsets and lengths here, and store the element data in child column vector.
   private long data;
 
+  // Set iff the type is array.
+  private long lengthData;
+  private long offsetData;
+
   protected OffHeapColumnVector(int capacity, DataType type) {
     super(capacity, type, MemoryMode.OFF_HEAP);
 
     nulls = 0;
     data = 0;
+    lengthData = 0;
+    offsetData = 0;
 
     reserveInternal(capacity);
     reset();
@@ -62,8 +66,12 @@ public long nullsNativeAddress() {
   public void close() {
     Platform.freeMemory(nulls);
     Platform.freeMemory(data);
+    Platform.freeMemory(lengthData);
+    Platform.freeMemory(offsetData);
     nulls = 0;
     data = 0;
+    lengthData = 0;
+    offsetData = 0;
   }
 
   //
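Spark's Platform utility used in this file is a thin wrapper over sun.misc.Unsafe. The standalone sketch below is illustrative only, not part of the patch (OffHeapIntsDemo is an invented name, and it assumes a JDK that still exposes theUnsafe); it shows the same base + 4 * rowId addressing that the restored lengthData and offsetData buffers use.

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

// Illustrative only -- two native int buffers addressed as base + 4 * rowId,
// mirroring the lengthData/offsetData fields reintroduced by the hunks above.
public class OffHeapIntsDemo {
  public static void main(String[] args) throws Exception {
    Field f = Unsafe.class.getDeclaredField("theUnsafe");
    f.setAccessible(true);
    Unsafe unsafe = (Unsafe) f.get(null);

    int capacity = 8;
    long lengthData = unsafe.allocateMemory(capacity * 4L);
    long offsetData = unsafe.allocateMemory(capacity * 4L);

    int rowId = 3;
    unsafe.putInt(lengthData + 4L * rowId, 2);   // length of row 3's array
    unsafe.putInt(offsetData + 4L * rowId, 17);  // offset into the child vector
    System.out.println(unsafe.getInt(offsetData + 4L * rowId) + ", "
        + unsafe.getInt(lengthData + 4L * rowId));  // prints: 17, 2

    unsafe.freeMemory(lengthData);  // as in close() above, native memory is freed manually
    unsafe.freeMemory(offsetData);
  }
}
```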
@@ -387,6 +395,35 @@ public double getDouble(int rowId) {
     }
   }
 
+  //
+  // APIs dealing with Arrays.
+  //
+  @Override
+  public void putArray(int rowId, int offset, int length) {
+    assert(offset >= 0 && offset + length <= childColumns[0].capacity);
+    Platform.putInt(null, lengthData + 4 * rowId, length);
+    Platform.putInt(null, offsetData + 4 * rowId, offset);
+  }
+
+  @Override
+  public int getArrayLength(int rowId) {
+    return Platform.getInt(null, lengthData + 4 * rowId);
+  }
+
+  @Override
+  public int getArrayOffset(int rowId) {
+    return Platform.getInt(null, offsetData + 4 * rowId);
+  }
+
+  // APIs dealing with ByteArrays
+  @Override
+  public int putByteArray(int rowId, byte[] value, int offset, int length) {
+    int result = arrayData().appendBytes(length, value, offset);
+    Platform.putInt(null, lengthData + 4 * rowId, length);
+    Platform.putInt(null, offsetData + 4 * rowId, result);
+    return result;
+  }
+
   @Override
   public void loadBytes(ColumnVector.Array array) {
     if (array.tmpByteArray.length < array.length) array.tmpByteArray = new byte[array.length];
@@ -401,8 +438,10 @@ public void loadBytes(ColumnVector.Array array) {
   protected void reserveInternal(int newCapacity) {
     int oldCapacity = (this.data == 0L) ? 0 : capacity;
     if (this.resultArray != null) {
-      // need a long as offset and length for each array.
-      this.data = Platform.reallocateMemory(data, oldCapacity * 8, newCapacity * 8);
+      this.lengthData =
+          Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4);
+      this.offsetData =
+          Platform.reallocateMemory(offsetData, oldCapacity * 4, newCapacity * 4);
     } else if (type instanceof ByteType || type instanceof BooleanType) {
       this.data = Platform.reallocateMemory(data, oldCapacity, newCapacity);
     } else if (type instanceof ShortType) {
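The reserveInternal hunk above grows both native buffers with Platform.reallocateMemory, a realloc-style call that preserves already-written bytes, so rows stored before a capacity increase survive it. A minimal sketch of that behavior with raw Unsafe (again illustrative only; ReserveDemo is an invented name):

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

// Illustrative only -- models the grow-in-place behavior of reserveInternal.
public class ReserveDemo {
  public static void main(String[] args) throws Exception {
    Field f = Unsafe.class.getDeclaredField("theUnsafe");
    f.setAccessible(true);
    Unsafe unsafe = (Unsafe) f.get(null);

    long lengthData = unsafe.allocateMemory(4 * 4L);            // capacity 4 ints
    unsafe.putInt(lengthData, 42);                              // write row 0 before growing
    lengthData = unsafe.reallocateMemory(lengthData, 16 * 4L);  // grow to 16 ints
    System.out.println(unsafe.getInt(lengthData));              // still 42 after the grow
    unsafe.freeMemory(lengthData);
  }
}
```

The on-heap variant that follows keeps the same two-buffer scheme in plain Java int arrays, with an allocate-and-arraycopy path instead of realloc.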
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 4d23405dc7b17..94ed32294cfae 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -43,12 +43,14 @@ public final class OnHeapColumnVector extends ColumnVector {
   private byte[] byteData;
   private short[] shortData;
   private int[] intData;
-  // This is not only used to store data for long column vector, but also can store offsets and
-  // lengths for array column vector.
   private long[] longData;
   private float[] floatData;
   private double[] doubleData;
 
+  // Only set if type is Array.
+  private int[] arrayLengths;
+  private int[] arrayOffsets;
+
   protected OnHeapColumnVector(int capacity, DataType type) {
     super(capacity, type, MemoryMode.ON_HEAP);
     reserveInternal(capacity);
@@ -364,22 +366,55 @@ public double getDouble(int rowId) {
     }
   }
 
+  //
+  // APIs dealing with Arrays
+  //
+
+  @Override
+  public int getArrayLength(int rowId) {
+    return arrayLengths[rowId];
+  }
+  @Override
+  public int getArrayOffset(int rowId) {
+    return arrayOffsets[rowId];
+  }
+
+  @Override
+  public void putArray(int rowId, int offset, int length) {
+    arrayOffsets[rowId] = offset;
+    arrayLengths[rowId] = length;
+  }
+
   @Override
   public void loadBytes(ColumnVector.Array array) {
     array.byteArray = byteData;
     array.byteArrayOffset = array.offset;
   }
 
+  //
+  // APIs dealing with Byte Arrays
+  //
+
+  @Override
+  public int putByteArray(int rowId, byte[] value, int offset, int length) {
+    int result = arrayData().appendBytes(length, value, offset);
+    arrayOffsets[rowId] = result;
+    arrayLengths[rowId] = length;
+    return result;
+  }
+
   // Spilt this function out since it is the slow path.
   @Override
   protected void reserveInternal(int newCapacity) {
     if (this.resultArray != null || DecimalType.isByteArrayDecimalType(type)) {
-      // need 1 long as offset and length for each array.
-      if (longData == null || longData.length < newCapacity) {
-        long[] newData = new long[newCapacity];
-        if (longData != null) System.arraycopy(longData, 0, newData, 0, capacity);
-        longData = newData;
+      int[] newLengths = new int[newCapacity];
+      int[] newOffsets = new int[newCapacity];
+      if (this.arrayLengths != null) {
+        System.arraycopy(this.arrayLengths, 0, newLengths, 0, capacity);
+        System.arraycopy(this.arrayOffsets, 0, newOffsets, 0, capacity);
       }
+      arrayLengths = newLengths;
+      arrayOffsets = newOffsets;
     } else if (type instanceof BooleanType) {
       if (byteData == null || byteData.length < newCapacity) {
         byte[] newData = new byte[newCapacity];
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 5c4128a70dd86..e48e3f6402901 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -631,7 +631,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(column.arrayData().elementsAppended == 17)
 
       // Put the same "ll" at offset. This should not allocate more memory in the column.
-      column.putArrayOffsetAndSize(idx, offset, 2)
+      column.putArray(idx, offset, 2)
       reference += "ll"
       idx += 1
       assert(column.arrayData().elementsAppended == 17)
@@ -644,8 +643,7 @@
       assert(column.arrayData().elementsAppended == 17 + (s + s).length)
 
       reference.zipWithIndex.foreach { v =>
-        val offsetAndLength = column.getLong(v._2)
-        assert(v._1.length == offsetAndLength.toInt, "MemoryMode=" + memMode)
+        assert(v._1.length == column.getArrayLength(v._2), "MemoryMode=" + memMode)
         assert(v._1 == column.getUTF8String(v._2).toString, "MemoryMode" + memMode)
       }
 
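The string test above exercises the layout both implementations share: every row's bytes live back to back in one child buffer, and a row stores only an (offset, length) pair into it, which is why putting "ll" again appends nothing. The following plain-Java model is not Spark code (StringColumnModel and its methods are invented names) and just makes that sharing concrete.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative model of the byte-array layout: one shared child buffer plus
// per-row (offset, length) pairs, as in putByteArray/putArray above.
public class StringColumnModel {
  private byte[] child = new byte[0];        // models arrayData()
  private final int[] offsets = new int[16];
  private final int[] lengths = new int[16];

  int putByteArray(int rowId, byte[] value) {
    int offset = child.length;               // append at the current end
    child = Arrays.copyOf(child, child.length + value.length);
    System.arraycopy(value, 0, child, offset, value.length);
    offsets[rowId] = offset;
    lengths[rowId] = value.length;
    return offset;
  }

  void putArray(int rowId, int offset, int length) {  // reuse bytes already stored
    offsets[rowId] = offset;
    lengths[rowId] = length;
  }

  String get(int rowId) {
    return new String(child, offsets[rowId], lengths[rowId], StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    StringColumnModel col = new StringColumnModel();
    col.putByteArray(0, "Hello".getBytes(StandardCharsets.UTF_8));
    col.putArray(1, 2, 2);                   // "ll": points into row 0's bytes, no append
    System.out.println(col.get(0) + " / " + col.get(1));  // prints: Hello / ll
  }
}
```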
@@ -660,7 +659,7 @@
       val column = ColumnVector.allocate(10, new ArrayType(IntegerType, true), memMode)
 
       // Fill the underlying data with all the arrays back to back.
-      val data = column.arrayData()
+      val data = column.arrayData();
       var i = 0
       while (i < 6) {
         data.putInt(i, i)
         i += 1
       }
 
       // Populate it with arrays [0], [1, 2], [], [3, 4, 5]
-      column.putArrayOffsetAndSize(0, 0, 1)
-      column.putArrayOffsetAndSize(1, 1, 2)
-      column.putArrayOffsetAndSize(2, 3, 0)
-      column.putArrayOffsetAndSize(3, 3, 3)
+      column.putArray(0, 0, 1)
+      column.putArray(1, 1, 2)
+      column.putArray(2, 2, 0)
+      column.putArray(3, 3, 3)
 
       val a1 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]]
       val a2 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(1)).asInstanceOf[Array[Int]]
@@ -704,7 +703,7 @@
       data.reserve(array.length)
       assert(data.capacity == array.length * 2)
       data.putInts(0, array.length, array, 0)
-      column.putArrayOffsetAndSize(0, 0, array.length)
+      column.putArray(0, 0, array.length)
       assert(ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]]
         === array)
     }}
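The int-array test restores the canonical layout picture: the child vector holds 0, 1, 2, 3, 4, 5 back to back, and the four (offset, length) pairs carve out [0], [1, 2], [], [3, 4, 5]. A plain-Java model of that readback (IntArrayColumnModel is an invented name; this is not Spark code):

```java
import java.util.Arrays;

// Illustrative model of the test's final layout: a shared child buffer and
// per-row (offset, length) pairs, read back the way getArray does above.
public class IntArrayColumnModel {
  public static void main(String[] args) {
    int[] child = {0, 1, 2, 3, 4, 5};        // models column.arrayData()
    int[][] offsetAndLength = {{0, 1}, {1, 2}, {2, 0}, {3, 3}};  // rows 0..3

    for (int rowId = 0; rowId < offsetAndLength.length; rowId++) {
      int offset = offsetAndLength[rowId][0];
      int length = offsetAndLength[rowId][1];
      int[] array = Arrays.copyOfRange(child, offset, offset + length);
      System.out.println("row " + rowId + ": " + Arrays.toString(array));
    }
    // Prints [0], [1, 2], [], [3, 4, 5] -- matching the assertions above.
  }
}
```

Note that the empty array at row 2 is just a zero length; its offset is arbitrary, which is why the reverted code's putArray(2, 2, 0) and the reverted-away putArrayOffsetAndSize(2, 3, 0) are equivalent here.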