Fixed bugs in IntegralDelta
liancheng committed Apr 11, 2014
1 parent 7b4203a commit 95b3301
Showing 2 changed files with 19 additions and 16 deletions.
@@ -396,26 +396,27 @@ private[sql] sealed abstract class IntegralDelta[I <: IntegralType] extends Comp
 
       if (initial) {
         initial = false
-        prev = value
         _compressedSize += 1 + columnType.defaultSize
       } else {
         val (smallEnough, _) = byteSizedDelta(value, prev)
         _compressedSize += (if (smallEnough) 1 else 1 + columnType.defaultSize)
       }
+
+      prev = value
     }
 
     override def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[I]) = {
       to.putInt(typeId)
 
       if (from.hasRemaining) {
-        val prev = columnType.extract(from)
-
+        var prev = columnType.extract(from)
         to.put(Byte.MinValue)
         columnType.append(prev, to)
 
         while (from.hasRemaining) {
           val current = columnType.extract(from)
           val (smallEnough, delta) = byteSizedDelta(current, prev)
+          prev = current
 
           if (smallEnough) {
             to.put(delta)
@@ -442,13 +443,8 @@ private[sql] sealed abstract class IntegralDelta[I <: IntegralType] extends Comp
 
     override def next() = {
       val delta = buffer.get()
-
-      if (delta > Byte.MinValue) {
-        addDelta(prev, delta)
-      } else {
-        prev = columnType.extract(buffer)
-        prev
-      }
+      prev = if (delta > Byte.MinValue) addDelta(prev, delta) else columnType.extract(buffer)
+      prev
     }
 
     override def hasNext = buffer.hasRemaining
@@ -464,7 +460,7 @@ private[sql] case object IntDelta extends IntegralDelta[IntegerType.type] {
 
   override protected def byteSizedDelta(x: Int, y: Int): (Boolean, Byte) = {
     val delta = x - y
-    if (delta < Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
+    if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
   }
 }

@@ -477,6 +473,6 @@ private[sql] case object LongDelta extends IntegralDelta[LongType.type] {
 
   override protected def byteSizedDelta(x: Long, y: Long): (Boolean, Byte) = {
     val delta = x - y
-    if (delta < Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
+    if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
   }
 }
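
The fixes above are easier to see in isolation: a delta only fits in a single byte when it lies in [-127, 127], because Byte.MinValue (-128) is reserved as the escape marker, and both the encoder and the decoder have to keep updating the previous value as they go. The following is a minimal standalone sketch of that scheme for Int values only; the object name, buffer sizing, and helpers are illustrative assumptions, not Spark's actual columnar API.

import java.nio.ByteBuffer

// Minimal sketch of byte-sized delta encoding (Int only); illustrative, not Spark code.
object DeltaSketch {
  // "Small enough" means the delta lies in [-127, 127]: Byte.MinValue (-128) is
  // reserved as the escape marker, hence math.abs(delta) <= Byte.MaxValue rather
  // than the old delta < Byte.MaxValue check.
  def byteSizedDelta(x: Int, y: Int): (Boolean, Byte) = {
    val delta = x - y
    if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
  }

  def compress(values: Seq[Int]): ByteBuffer = {
    val buf = ByteBuffer.allocate(5 * values.size) // worst case: marker + full Int per value
    var prev = 0
    var initial = true
    values.foreach { current =>
      if (initial) {
        initial = false
        buf.put(Byte.MinValue).putInt(current)     // first value is always written in full
      } else {
        val (smallEnough, delta) = byteSizedDelta(current, prev)
        if (smallEnough) {
          buf.put(delta)                           // single delta byte
        } else {
          buf.put(Byte.MinValue).putInt(current)   // escape marker + full 4-byte value
        }
      }
      prev = current                               // track the previous value, not the first one
    }
    buf.flip()
    buf
  }

  def decompress(buf: ByteBuffer): Seq[Int] = {
    val out = Seq.newBuilder[Int]
    var prev = 0
    while (buf.hasRemaining) {
      val delta = buf.get()
      // Mirrors the fixed next() in the diff above: the decoded value is assigned back to prev.
      prev = if (delta > Byte.MinValue) prev + delta else buf.getInt()
      out += prev
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(2, 1, 2, 130)                  // deltas -1, 1, 128; the last one escapes
    assert(decompress(compress(input)) == input)
  }
}

Running main round-trips Seq(2, 1, 2, 130), the same shape of input the updated simple-case test uses.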
@@ -22,6 +22,7 @@ import org.scalatest.FunSuite
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
 import org.apache.spark.sql.catalyst.types.IntegralType
 import org.apache.spark.sql.columnar._
+import org.apache.spark.sql.columnar.ColumnarTestUtils._
 
 class IntegralDeltaSuite extends FunSuite {
   testIntegralDelta(new IntColumnStats, INT, IntDelta)
@@ -63,7 +64,7 @@ class IntegralDeltaSuite extends FunSuite {
       } else {
         val oneBoolean = columnType.defaultSize
         1 + oneBoolean + deltas.map {
-          d => if (math.abs(d) < Byte.MaxValue) 1 else 1 + oneBoolean
+          d => if (math.abs(d) <= Byte.MaxValue) 1 else 1 + oneBoolean
         }.sum
       })
 
@@ -78,7 +79,7 @@ class IntegralDeltaSuite extends FunSuite {
       expectResult(input.head, "The first value is wrong")(columnType.extract(buffer))
 
       (input.tail, deltas).zipped.foreach { (value, delta) =>
-        if (delta < Byte.MaxValue) {
+        if (math.abs(delta) <= Byte.MaxValue) {
           expectResult(delta, "Wrong delta")(buffer.get())
         } else {
           expectResult(Byte.MinValue, "Expecting escaping mark here")(buffer.get())
@@ -105,11 +106,17 @@ class IntegralDeltaSuite extends FunSuite {
 
     test(s"$scheme: simple case") {
       val input = columnType match {
-        case INT => Seq(1: Int, 2: Int, 130: Int)
-        case LONG => Seq(1: Long, 2: Long, 130: Long)
+        case INT => Seq(2: Int, 1: Int, 2: Int, 130: Int)
+        case LONG => Seq(2: Long, 1: Long, 2: Long, 130: Long)
       }
 
       skeleton(input.map(_.asInstanceOf[I#JvmType]))
     }
+
+    test(s"$scheme: long random series") {
+      // Have to workaround with `Any` since no `ClassTag[I#JvmType]` available here.
+      val input = Array.fill[Any](10000)(makeRandomValue(columnType))
+      skeleton(input.map(_.asInstanceOf[I#JvmType]))
+    }
   }
 }
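
As a quick aside on the new simple-case input (the local names below are just for illustration), its consecutive deltas cover both encoding paths: two single-byte deltas, one of them negative, plus one delta above Byte.MaxValue that has to take the escape path.

val input  = Seq(2, 1, 2, 130)
val deltas = input.tail.zip(input.init).map { case (cur, prev) => cur - prev }
println(deltas) // List(-1, 1, 128): -1 and 1 fit in a byte, 128 must be escaped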
