Skip to content

Commit

Permalink
[SPARK-12222] [CORE] Deserialize RoaringBitmap using Kryo serializer …
Browse files Browse the repository at this point in the history
…throw Buffer underflow exception

Jira: https://issues.apache.org/jira/browse/SPARK-12222

Deserialize RoaringBitmap using Kryo serializer throw Buffer underflow exception:
```
com.esotericsoftware.kryo.KryoException: Buffer underflow.
	at com.esotericsoftware.kryo.io.Input.require(Input.java:156)
	at com.esotericsoftware.kryo.io.Input.skip(Input.java:131)
	at com.esotericsoftware.kryo.io.Input.skip(Input.java:264)
```

This is caused by a bug of kryo's `Input.skip(long count)`(EsotericSoftware/kryo#119) and we call this method in `KryoInputDataInputBridge`.

Instead of upgrade kryo's version, this pr bypass the  kryo's `Input.skip(long count)` by directly call another `skip` method in kryo's Input.java(https://github.com/EsotericSoftware/kryo/blob/kryo-2.21/src/com/esotericsoftware/kryo/io/Input.java#L124), i.e. write the bug-fixed version of `Input.skip(long count)` in KryoInputDataInputBridge's `skipBytes` method.

more detail link to #9748 (comment)

Author: Fei Wang <[email protected]>

Closes #10213 from scwf/patch-1.

(cherry picked from commit 3934562)
Signed-off-by: Davies Liu <[email protected]>
  • Loading branch information
scwf authored and davies committed Dec 9, 2015
1 parent 9e82273 commit 0be792a
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,15 @@ private[serializer] class KryoInputDataInputBridge(input: KryoInput) extends Dat
override def readUTF(): String = input.readString() // readString in kryo does utf8
override def readInt(): Int = input.readInt()
override def readUnsignedShort(): Int = input.readShortUnsigned()
override def skipBytes(n: Int): Int = input.skip(n.toLong).toInt
override def skipBytes(n: Int): Int = {
var remaining: Long = n
while (remaining > 0) {
val skip = Math.min(Integer.MAX_VALUE, remaining).asInstanceOf[Int]
input.skip(skip)
remaining -= skip
}
n
}
override def readFully(b: Array[Byte]): Unit = input.read(b)
override def readFully(b: Array[Byte], off: Int, len: Int): Unit = input.read(b, off, len)
override def readLine(): String = throw new UnsupportedOperationException("readLine")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@

package org.apache.spark.serializer

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, FileOutputStream, FileInputStream}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.reflect.ClassTag

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}

import org.roaringbitmap.RoaringBitmap

import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite}
import org.apache.spark.scheduler.HighlyCompressedMapStatus
import org.apache.spark.serializer.KryoTest._
import org.apache.spark.util.Utils
import org.apache.spark.storage.BlockManagerId

class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
Expand Down Expand Up @@ -350,6 +354,28 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
assert(thrown.getMessage.contains(kryoBufferMaxProperty))
}

test("SPARK-12222: deserialize RoaringBitmap throw Buffer underflow exception") {
val dir = Utils.createTempDir()
val tmpfile = dir.toString + "/RoaringBitmap"
val outStream = new FileOutputStream(tmpfile)
val output = new KryoOutput(outStream)
val bitmap = new RoaringBitmap
bitmap.add(1)
bitmap.add(3)
bitmap.add(5)
bitmap.serialize(new KryoOutputDataOutputBridge(output))
output.flush()
output.close()

val inStream = new FileInputStream(tmpfile)
val input = new KryoInput(inStream)
val ret = new RoaringBitmap
ret.deserialize(new KryoInputDataInputBridge(input))
input.close()
assert(ret == bitmap)
Utils.deleteRecursively(dir)
}

test("getAutoReset") {
val ser = new KryoSerializer(new SparkConf).newInstance().asInstanceOf[KryoSerializerInstance]
assert(ser.getAutoReset)
Expand Down

0 comments on commit 0be792a

Please sign in to comment.