Skip to content

Commit

Permalink
Revert "[SPARK-21046][SQL] simplify the array offset and length in Co…
Browse files Browse the repository at this point in the history
…lumnVector"

This reverts commit 22dd65f.
  • Loading branch information
cloud-fan committed Jun 13, 2017
1 parent 74a432d commit fc0e694
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.vectorized;

import java.math.BigDecimal;
Expand Down Expand Up @@ -519,13 +518,19 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
public abstract double getDouble(int rowId);

/**
* After writing array elements to the child column vector, call this method to set the offset and
* size of the written array.
* Puts a byte array that already exists in this column.
*/
public void putArrayOffsetAndSize(int rowId, int offset, int size) {
  // Pack both ints into one long: offset in the upper 32 bits, size in the lower 32 bits.
  // The size is masked before the OR because an int operand is sign-extended when it is
  // widened to long; without the mask a negative size would set every upper bit and
  // overwrite the offset. For non-negative sizes the stored value is unchanged.
  long offsetAndSize = (((long) offset) << 32) | (size & 0xFFFFFFFFL);
  putLong(rowId, offsetAndSize);
}
public abstract void putArray(int rowId, int offset, int length);

/**
 * Returns the length of the array at {@code rowId}.
 *
 * @param rowId row whose array length is requested
 * @return the length recorded for that row
 */
public abstract int getArrayLength(int rowId);

/**
 * Returns the offset of the array at {@code rowId}.
 *
 * @param rowId row whose array offset is requested
 * @return the offset recorded for that row
 */
public abstract int getArrayOffset(int rowId);

/**
* Returns a utility object to get structs.
Expand All @@ -548,9 +553,8 @@ public ColumnarBatch.Row getStruct(int rowId, int size) {
* Returns the array at rowId.
*/
public final Array getArray(int rowId) {
// Fills in the shared resultArray holder for the requested row and returns that same object,
// so the returned reference is reused across calls.
// NOTE(review): two decoding strategies appear here back to back -- unpacking offset/length
// from a single long (upper 32 bits = offset, lower 32 bits = length) and reading them via
// getArrayLength/getArrayOffset. The later assignments win. This looks like both sides of a
// diff; confirm which one is intended.
long offsetAndSize = getLong(rowId);
resultArray.offset = (int) (offsetAndSize >> 32);
resultArray.length = (int) offsetAndSize;
resultArray.length = getArrayLength(rowId);
resultArray.offset = getArrayOffset(rowId);
return resultArray;
}

