Refactored CompressionScheme, added 3 more compression schemes.
New schemes: BooleanBitSet, IntDelta and LongDelta
liancheng committed Apr 5, 2014
1 parent 1347ebd commit 44fe4b2
Showing 6 changed files with 275 additions and 106 deletions.
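The BooleanBitSet, IntDelta and LongDelta implementations themselves live in files not shown in this excerpt. As a purely illustrative, standalone sketch of the idea behind the two delta schemes (keep the first value, then store only the difference from the previous value; BooleanBitSet instead packs eight booleans into each byte) — not Spark's actual code:

// Standalone sketch of delta encoding for Ints (the same idea IntDelta/LongDelta
// build on): the first value is kept as-is, every later value is stored as the
// difference from its predecessor, so slowly-changing sequences become tiny deltas.
object DeltaEncodingSketch {
  def encode(values: Array[Int]): Array[Int] = {
    if (values.isEmpty) return values
    val deltas = new Array[Int](values.length)
    deltas(0) = values(0)
    var i = 1
    while (i < values.length) {
      deltas(i) = values(i) - values(i - 1)  // store the delta, not the raw value
      i += 1
    }
    deltas
  }

  def decode(deltas: Array[Int]): Array[Int] = {
    if (deltas.isEmpty) return deltas
    val values = new Array[Int](deltas.length)
    values(0) = deltas(0)
    var i = 1
    while (i < deltas.length) {
      values(i) = values(i - 1) + deltas(i)  // rebuild by accumulating deltas
      i += 1
    }
    values
  }

  def main(args: Array[String]): Unit = {
    val original = Array(1000, 1001, 1003, 1006, 1010)
    val deltas = encode(original)            // Array(1000, 1, 2, 3, 4)
    assert(decode(deltas).sameElements(original))
    println(deltas.mkString(", "))
  }
}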
@@ -20,6 +20,12 @@ package org.apache.spark.sql.columnar
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.types._

+ /**
+  * Used to collect statistical information when building in-memory columns.
+  *
+  * NOTE: we intentionally avoid using `Ordering[T]` to compare values here because `Ordering[T]`
+  * brings significant performance penalty.
+  */
private[sql] sealed abstract class ColumnStats[T <: DataType, JvmType] extends Serializable {
/**
* Closed lower bound of this column.
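The scaladoc added above motivates the design: the stats classes compare values with primitive operators rather than through Ordering[T]. A minimal standalone sketch of what that direct comparison looks like (hypothetical class, not the one in this patch):

// Hypothetical stand-in, not the ColumnStats subclass from this patch: lower and
// upper bounds are tracked with primitive < and > comparisons. Routing the same
// comparison through Ordering[Int].compare would typically box each value and add
// a virtual call per row, which is the penalty the NOTE above refers to.
class IntColumnStatsSketch {
  private var lower = Int.MaxValue
  private var upper = Int.MinValue

  def gatherStats(value: Int): Unit = {
    if (value < lower) lower = value  // direct primitive comparison, no boxing
    if (value > upper) upper = value
  }

  def lowerBound: Int = lower
  def upperBound: Int = upper
}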
@@ -47,9 +47,9 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]

import CompressionScheme._

-   val compressionEncoders = schemes.filter(_.supports(columnType)).map(_.encoder)
+   val compressionEncoders = schemes.filter(_.supports(columnType)).map(_.encoder[T])

-   protected def isWorthCompressing(encoder: Encoder) = {
+   protected def isWorthCompressing(encoder: Encoder[T]) = {
encoder.compressionRatio < 0.8
}

@@ -70,7 +70,7 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]

abstract override def build() = {
val rawBuffer = super.build()
-     val encoder = {
+     val encoder: Encoder[T] = {
val candidate = compressionEncoders.minBy(_.compressionRatio)
if (isWorthCompressing(candidate)) candidate else PassThrough.encoder
}
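Taken together, the two hunks above define the selection policy: once all rows have been appended, build() picks the candidate encoder with the smallest compressionRatio and uses it only if it compresses to less than 80% of the raw size, otherwise it falls back to PassThrough. A simplified, self-contained sketch of that policy (stand-in types, not the actual Spark classes):

// Stand-in for the selection policy in CompressibleColumnBuilder.build():
// pick the candidate with the best (smallest) compression ratio, and fall back
// to a pass-through encoder unless it saves at least ~20%.
trait EncoderSketch {
  def compressedSize: Int
  def uncompressedSize: Int
  def compressionRatio: Double =
    if (uncompressedSize > 0) compressedSize.toDouble / uncompressedSize else 1.0
}

case class CandidateSketch(name: String, compressedSize: Int, uncompressedSize: Int)
  extends EncoderSketch

object EncoderSelectionSketch {
  val passThrough = CandidateSketch("PassThrough", 100, 100)  // ratio 1.0 by definition

  def isWorthCompressing(e: EncoderSketch): Boolean = e.compressionRatio < 0.8

  def choose(candidates: Seq[EncoderSketch]): EncoderSketch = {
    val best = candidates.minBy(_.compressionRatio)
    if (isWorthCompressing(best)) best else passThrough
  }

  def main(args: Array[String]): Unit = {
    val rle = CandidateSketch("RunLengthEncoding", 40, 100)    // ratio 0.4 -> chosen
    val dict = CandidateSketch("DictionaryEncoding", 90, 100)  // ratio 0.9 -> rejected
    println(choose(Seq(rle, dict)))                            // prints the RLE candidate
  }
}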
@@ -22,10 +22,8 @@ import java.nio.ByteBuffer
import org.apache.spark.sql.catalyst.types.NativeType
import org.apache.spark.sql.columnar.{ColumnType, NativeColumnType}

- private[sql] trait Encoder {
-   def gatherCompressibilityStats[T <: NativeType](
-       value: T#JvmType,
-       columnType: ColumnType[T, T#JvmType]) {}
+ private[sql] trait Encoder[T <: NativeType] {
+   def gatherCompressibilityStats(value: T#JvmType, columnType: NativeColumnType[T]) {}

def compressedSize: Int

@@ -35,10 +33,7 @@ private[sql] trait Encoder {
if (uncompressedSize > 0) compressedSize.toDouble / uncompressedSize else 1.0
}

-   def compress[T <: NativeType](
-       from: ByteBuffer,
-       to: ByteBuffer,
-       columnType: ColumnType[T, T#JvmType]): ByteBuffer
+   def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[T]): ByteBuffer
}

private[sql] trait Decoder[T <: NativeType] extends Iterator[T#JvmType]
@@ -48,7 +43,7 @@ private[sql] trait CompressionScheme {

def supports(columnType: ColumnType[_, _]): Boolean

-   def encoder: Encoder
+   def encoder[T <: NativeType]: Encoder[T]

def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T]
}
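The change above moves the NativeType parameter from the individual methods up to the Encoder trait, and CompressionScheme.encoder[T] now returns an encoder already bound to one column type, which lets concrete encoders keep typed per-column state. A toy before/after sketch of the shape of that refactoring (simplified stand-in types, not Spark's):

// Toy types standing in for Spark's NativeType / NativeColumnType machinery.
trait ToyNativeType { type JvmType }
class ToyColumnType[T <: ToyNativeType]

// Before: the type parameter lived on each method, so a single Encoder instance
// had to work for any column type and could not keep precisely typed state.
trait EncoderBefore {
  def gatherCompressibilityStats[T <: ToyNativeType](value: T#JvmType, columnType: ToyColumnType[T]): Unit
}

// After: the trait itself is parameterized, mirroring this commit. An encoder is
// created for one column type and can accumulate values of T#JvmType directly
// (dictionaries, previous values for delta encoding, and so on).
trait EncoderAfter[T <: ToyNativeType] {
  def gatherCompressibilityStats(value: T#JvmType, columnType: ToyColumnType[T]): Unit
}

trait CompressionSchemeAfter {
  def encoder[T <: ToyNativeType]: EncoderAfter[T]
}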
@@ -58,15 +53,18 @@ private[sql] trait WithCompressionSchemes {
}

private[sql] trait AllCompressionSchemes extends WithCompressionSchemes {
-   override val schemes: Seq[CompressionScheme] = {
-     Seq(PassThrough, RunLengthEncoding, DictionaryEncoding)
-   }
+   override val schemes: Seq[CompressionScheme] = CompressionScheme.all
}

private[sql] object CompressionScheme {
-   def apply(typeId: Int): CompressionScheme = typeId match {
-     case PassThrough.typeId => PassThrough
-     case _ => throw new UnsupportedOperationException()
+   val all: Seq[CompressionScheme] =
+     Seq(PassThrough, RunLengthEncoding, DictionaryEncoding, BooleanBitSet, IntDelta, LongDelta)
+
+   private val typeIdToScheme = all.map(scheme => scheme.typeId -> scheme).toMap
+
+   def apply(typeId: Int): CompressionScheme = {
+     typeIdToScheme.getOrElse(typeId, throw new UnsupportedOperationException(
+       s"Unrecognized compression scheme type ID: $typeId"))
}

def copyColumnHeader(from: ByteBuffer, to: ByteBuffer) {
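The registry introduced above maps every scheme's typeId back to its singleton object, so the ID written into a compressed column header can be resolved again when decoding. A small standalone sketch of the same lookup pattern (hypothetical objects and illustrative IDs, not the real Spark ones):

// Standalone sketch of the typeId -> scheme registry: each scheme owns a stable
// integer ID that is written into the column buffer header; apply() resolves the
// ID back to the scheme object when reading.
trait SchemeSketch { def typeId: Int; def name: String }

object SchemeRegistrySketch {
  case object PassThroughSketch extends SchemeSketch { val typeId = 0; val name = "PassThrough" }
  case object RunLengthSketch   extends SchemeSketch { val typeId = 1; val name = "RunLengthEncoding" }
  case object IntDeltaSketch    extends SchemeSketch { val typeId = 4; val name = "IntDelta" }

  val all: Seq[SchemeSketch] = Seq(PassThroughSketch, RunLengthSketch, IntDeltaSketch)

  private val byId: Map[Int, SchemeSketch] = all.map(s => s.typeId -> s).toMap

  def apply(typeId: Int): SchemeSketch =
    byId.getOrElse(typeId, throw new UnsupportedOperationException(
      s"Unrecognized compression scheme type ID: $typeId"))

  def main(args: Array[String]): Unit = {
    println(apply(4).name)  // IntDelta
    // apply(99) would throw UnsupportedOperationException, mirroring the patch
  }
}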
