Skip to content

Commit

Permalink
Refactored ColumnAccessors & ColumnBuilders to remove duplicate code
Browse files Browse the repository at this point in the history
Primitive setters/getters for (Mutable)Rows are moved to ColumnTypes.
  • Loading branch information
liancheng committed Apr 1, 2014
1 parent ada310a commit 85cc59b
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.nio.{ByteOrder, ByteBuffer}

import org.apache.spark.sql.catalyst.types.{BinaryType, NativeType, DataType}
import org.apache.spark.sql.catalyst.expressions.MutableRow
import org.apache.spark.sql.execution.SparkSqlSerializer

/**
* An `Iterator` like trait used to extract values from columnar byte buffer. When a value is
Expand All @@ -41,116 +40,61 @@ private[sql] trait ColumnAccessor {
protected def underlyingBuffer: ByteBuffer
}

private[sql] abstract class BasicColumnAccessor[T <: DataType, JvmType](buffer: ByteBuffer)
private[sql] abstract class BasicColumnAccessor[T <: DataType, JvmType](
buffer: ByteBuffer, columnType: ColumnType[T, JvmType])
extends ColumnAccessor {

protected def initialize() {}

def columnType: ColumnType[T, JvmType]

def hasNext = buffer.hasRemaining

def extractTo(row: MutableRow, ordinal: Int) {
doExtractTo(row, ordinal)
columnType.setField(row, ordinal, columnType.extract(buffer))
}

protected def doExtractTo(row: MutableRow, ordinal: Int)

protected def underlyingBuffer = buffer
}

private[sql] abstract class NativeColumnAccessor[T <: NativeType](
buffer: ByteBuffer,
val columnType: NativeColumnType[T])
extends BasicColumnAccessor[T, T#JvmType](buffer)
with NullableColumnAccessor

private[sql] class BooleanColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, BOOLEAN) {
extends BasicColumnAccessor[T, T#JvmType](buffer, columnType)
with NullableColumnAccessor {

override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setBoolean(ordinal, columnType.extract(buffer))
}
type JvmType = T#JvmType
}

private[sql] class IntColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, INT) {
private[sql] class BooleanColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, BOOLEAN)

override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setInt(ordinal, columnType.extract(buffer))
}
}
private[sql] class IntColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, INT)

private[sql] class ShortColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, SHORT) {

override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setShort(ordinal, columnType.extract(buffer))
}
}
extends NativeColumnAccessor(buffer, SHORT)

private[sql] class LongColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, LONG) {

override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setLong(ordinal, columnType.extract(buffer))
}
}
extends NativeColumnAccessor(buffer, LONG)

private[sql] class ByteColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, BYTE) {

override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setByte(ordinal, columnType.extract(buffer))
}
}
extends NativeColumnAccessor(buffer, BYTE)

private[sql] class DoubleColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, DOUBLE) {

override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setDouble(ordinal, columnType.extract(buffer))
}
}
extends NativeColumnAccessor(buffer, DOUBLE)

private[sql] class FloatColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, FLOAT) {

override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setFloat(ordinal, columnType.extract(buffer))
}
}
extends NativeColumnAccessor(buffer, FLOAT)

private[sql] class StringColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, STRING) {

override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setString(ordinal, columnType.extract(buffer))
}
}
extends NativeColumnAccessor(buffer, STRING)

private[sql] class BinaryColumnAccessor(buffer: ByteBuffer)
extends BasicColumnAccessor[BinaryType.type, Array[Byte]](buffer)
with NullableColumnAccessor {

def columnType = BINARY

override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row(ordinal) = columnType.extract(buffer)
}
}
extends BasicColumnAccessor[BinaryType.type, Array[Byte]](buffer, BINARY)
with NullableColumnAccessor

private[sql] class GenericColumnAccessor(buffer: ByteBuffer)
extends BasicColumnAccessor[DataType, Array[Byte]](buffer)
with NullableColumnAccessor {

def columnType = GENERIC

override protected def doExtractTo(row: MutableRow, ordinal: Int) {
val serialized = columnType.extract(buffer)
row(ordinal) = SparkSqlSerializer.deserialize[Any](serialized)
}
}
extends BasicColumnAccessor[DataType, Array[Byte]](buffer, GENERIC)
with NullableColumnAccessor

private[sql] object ColumnAccessor {
def apply(b: ByteBuffer): ColumnAccessor = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,27 @@ import java.nio.{ByteBuffer, ByteOrder}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.columnar.ColumnBuilder._
import org.apache.spark.sql.execution.SparkSqlSerializer

private[sql] trait ColumnBuilder {
/**
* Initializes with an approximate lower bound on the expected number of elements in this column.
*/
def initialize(initialSize: Int, columnName: String = "")