Expand All @@ -562,12 +566,7 @@ public final Array getArray(int rowId) {
/**
* Sets the value at rowId to `value`.
*/
public int putByteArray(int rowId, byte[] value, int offset, int length) {
  // Append the requested slice of `value` to the child byte column first; the call returns
  // the position that is then recorded for this row.
  final int start = arrayData().appendBytes(length, value, offset);
  // Record where the bytes for this row live, and how many there are.
  putArrayOffsetAndSize(rowId, start, length);
  return start;
}

public abstract int putByteArray(int rowId, byte[] value, int offset, int count);
/**
 * Sets the value at rowId to the whole of {@code value}.
 *
 * Convenience overload that forwards to the four-argument {@code putByteArray} with offset 0
 * and the full length of the array, and returns whatever that call returns.
 */
public final int putByteArray(int rowId, byte[] value) {
  final int count = value.length;
  return putByteArray(rowId, value, 0, count);
}
Expand Down Expand Up @@ -830,13 +829,13 @@ public final int appendDoubles(int length, double[] src, int offset) {
public final int appendByteArray(byte[] value, int offset, int length) {
// Copy the bytes into the child column; the returned position is recorded for the new row.
int copiedOffset = arrayData().appendBytes(length, value, offset);
// Make room for one more element in this column before writing its offset/length entry.
reserve(elementsAppended + 1);
// NOTE(review): the same (row, offset, length) triple is recorded twice, once through
// putArrayOffsetAndSize and once through putArray. This looks like both sides of a diff;
// confirm which call is intended.
putArrayOffsetAndSize(elementsAppended, copiedOffset, length);
putArray(elementsAppended, copiedOffset, length);
// Returns the index of the row just written, then advances the append cursor.
return elementsAppended++;
}

public final int appendArray(int length) {
// Make room for one more element in this column before writing its offset/length entry.
reserve(elementsAppended + 1);
// The new array starts at the child column's current append position and spans `length`
// entries.
// NOTE(review): the entry is recorded twice, once through putArrayOffsetAndSize and once
// through putArray. This looks like both sides of a diff; confirm which call is intended.
putArrayOffsetAndSize(elementsAppended, arrayData().elementsAppended, length);
putArray(elementsAppended, arrayData().elementsAppended, length);
// Returns the index of the row just written, then advances the append cursor.
return elementsAppended++;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,19 @@ public final class OffHeapColumnVector extends ColumnVector {
// The data stored in these two allocations needs to remain binary compatible. We can
// directly pass this buffer to external components.
private long nulls;
// The actual data of this column vector is stored here. If it's an array column vector,
// we store the offsets and lengths here, and store the element data in the child column vector.
private long data;

// Set iff the type is array.
private long lengthData;
private long offsetData;

protected OffHeapColumnVector(int capacity, DataType type) {
super(capacity, type, MemoryMode.OFF_HEAP);

nulls = 0;
data = 0;
lengthData = 0;
offsetData = 0;

reserveInternal(capacity);
reset();
Expand All @@ -62,8 +66,12 @@ public long nullsNativeAddress() {
public void close() {
  // Hand every native allocation owned by this vector back to the allocator.
  Platform.freeMemory(nulls);
  Platform.freeMemory(data);
  Platform.freeMemory(lengthData);
  Platform.freeMemory(offsetData);
  // Then zero all four addresses so no stale pointer is left behind.
  nulls = data = lengthData = offsetData = 0;
}

//
Expand Down Expand Up @@ -387,6 +395,35 @@ public double getDouble(int rowId) {
}
}

//
// APIs dealing with Arrays.
//
/**
 * Records, for the row at {@code rowId}, where its array lives in the child column.
 *
 * @param rowId  row whose array entry is written
 * @param offset start position of the array within the child column; must be non-negative
 * @param length number of child entries belonging to the array
 */
@Override
public void putArray(int rowId, int offset, int length) {
  // Sum in 64-bit so an overflowing offset + length cannot wrap negative and slip past the
  // bounds check.
  assert(offset >= 0 && (long) offset + length <= childColumns[0].capacity);
  // Each row owns one 4-byte slot in both buffers. The multiplication must be done in long:
  // `4 * rowId` is evaluated in 32-bit arithmetic and wraps for rowId >= 2^29, which would
  // yield an address outside the allocation.
  final long slot = 4L * rowId;
  Platform.putInt(null, lengthData + slot, length);
  Platform.putInt(null, offsetData + slot, offset);
}

/**
 * Returns the array length stored for the row at {@code rowId}.
 */
@Override
public int getArrayLength(int rowId) {
  // 4L forces 64-bit arithmetic; `4 * rowId` would wrap in int for rowId >= 2^29 and read
  // from the wrong address.
  return Platform.getInt(null, lengthData + 4L * rowId);
}

/**
 * Returns the array offset stored for the row at {@code rowId}.
 */
@Override
public int getArrayOffset(int rowId) {
  // 4L forces 64-bit arithmetic; `4 * rowId` would wrap in int for rowId >= 2^29 and read
  // from the wrong address.
  return Platform.getInt(null, offsetData + 4L * rowId);
}

// APIs dealing with ByteArrays
/**
 * Appends {@code length} bytes of {@code value}, starting at {@code offset}, to the child
 * byte column and records their location for the row at {@code rowId}.
 *
 * @return the value returned by {@code appendBytes}, which is stored as this row's offset
 */
@Override
public int putByteArray(int rowId, byte[] value, int offset, int length) {
  int result = arrayData().appendBytes(length, value, offset);
  // Each row owns one 4-byte slot in both buffers. Compute the slot position in long:
  // `4 * rowId` is 32-bit arithmetic and wraps for rowId >= 2^29.
  final long slot = 4L * rowId;
  Platform.putInt(null, lengthData + slot, length);
  Platform.putInt(null, offsetData + slot, result);
  return result;
}

@Override
public void loadBytes(ColumnVector.Array array) {
if (array.tmpByteArray.length < array.length) array.tmpByteArray = new byte[array.length];
Expand All @@ -401,8 +438,10 @@ public void loadBytes(ColumnVector.Array array) {
protected void reserveInternal(int newCapacity) {
int oldCapacity = (this.data == 0L) ? 0 : capacity;
if (this.resultArray != null) {
// need a long as offset and length for each array.
this.data = Platform.reallocateMemory(data, oldCapacity * 8, newCapacity * 8);
this.lengthData =
Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4);
this.offsetData =
Platform.reallocateMemory(offsetData, oldCapacity * 4, newCapacity * 4);
} else if (type instanceof ByteType || type instanceof BooleanType) {
this.data = Platform.reallocateMemory(data, oldCapacity, newCapacity);
} else if (type instanceof ShortType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ public final class OnHeapColumnVector extends ColumnVector {
private byte[] byteData;
private short[] shortData;
private int[] intData;
// This is not only used to store data for long column vector, but also can store offsets and
// lengths for array column vector.
private long[] longData;
private float[] floatData;
private double[] doubleData;

// Only set if type is Array.
private int[] arrayLengths;
private int[] arrayOffsets;

protected OnHeapColumnVector(int capacity, DataType type) {
super(capacity, type, MemoryMode.ON_HEAP);
reserveInternal(capacity);
Expand Down Expand Up @@ -364,22 +366,55 @@ public double getDouble(int rowId) {
}
}

//
// APIs dealing with Arrays
//

/**
 * Returns the array length recorded for the row at {@code rowId}.
 */
@Override
public int getArrayLength(int rowId) {
  final int length = arrayLengths[rowId];
  return length;
}
/**
 * Returns the array offset recorded for the row at {@code rowId}.
 */
@Override
public int getArrayOffset(int rowId) {
  final int start = arrayOffsets[rowId];
  return start;
}

/**
 * Records the offset and length of the array belonging to the row at {@code rowId}.
 */
@Override
public void putArray(int rowId, int offset, int length) {
  // The two stores are independent; both arrays are indexed by row.
  arrayLengths[rowId] = length;
  arrayOffsets[rowId] = offset;
}

/**
 * Points {@code array} at this vector's backing byte array instead of copying: the byte
 * offset is set to the array's own offset and the buffer reference to {@code byteData}.
 */
@Override
public void loadBytes(ColumnVector.Array array) {
  array.byteArrayOffset = array.offset;
  array.byteArray = byteData;
}

//
// APIs dealing with Byte Arrays
//

/**
 * Appends {@code length} bytes of {@code value}, starting at {@code offset}, to the child
 * byte column and records their location for the row at {@code rowId}.
 *
 * @return the value returned by {@code appendBytes}, which is stored as this row's offset
 */
@Override
public int putByteArray(int rowId, byte[] value, int offset, int length) {
  final int start = arrayData().appendBytes(length, value, offset);
  // putArray performs the same two stores (offset, then length) for this row.
  putArray(rowId, start, length);
  return start;
}

// Split this function out since it is the slow path.
@Override
protected void reserveInternal(int newCapacity) {
if (this.resultArray != null || DecimalType.isByteArrayDecimalType(type)) {
// need 1 long as offset and length for each array.
if (longData == null || longData.length < newCapacity) {
long[] newData = new long[newCapacity];
if (longData != null) System.arraycopy(longData, 0, newData, 0, capacity);
longData = newData;
int[] newLengths = new int[newCapacity];
int[] newOffsets = new int[newCapacity];
if (this.arrayLengths != null) {
System.arraycopy(this.arrayLengths, 0, newLengths, 0, capacity);
System.arraycopy(this.arrayOffsets, 0, newOffsets, 0, capacity);
}
arrayLengths = newLengths;
arrayOffsets = newOffsets;
} else if (type instanceof BooleanType) {
if (byteData == null || byteData.length < newCapacity) {
byte[] newData = new byte[newCapacity];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
assert(column.arrayData().elementsAppended == 17)

// Put the same "ll" at offset. This should not allocate more memory in the column.
column.putArrayOffsetAndSize(idx, offset, 2)
column.putArray(idx, offset, 2)
reference += "ll"
idx += 1
assert(column.arrayData().elementsAppended == 17)
Expand All @@ -644,8 +644,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
assert(column.arrayData().elementsAppended == 17 + (s + s).length)

reference.zipWithIndex.foreach { v =>
val offsetAndLength = column.getLong(v._2)
assert(v._1.length == offsetAndLength.toInt, "MemoryMode=" + memMode)
assert(v._1.length == column.getArrayLength(v._2), "MemoryMode=" + memMode)
assert(v._1 == column.getUTF8String(v._2).toString,
"MemoryMode" + memMode)
}
Expand All @@ -660,18 +659,18 @@ class ColumnarBatchSuite extends SparkFunSuite {
val column = ColumnVector.allocate(10, new ArrayType(IntegerType, true), memMode)

// Fill the underlying data with all the arrays back to back.
val data = column.arrayData()
val data = column.arrayData();
var i = 0
while (i < 6) {
data.putInt(i, i)
i += 1
}

// Populate it with arrays [0], [1, 2], [], [3, 4, 5]
column.putArrayOffsetAndSize(0, 0, 1)
column.putArrayOffsetAndSize(1, 1, 2)
column.putArrayOffsetAndSize(2, 3, 0)
column.putArrayOffsetAndSize(3, 3, 3)
column.putArray(0, 0, 1)
column.putArray(1, 1, 2)
column.putArray(2, 2, 0)
column.putArray(3, 3, 3)

val a1 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]]
val a2 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(1)).asInstanceOf[Array[Int]]
Expand Down Expand Up @@ -704,7 +703,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
data.reserve(array.length)
assert(data.capacity == array.length * 2)
data.putInts(0, array.length, array, 0)
column.putArrayOffsetAndSize(0, 0, array.length)
column.putArray(0, 0, array.length)
assert(ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]]
=== array)
}}
Expand Down

0 comments on commit fc0e694

Please sign in to comment.