[SPARK-25786][CORE] If ByteBuffer.hasArray is false, it will throw UnsupportedOperationException for Kryo

## What changes were proposed in this pull request?
In `KryoSerializerInstance.deserialize`, the input parameter is a `ByteBuffer`. If that buffer is not backed by an accessible byte array (for example, a direct buffer), calling `ByteBuffer.array()` throws `UnsupportedOperationException`.

Exception Info:
```
java.lang.UnsupportedOperationException was thrown.
java.lang.UnsupportedOperationException
    at java.nio.ByteBuffer.array(ByteBuffer.java:994)
    at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:362)
```
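For illustration (not part of the patch), a minimal Scala snippet showing the condition: a direct `ByteBuffer` reports `hasArray == false`, and calling `array()` on it raises exactly this exception. The object name is made up for the example.

```scala
import java.nio.ByteBuffer

// Illustrative only: shows why calling array() unconditionally fails on direct buffers.
object DirectBufferArrayDemo {
  def main(args: Array[String]): Unit = {
    val heap = ByteBuffer.allocate(16)         // backed by a heap byte[]
    val direct = ByteBuffer.allocateDirect(16) // off-heap, no accessible backing array

    println(heap.hasArray)   // true
    println(direct.hasArray) // false

    // The old deserialize path called array() unconditionally:
    direct.array()           // throws java.lang.UnsupportedOperationException
  }
}
```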

## How was this patch tested?

Added a unit test

Closes #22779 from 10110346/InputStreamKryo.

Authored-by: liuxian <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
10110346 authored and srowen committed Nov 24, 2018
1 parent de84899 commit 7f5f7a9
Showing 2 changed files with 25 additions and 3 deletions.
KryoSerializer.scala:

```diff
@@ -42,7 +42,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
 import org.apache.spark.storage._
-import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, ByteBufferInputStream, SerializableConfiguration, SerializableJobConf, Utils}
 import org.apache.spark.util.collection.CompactBuffer
 
 /**
@@ -417,7 +417,12 @@ private[spark] class KryoSerializerInstance(
   override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
     val kryo = borrowKryo()
     try {
-      input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
+      if (bytes.hasArray) {
+        input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
+      } else {
+        input.setBuffer(new Array[Byte](4096))
+        input.setInputStream(new ByteBufferInputStream(bytes))
+      }
       kryo.readClassAndObject(input).asInstanceOf[T]
     } finally {
       releaseKryo(kryo)
@@ -429,7 +434,12 @@
     val oldClassLoader = kryo.getClassLoader
     try {
       kryo.setClassLoader(loader)
-      input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
+      if (bytes.hasArray) {
+        input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
+      } else {
+        input.setBuffer(new Array[Byte](4096))
+        input.setInputStream(new ByteBufferInputStream(bytes))
+      }
       kryo.readClassAndObject(input).asInstanceOf[T]
     } finally {
       kryo.setClassLoader(oldClassLoader)
```
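The same pattern can be sketched outside Spark with plain Kryo (`com.esotericsoftware.kryo`): when the buffer is array-backed, hand Kryo the array directly; otherwise give `Input` a small scratch buffer and stream the bytes. `BufferBackedInputStream` below is a hypothetical stand-in for Spark's `ByteBufferInputStream`, included only to keep the sketch self-contained.

```scala
import java.io.InputStream
import java.nio.ByteBuffer

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

// Hypothetical stand-in for org.apache.spark.util.ByteBufferInputStream:
// exposes a ByteBuffer as an InputStream without touching its backing array.
class BufferBackedInputStream(buf: ByteBuffer) extends InputStream {
  override def read(): Int =
    if (!buf.hasRemaining) -1 else buf.get() & 0xFF

  override def read(dest: Array[Byte], off: Int, len: Int): Int = {
    if (!buf.hasRemaining) {
      -1
    } else {
      val n = math.min(len, buf.remaining())
      buf.get(dest, off, n)
      n
    }
  }
}

object DirectBufferKryoSketch {
  def main(args: Array[String]): Unit = {
    val kryo = new Kryo()
    kryo.setRegistrationRequired(false)

    // Serialize a value, then copy the bytes into a direct (non-array-backed) buffer.
    val output = new Output(4096)
    kryo.writeClassAndObject(output, "hello")
    val bytes = output.toBytes
    val direct = ByteBuffer.allocateDirect(bytes.length)
    direct.put(bytes)
    direct.flip()

    val input = new Input()
    if (direct.hasArray) {
      // Heap buffers expose their backing array; read it in place.
      input.setBuffer(direct.array(), direct.arrayOffset() + direct.position(), direct.remaining())
    } else {
      // Direct buffers do not; give Kryo a scratch buffer and stream the bytes instead.
      input.setBuffer(new Array[Byte](4096))
      input.setInputStream(new BufferBackedInputStream(direct))
    }
    println(kryo.readClassAndObject(input)) // hello
  }
}
```

The 4096-byte scratch buffer matches the size used in the patch; Kryo refills it from the stream as it decodes, so the exact size mainly affects how often that refill happens.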
KryoSerializerSuite.scala:

```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.serializer
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, FileInputStream, FileOutputStream}
+import java.nio.ByteBuffer
 import java.util.concurrent.Executors
 
 import scala.collection.JavaConverters._
@@ -551,6 +552,17 @@ class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSparkContext {
     deserializationStream.close()
     assert(serInstance.deserialize[Any](helloHello) === ((hello, hello)))
   }
+
+  test("SPARK-25786: ByteBuffer.array -- UnsupportedOperationException") {
+    val serInstance = new KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance]
+    val obj = "UnsupportedOperationException"
+    val serObj = serInstance.serialize(obj)
+    val byteBuffer = ByteBuffer.allocateDirect(serObj.array().length)
+    byteBuffer.put(serObj.array())
+    byteBuffer.flip()
+    assert(serInstance.deserialize[Any](serObj) === (obj))
+    assert(serInstance.deserialize[Any](byteBuffer) === (obj))
+  }
 }
 
 class ClassLoaderTestingObject
```
