Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
10110346 committed Nov 24, 2018
1 parent de84899 commit 55db355
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils}
import org.apache.spark.util.{BoundedPriorityQueue, ByteBufferInputStream, SerializableConfiguration, SerializableJobConf, Utils}
import org.apache.spark.util.collection.CompactBuffer

/**
Expand Down Expand Up @@ -417,7 +417,12 @@ private[spark] class KryoSerializerInstance(
override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
val kryo = borrowKryo()
try {
input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
if (bytes.hasArray) {
input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
} else {
input.setBuffer(new Array[Byte](4096))
input.setInputStream(new ByteBufferInputStream(bytes))
}
kryo.readClassAndObject(input).asInstanceOf[T]
} finally {
releaseKryo(kryo)
Expand All @@ -429,7 +434,12 @@ private[spark] class KryoSerializerInstance(
val oldClassLoader = kryo.getClassLoader
try {
kryo.setClassLoader(loader)
input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
if (bytes.hasArray) {
input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
} else {
input.setBuffer(new Array[Byte](4096))
input.setInputStream(new ByteBufferInputStream(bytes))
}
kryo.readClassAndObject(input).asInstanceOf[T]
} finally {
kryo.setClassLoader(oldClassLoader)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.serializer

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, FileInputStream, FileOutputStream}
import java.nio.ByteBuffer
import java.util.concurrent.Executors

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -551,6 +552,17 @@ class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSpar
deserializationStream.close()
assert(serInstance.deserialize[Any](helloHello) === ((hello, hello)))
}

test("SPARK-25786: ByteBuffer.array -- UnsupportedOperationException") {
val serInstance = new KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance]
val obj = "UnsupportedOperationException"
val serObj = serInstance.serialize(obj)
val byteBuffer = ByteBuffer.allocateDirect(serObj.array().length)
byteBuffer.put(serObj.array())
byteBuffer.flip()
assert(serInstance.deserialize[Any](serObj) === (obj))
assert(serInstance.deserialize[Any](byteBuffer) === (obj))
}
}

class ClassLoaderTestingObject
Expand Down

0 comments on commit 55db355

Please sign in to comment.