[SPARK-1402] Added 3 more compression schemes
JIRA issue: [SPARK-1402](https://issues.apache.org/jira/browse/SPARK-1402)

This PR provides 3 more compression schemes for Spark SQL in-memory columnar storage:

* `BooleanBitSet`
* `IntDelta`
* `LongDelta`

Now there are 6 compression schemes in total, including the no-op `PassThrough` scheme.
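
For readers unfamiliar with these schemes: `IntDelta` and `LongDelta` store each value as the difference from its predecessor, which keeps values small for monotonic or slowly-changing columns, while `BooleanBitSet`, as its name suggests, packs boolean values into individual bits rather than one byte each. The sketch below illustrates only the delta idea in a self-contained way; it is not the encoder/decoder interface added by this PR.

```scala
// Conceptual sketch of delta encoding (illustration only, not the Spark code).
object IntDeltaSketch {
  // Store each value as the difference from its predecessor (the first value is kept as-is).
  def encode(values: Seq[Int]): Seq[Int] =
    values.zip(0 +: values).map { case (cur, prev) => cur - prev }

  // Rebuild the original column by accumulating the deltas.
  def decode(deltas: Seq[Int]): Seq[Int] =
    deltas.scanLeft(0)(_ + _).tail

  def main(args: Array[String]): Unit = {
    val column = Seq(100, 101, 103, 103, 110)
    assert(decode(encode(column)) == column)  // round-trips losslessly
  }
}
```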

Also fixed a bug introduced in PR apache#286: not all compression schemes were registered as available when accessing an in-memory column, so when a column was compressed with a scheme `ColumnAccessor` did not recognise, it threw an exception.

Author: Cheng Lian <[email protected]>

Closes apache#330 from liancheng/moreCompressionSchemes and squashes the following commits:

1d037b8 [Cheng Lian] Fixed SPARK-1436: in-memory column byte buffer must be able to be accessed multiple times
d7c0e8f [Cheng Lian] Added test suite for IntegralDelta (IntDelta & LongDelta)
3c1ad7a [Cheng Lian] Added test suite for BooleanBitSet, refactored other test suites
44fe4b2 [Cheng Lian] Refactored CompressionScheme, added 3 more compression schemes.
liancheng authored and rxin committed Apr 8, 2014
1 parent f27e56a commit 0d0493f
Showing 11 changed files with 586 additions and 179 deletions.
@@ -100,20 +100,21 @@ private[sql] class GenericColumnAccessor(buffer: ByteBuffer)
 
 private[sql] object ColumnAccessor {
   def apply(buffer: ByteBuffer): ColumnAccessor = {
+    val dup = buffer.duplicate().order(ByteOrder.nativeOrder)
     // The first 4 bytes in the buffer indicate the column type.
-    val columnTypeId = buffer.getInt()
+    val columnTypeId = dup.getInt()
 
     columnTypeId match {
-      case INT.typeId => new IntColumnAccessor(buffer)
-      case LONG.typeId => new LongColumnAccessor(buffer)
-      case FLOAT.typeId => new FloatColumnAccessor(buffer)
-      case DOUBLE.typeId => new DoubleColumnAccessor(buffer)
-      case BOOLEAN.typeId => new BooleanColumnAccessor(buffer)
-      case BYTE.typeId => new ByteColumnAccessor(buffer)
-      case SHORT.typeId => new ShortColumnAccessor(buffer)
-      case STRING.typeId => new StringColumnAccessor(buffer)
-      case BINARY.typeId => new BinaryColumnAccessor(buffer)
-      case GENERIC.typeId => new GenericColumnAccessor(buffer)
+      case INT.typeId => new IntColumnAccessor(dup)
+      case LONG.typeId => new LongColumnAccessor(dup)
+      case FLOAT.typeId => new FloatColumnAccessor(dup)
+      case DOUBLE.typeId => new DoubleColumnAccessor(dup)
+      case BOOLEAN.typeId => new BooleanColumnAccessor(dup)
+      case BYTE.typeId => new ByteColumnAccessor(dup)
+      case SHORT.typeId => new ShortColumnAccessor(dup)
+      case STRING.typeId => new StringColumnAccessor(dup)
+      case BINARY.typeId => new BinaryColumnAccessor(dup)
+      case GENERIC.typeId => new GenericColumnAccessor(dup)
     }
   }
 }
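
The change above reads the type ID through `buffer.duplicate()` instead of the buffer itself, which presumably is the SPARK-1436 fix listed in the squashed commits: a duplicate shares the underlying bytes but has an independent position, so consuming the header does not prevent the original buffer from being read again. A minimal standalone demonstration of that `java.nio` behaviour (not Spark code):

```scala
import java.nio.{ByteBuffer, ByteOrder}

// A duplicate shares the underlying bytes but has its own position,
// so consuming the type ID does not advance the original buffer.
object DuplicateDemo extends App {
  val buffer = ByteBuffer.allocate(8).order(ByteOrder.nativeOrder)
  buffer.putInt(42).putInt(7).flip()

  val dup = buffer.duplicate().order(ByteOrder.nativeOrder)  // byte order is not inherited
  val typeId = dup.getInt()                                  // advances dup only

  assert(typeId == 42)
  assert(buffer.position() == 0)  // the original can still be read from the start
}
```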
@@ -20,6 +20,12 @@ package org.apache.spark.sql.columnar
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.types._
 
+/**
+ * Used to collect statistical information when building in-memory columns.
+ *
+ * NOTE: we intentionally avoid using `Ordering[T]` to compare values here because `Ordering[T]`
+ * brings significant performance penalty.
+ */
 private[sql] sealed abstract class ColumnStats[T <: DataType, JvmType] extends Serializable {
   /**
    * Closed lower bound of this column.
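
The added doc comment is the only change in this hunk. As a rough illustration of the trade-off it refers to: a generic `Ordering[T]` comparison goes through a non-specialised interface and typically boxes primitive values on every call, whereas a stats collector written against a concrete primitive type compares unboxed values directly. Both classes below are hypothetical, not part of this PR:

```scala
// Specialised collector: direct primitive comparison, no boxing.
class IntStatsSketch {
  private var lower = Int.MaxValue
  private var upper = Int.MinValue

  def gather(value: Int): Unit = {
    if (value < lower) lower = value
    if (value > upper) upper = value
  }
}

// Generic collector: each ordering.lt call goes through the generic
// Ordering[T] interface, which boxes Ints unless specialisation applies.
class GenericStatsSketch[T](implicit ordering: Ordering[T]) {
  private var lower: Option[T] = None

  def gather(value: T): Unit = {
    if (lower.forall(ordering.lt(value, _))) lower = Some(value)
  }
}
```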
@@ -47,9 +47,9 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
 
   import CompressionScheme._
 
-  val compressionEncoders = schemes.filter(_.supports(columnType)).map(_.encoder)
+  val compressionEncoders = schemes.filter(_.supports(columnType)).map(_.encoder[T])
 
-  protected def isWorthCompressing(encoder: Encoder) = {
+  protected def isWorthCompressing(encoder: Encoder[T]) = {
     encoder.compressionRatio < 0.8
   }
 
@@ -70,7 +70,7 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
 
   abstract override def build() = {
     val rawBuffer = super.build()
-    val encoder = {
+    val encoder: Encoder[T] = {
       val candidate = compressionEncoders.minBy(_.compressionRatio)
       if (isWorthCompressing(candidate)) candidate else PassThrough.encoder
     }
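
Putting the two hunks above together: the builder keeps one encoder per scheme that supports the column type, picks the candidate with the lowest estimated compression ratio, and falls back to `PassThrough` unless that ratio is below 0.8 (i.e. at least roughly 20% expected savings). The following sketch uses a hypothetical `SketchEncoder` case class to show the selection logic in isolation:

```scala
// Simplified sketch of the encoder selection above (hypothetical types, not the Spark traits).
case class SketchEncoder(name: String, compressedSize: Int, uncompressedSize: Int) {
  def compressionRatio: Double =
    if (uncompressedSize > 0) compressedSize.toDouble / uncompressedSize else 1.0
}

object EncoderSelection extends App {
  val passThrough = SketchEncoder("PassThrough", 1000, 1000)
  val candidates = Seq(
    SketchEncoder("RunLengthEncoding", 900, 1000),  // ratio 0.9: not worth it
    SketchEncoder("IntDelta", 300, 1000)            // ratio 0.3: selected
  )

  def isWorthCompressing(e: SketchEncoder) = e.compressionRatio < 0.8

  val best = candidates.minBy(_.compressionRatio)
  val chosen = if (isWorthCompressing(best)) best else passThrough
  println(s"chosen scheme: ${chosen.name}")  // IntDelta
}
```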
@@ -22,10 +22,8 @@ import java.nio.ByteBuffer
 import org.apache.spark.sql.catalyst.types.NativeType
 import org.apache.spark.sql.columnar.{ColumnType, NativeColumnType}
 
-private[sql] trait Encoder {
-  def gatherCompressibilityStats[T <: NativeType](
-      value: T#JvmType,
-      columnType: ColumnType[T, T#JvmType]) {}
+private[sql] trait Encoder[T <: NativeType] {
+  def gatherCompressibilityStats(value: T#JvmType, columnType: NativeColumnType[T]) {}
 
   def compressedSize: Int
 
@@ -35,10 +33,7 @@ private[sql] trait Encoder {
     if (uncompressedSize > 0) compressedSize.toDouble / uncompressedSize else 1.0
   }
 
-  def compress[T <: NativeType](
-      from: ByteBuffer,
-      to: ByteBuffer,
-      columnType: ColumnType[T, T#JvmType]): ByteBuffer
+  def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[T]): ByteBuffer
 }
 
 private[sql] trait Decoder[T <: NativeType] extends Iterator[T#JvmType]
@@ -48,7 +43,7 @@ private[sql] trait CompressionScheme {
 
   def supports(columnType: ColumnType[_, _]): Boolean
 
-  def encoder: Encoder
+  def encoder[T <: NativeType]: Encoder[T]
 
   def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T]
 }
@@ -58,15 +53,18 @@ private[sql] trait WithCompressionSchemes {
 }
 
 private[sql] trait AllCompressionSchemes extends WithCompressionSchemes {
-  override val schemes: Seq[CompressionScheme] = {
-    Seq(PassThrough, RunLengthEncoding, DictionaryEncoding)
-  }
+  override val schemes: Seq[CompressionScheme] = CompressionScheme.all
 }
 
 private[sql] object CompressionScheme {
-  def apply(typeId: Int): CompressionScheme = typeId match {
-    case PassThrough.typeId => PassThrough
-    case _ => throw new UnsupportedOperationException()
+  val all: Seq[CompressionScheme] =
+    Seq(PassThrough, RunLengthEncoding, DictionaryEncoding, BooleanBitSet, IntDelta, LongDelta)
+
+  private val typeIdToScheme = all.map(scheme => scheme.typeId -> scheme).toMap
+
+  def apply(typeId: Int): CompressionScheme = {
+    typeIdToScheme.getOrElse(typeId, throw new UnsupportedOperationException(
+      s"Unrecognized compression scheme type ID: $typeId"))
   }
 
   def copyColumnHeader(from: ByteBuffer, to: ByteBuffer) {
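
This is the bug fix described in the commit message: every scheme, including the three new ones, is now registered in `CompressionScheme.all`, and `apply` resolves a stored type ID through the `typeIdToScheme` map instead of a match that only knew about `PassThrough`. A short usage sketch, assuming the objects defined in this file are in scope:

```scala
// Usage sketch (assumes the definitions above are on the classpath).
val scheme = CompressionScheme(BooleanBitSet.typeId)
assert(scheme == BooleanBitSet)

// An ID that is not registered in CompressionScheme.all now fails with a
// descriptive message instead of a bare UnsupportedOperationException:
//   CompressionScheme(-1)
//   => UnsupportedOperationException: Unrecognized compression scheme type ID: -1
```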