Skip to content

Commit

Permalink
Merge pull request #14 from metamx/SPARK-12222
Browse files Browse the repository at this point in the history
[SPARK-12222] [CORE] Deserialize RoaringBitmap using Kryo serializer …
  • Loading branch information
drcrallen committed Dec 16, 2015
2 parents 7f0ebdb + f9d20b9 commit 6e11464
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,15 @@ private[serializer] class KryoInputDataInputBridge(input : KryoInput) extends Da
override def readUTF(): String = input.readString() // readString in kryo does utf8
override def readInt(): Int = input.readInt()
override def readUnsignedShort(): Int = input.readShortUnsigned()
override def skipBytes(n: Int): Int = input.skip(n.toLong).toInt
override def skipBytes(n: Int): Int = {
var remaining: Long = n
while (remaining > 0) {
val skip = Math.min(Integer.MAX_VALUE, remaining).asInstanceOf[Int]
input.skip(skip)
remaining -= skip
}
n
}
override def readFully(b: Array[Byte]): Unit = input.read(b)
override def readFully(b: Array[Byte], off: Int, len: Int): Unit = input.read(b, off, len)
override def readLine(): String = throw new UnsupportedOperationException("readLine")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@

package org.apache.spark.serializer

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, FileOutputStream, FileInputStream}

import scala.collection.mutable
import scala.reflect.ClassTag

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}

import org.roaringbitmap.RoaringBitmap

import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite}
import org.apache.spark.scheduler.HighlyCompressedMapStatus
import org.apache.spark.serializer.KryoTest._
import org.apache.spark.util.Utils
import org.apache.spark.storage.BlockManagerId

class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
Expand Down Expand Up @@ -319,6 +323,28 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
assert(thrown.getMessage.contains(kryoBufferMaxProperty))
}

test("SPARK-12222: deserialize RoaringBitmap throw Buffer underflow exception") {
val dir = Utils.createTempDir()
val tmpfile = dir.toString + "/RoaringBitmap"
val outStream = new FileOutputStream(tmpfile)
val output = new KryoOutput(outStream)
val bitmap = new RoaringBitmap
bitmap.add(1)
bitmap.add(3)
bitmap.add(5)
bitmap.serialize(new KryoOutputDataOutputBridge(output))
output.flush()
output.close()

val inStream = new FileInputStream(tmpfile)
val input = new KryoInput(inStream)
val ret = new RoaringBitmap
ret.deserialize(new KryoInputDataInputBridge(input))
input.close()
assert(ret == bitmap)
Utils.deleteRecursively(dir)
}

test("getAutoReset") {
val ser = new KryoSerializer(new SparkConf).newInstance().asInstanceOf[KryoSerializerInstance]
assert(ser.getAutoReset)
Expand Down

0 comments on commit 6e11464

Please sign in to comment.