def gatherStats(row: Row, ordinal: Int) {}

def appendFrom(row: Row, ordinal: Int)

def build(): ByteBuffer
}

private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType] extends ColumnBuilder {
private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType](
val columnType: ColumnType[T, JvmType])
extends ColumnBuilder {

private var columnName: String = _
protected var buffer: ByteBuffer = _

def columnType: ColumnType[T, JvmType]
protected var buffer: ByteBuffer = _

override def initialize(initialSize: Int, columnName: String = "") = {
val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize
Expand All @@ -49,18 +51,10 @@ private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType] extends C
buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId)
}

// Have to give a concrete implementation to make mixin possible
override def appendFrom(row: Row, ordinal: Int) {
doAppendFrom(row, ordinal)
}

// Concrete `ColumnBuilder`s can override this method to append values
protected def doAppendFrom(row: Row, ordinal: Int)

// Helper method to append primitive values (to avoid boxing cost)
protected def appendValue(v: JvmType) {
buffer = ensureFreeSpace(buffer, columnType.actualSize(v))
columnType.append(v, buffer)
val field = columnType.getField(row, ordinal)
buffer = ensureFreeSpace(buffer, columnType.actualSize(field))
columnType.append(field, buffer)
}

override def build() = {
Expand All @@ -70,82 +64,41 @@ private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType] extends C
}

private[sql] abstract class NativeColumnBuilder[T <: NativeType](
val columnType: NativeColumnType[T])
extends BasicColumnBuilder[T, T#JvmType]
with NullableColumnBuilder
protected val columnStats: ColumnStats[T],
columnType: NativeColumnType[T])
extends BasicColumnBuilder[T, T#JvmType](columnType)
with NullableColumnBuilder {

private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(BOOLEAN) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getBoolean(ordinal))
override def gatherStats(row: Row, ordinal: Int) {
columnStats.gatherStats(row, ordinal)
}
}

private[sql] class IntColumnBuilder extends NativeColumnBuilder(INT) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getInt(ordinal))
}
}
private[sql] class BooleanColumnBuilder
extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN)

private[sql] class ShortColumnBuilder extends NativeColumnBuilder(SHORT) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getShort(ordinal))
}
}
private[sql] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT)

private[sql] class LongColumnBuilder extends NativeColumnBuilder(LONG) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getLong(ordinal))
}
}
private[sql] class ShortColumnBuilder extends NativeColumnBuilder(new ShortColumnStats, SHORT)

private[sql] class ByteColumnBuilder extends NativeColumnBuilder(BYTE) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getByte(ordinal))
}
}
private[sql] class LongColumnBuilder extends NativeColumnBuilder(new LongColumnStats, LONG)

private[sql] class DoubleColumnBuilder extends NativeColumnBuilder(DOUBLE) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getDouble(ordinal))
}
}
private[sql] class ByteColumnBuilder extends NativeColumnBuilder(new ByteColumnStats, BYTE)

private[sql] class FloatColumnBuilder extends NativeColumnBuilder(FLOAT) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getFloat(ordinal))
}
}
private[sql] class DoubleColumnBuilder extends NativeColumnBuilder(new DoubleColumnStats, DOUBLE)

private[sql] class StringColumnBuilder extends NativeColumnBuilder(STRING) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getString(ordinal))
}
}
private[sql] class FloatColumnBuilder extends NativeColumnBuilder(new FloatColumnStats, FLOAT)

private[sql] class BinaryColumnBuilder
extends BasicColumnBuilder[BinaryType.type, Array[Byte]]
with NullableColumnBuilder {

def columnType = BINARY
private[sql] class StringColumnBuilder extends NativeColumnBuilder(new StringColumnStates, STRING)

override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row(ordinal).asInstanceOf[Array[Byte]])
}
}
private[sql] class BinaryColumnBuilder
extends BasicColumnBuilder[BinaryType.type, Array[Byte]](BINARY)
with NullableColumnBuilder

// TODO (lian) Add support for array, struct and map
private[sql] class GenericColumnBuilder
extends BasicColumnBuilder[DataType, Array[Byte]]
with NullableColumnBuilder {

def columnType = GENERIC

override def doAppendFrom(row: Row, ordinal: Int) {
val serialized = SparkSqlSerializer.serialize(row(ordinal))
buffer = ColumnBuilder.ensureFreeSpace(buffer, columnType.actualSize(serialized))
columnType.append(serialized, buffer)
}
}
extends BasicColumnBuilder[DataType, Array[Byte]](GENERIC)
with NullableColumnBuilder

private[sql] object ColumnBuilder {
val DEFAULT_INITIAL_BUFFER_SIZE = 10 * 1024 * 104
Expand Down
Loading

0 comments on commit 85cc59b

Please sign in to comment.