Replaced KryoSerializer with an updated SparkSqlSerializer
* SparkSqlSerializer is moved to a separate source file
* SparkSqlSerializer.newKryo calls super.newKryo
* Class registration is no longer required, since the generic column accessor/builder can de/serialize objects of any class.

Signed-off-by: Cheng Lian <[email protected]>
liancheng committed Mar 22, 2014
1 parent b6c0a49 commit b8a645a
Showing 7 changed files with 67 additions and 68 deletions.
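The heart of this change is the Kryo setup: the old private serializer removed below called kryo.setRegistrationRequired(true), so every class crossing the serializer had to be registered up front, while the new SparkSqlSerializer starts from super.newKryo() and makes registration optional. That is what lets the generic column accessor/builder round-trip values of any class. A minimal sketch of the resulting round trip (not part of the commit; the Map value mirrors the one used in the updated ColumnTypeSuite):

    // Round-trip an arbitrary object without registering its class first.
    val obj = Map(1 -> "spark", 2 -> "sql")
    val bytes: Array[Byte] = SparkSqlSerializer.serialize(obj)
    val copy = SparkSqlSerializer.deserialize[Map[Int, String]](bytes)
    assert(copy == obj)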
@@ -22,7 +22,7 @@ import java.nio.{ByteOrder, ByteBuffer}

import org.apache.spark.sql.catalyst.types.{BinaryType, NativeType, DataType}
import org.apache.spark.sql.catalyst.expressions.MutableRow
-import org.apache.spark.sql.execution.KryoSerializer
+import org.apache.spark.sql.execution.SparkSqlSerializer

/**
* An `Iterator` like trait used to extract values from columnar byte buffer. When a value is
@@ -133,7 +133,7 @@ class GenericColumnAccessor(buffer: ByteBuffer)

override protected def doExtractTo(row: MutableRow, ordinal: Int) {
val serialized = columnType.extract(buffer)
-row(ordinal) = KryoSerializer.deserialize[Any](serialized)
+row(ordinal) = SparkSqlSerializer.deserialize[Any](serialized)
}
}

@@ -21,7 +21,7 @@ package columnar
import java.nio.{ByteOrder, ByteBuffer}

import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.execution.KryoSerializer
+import org.apache.spark.sql.execution.SparkSqlSerializer

trait ColumnBuilder {
/**
@@ -140,7 +140,7 @@ class GenericColumnBuilder
def columnType = GENERIC

override def doAppendFrom(row: Row, ordinal: Int) {
-val serialized = KryoSerializer.serialize(row(ordinal))
+val serialized = SparkSqlSerializer.serialize(row(ordinal))
buffer = ColumnBuilder.ensureFreeSpace(buffer, columnType.actualSize(serialized))
columnType.append(serialized, buffer)
}
@@ -18,47 +18,15 @@
package org.apache.spark.sql
package execution

-import java.nio.ByteBuffer
-
-import com.esotericsoftware.kryo.{Kryo, Serializer}
-import com.esotericsoftware.kryo.io.{Output, Input}

import org.apache.spark.{SparkConf, RangePartitioner, HashPartitioner}
import org.apache.spark.rdd.ShuffledRDD
-import org.apache.spark.serializer.{KryoSerializer => SparkKryoSerializer}
import org.apache.spark.util.MutablePair

import catalyst.rules.Rule
import catalyst.errors._
import catalyst.expressions._
import catalyst.plans.physical._

-private class SparkSqlSerializer(conf: SparkConf) extends SparkKryoSerializer(conf) {
-  override def newKryo(): Kryo = {
-    val kryo = new Kryo
-    kryo.setRegistrationRequired(true)
-    kryo.register(classOf[MutablePair[_,_]])
-    kryo.register(classOf[Array[Any]])
-    kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
-    kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
-    kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
-    kryo.register(classOf[scala.math.BigDecimal], new BigDecimalSerializer)
-    kryo.setReferences(false)
-    kryo.setClassLoader(this.getClass.getClassLoader)
-    kryo
-  }
-}
-
-private class BigDecimalSerializer extends Serializer[BigDecimal] {
-  def write(kryo: Kryo, output: Output, bd: math.BigDecimal) {
-    // TODO: There are probably more efficient representations than strings...
-    output.writeString(bd.toString)
-  }
-
-  def read(kryo: Kryo, input: Input, tpe: Class[BigDecimal]): BigDecimal = {
-    BigDecimal(input.readString())
-  }
-}
+import execution.SparkSqlSerializer

case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {


This file was deleted. (This is the old org.apache.spark.sql.execution.KryoSerializer, superseded by the new SparkSqlSerializer below.)

@@ -0,0 +1,53 @@
package org.apache.spark.sql
package execution

import java.nio.ByteBuffer

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Serializer, Kryo}

import org.apache.spark.{SparkEnv, SparkConf}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.MutablePair

class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
  override def newKryo(): Kryo = {
    val kryo = super.newKryo()
    kryo.setRegistrationRequired(false)
    kryo.register(classOf[MutablePair[_,_]])
    kryo.register(classOf[Array[Any]])
    kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
    kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
    kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
    kryo.register(classOf[scala.math.BigDecimal], new BigDecimalSerializer)
    kryo.setReferences(false)
    kryo.setClassLoader(this.getClass.getClassLoader)
    kryo
  }
}

object SparkSqlSerializer {
  @transient lazy val ser: SparkSqlSerializer = {
    val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
    new SparkSqlSerializer(sparkConf)
  }

  def serialize[T](o: T): Array[Byte] = {
    ser.newInstance().serialize(o).array()
  }

  def deserialize[T](bytes: Array[Byte]): T = {
    ser.newInstance().deserialize[T](ByteBuffer.wrap(bytes))
  }
}

class BigDecimalSerializer extends Serializer[BigDecimal] {
  def write(kryo: Kryo, output: Output, bd: math.BigDecimal) {
    // TODO: There are probably more efficient representations than strings...
    output.writeString(bd.toString())
  }

  def read(kryo: Kryo, input: Input, tpe: Class[BigDecimal]): BigDecimal = {
    BigDecimal(input.readString())
  }
}
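Two design points in the companion object above are worth noting: the lazy ser picks up the conf of the running SparkEnv when one exists and falls back to a fresh SparkConf (e.g. when no SparkContext is up, as in unit tests), and serialize/deserialize call ser.newInstance() on every invocation rather than caching a single instance, presumably because Kryo objects carry mutable state and are not safe to share across threads.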
@@ -8,7 +8,7 @@ import scala.util.Random
import org.scalatest.FunSuite

import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.execution.KryoSerializer
+import org.apache.spark.sql.execution.SparkSqlSerializer

class ColumnTypeSuite extends FunSuite {
val columnTypes = Seq(INT, SHORT, LONG, BYTE, DOUBLE, FLOAT, STRING, BINARY, GENERIC)
@@ -22,7 +22,7 @@ class ColumnTypeSuite extends FunSuite {
}

test("actualSize") {
-val expectedSizes = Seq(4, 2, 8, 1, 8, 4, 4 + 5, 4 + 4, 4 + 11)
+val expectedSizes = Seq(4, 2, 8, 1, 8, 4, 4 + 5, 4 + 4, 4 + 8)
val actualSizes = Seq(
INT.actualSize(Int.MaxValue),
SHORT.actualSize(Short.MaxValue),
@@ -32,7 +32,7 @@ class ColumnTypeSuite extends FunSuite {
FLOAT.actualSize(Float.MaxValue),
STRING.actualSize("hello"),
BINARY.actualSize(new Array[Byte](4)),
-GENERIC.actualSize(KryoSerializer.serialize(Map(1 -> "a"))))
+GENERIC.actualSize(SparkSqlSerializer.serialize(Map(1 -> "a"))))

expectedSizes.zip(actualSizes).foreach { case (expected, actual) =>
assert(expected === actual)
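In the hunk above, the GENERIC expectation drops from 4 + 11 to 4 + 8: a generic column value is stored as a 4-byte length prefix followed by the serialized bytes (the same layout as STRING and BINARY), and Map(1 -> "a") serializes to a different number of bytes under the new super.newKryo()-based configuration than under the old hand-built Kryo.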
@@ -153,23 +153,23 @@ class ColumnTypeSuite extends FunSuite {
test("GENERIC") {
val buffer = ByteBuffer.allocate(512)
val obj = Map(1 -> "spark", 2 -> "sql")
-val serializedObj = KryoSerializer.serialize(obj)
+val serializedObj = SparkSqlSerializer.serialize(obj)

-GENERIC.append(KryoSerializer.serialize(obj), buffer)
+GENERIC.append(SparkSqlSerializer.serialize(obj), buffer)
buffer.rewind()

val length = buffer.getInt()
assert(length === serializedObj.length)

val bytes = new Array[Byte](length)
buffer.get(bytes, 0, length)
-assert(obj === KryoSerializer.deserialize(bytes))
+assert(obj === SparkSqlSerializer.deserialize(bytes))

buffer.rewind()
buffer.putInt(serializedObj.length).put(serializedObj)

buffer.rewind()
-assert(obj === KryoSerializer.deserialize(GENERIC.extract(buffer)))
+assert(obj === SparkSqlSerializer.deserialize(GENERIC.extract(buffer)))
}

def testNumericColumnType[T <: DataType, JvmType](
@@ -21,7 +21,7 @@ package columnar
import org.scalatest.FunSuite

import org.apache.spark.sql.catalyst.types.DataType
-import org.apache.spark.sql.execution.KryoSerializer
+import org.apache.spark.sql.execution.SparkSqlSerializer

class NullableColumnBuilderSuite extends FunSuite {
import ColumnarTestData._
@@ -81,7 +81,7 @@ class NullableColumnBuilderSuite extends FunSuite {
// For non-null values
(0 until 4).foreach { _ =>
val actual = if (columnType == GENERIC) {
-KryoSerializer.deserialize[Any](GENERIC.extract(buffer))
+SparkSqlSerializer.deserialize[Any](GENERIC.extract(buffer))
} else {
columnType.extract(buffer)
}