Skip to content

Commit

Permalink
TypeTags are not kryo serializable by default, rearrange implicits to…
Browse files Browse the repository at this point in the history
… address this issue
  • Loading branch information
pomadchin committed Mar 27, 2022
1 parent b1c60ed commit 6eae7b1
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ object HSerializer extends Serializable {
// format: off
/**
* Derive HSerializer from ExpressionEncoder.
* Intentionally not used for instances implementation, causes the following failure on DataBricks:
* Intentionally not used for instances implementation, causes the following failure on DataBricks;
* TypeTags are not Kryo serializable by default:
* org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
* Serialization trace:
* classes (sun.misc.Launcher$AppClassLoader)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,29 @@ object UnaryDeserializer extends Serializable {
implicit def tryUnaryDeserializer[T: UnaryDeserializer[Id, *]]: UnaryDeserializer[Try, T] =
(arguments, inspectors) => Try(id[T].deserialize(arguments, inspectors))

/** Derive Optional UnaryDeserializers. */
implicit def optionalUnaryDeserializer[T: UnaryDeserializer[Id, *]]: UnaryDeserializer[Id, Option[T]] =
(arguments, inspectors) => (arguments.headOption, inspectors.headOption).mapN(id[T].deserialize)

/** Derive UnaryDeserializers from ExpressionEncoders. */
implicit def expressionEncoderUnaryDeserializer[T: TypeTag: ExpressionEncoder]: UnaryDeserializer[Id, T] =
// format: off
/**
* Derive UnaryDeserializers from ExpressionEncoders.
* Intentionally not used for instances implementation, causes the following failure on DataBricks;
* TypeTags are not Kryo serializable by default:
* org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
* Serialization trace:
* classes (sun.misc.Launcher$AppClassLoader)
* classloader (java.security.ProtectionDomain)
* context (java.security.AccessControlContext)
* acc (com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader)
* classLoader (scala.reflect.runtime.JavaMirrors$JavaMirror)
* mirror (scala.reflect.api.TypeTags$TypeTagImpl)
* evidence$3$1 (com.azavea.hiveless.serializers.UnaryDeserializer$$anonfun$expressionEncoderUnaryDeserializer$2)
* evidence$1$1 (com.azavea.hiveless.serializers.UnaryDeserializer$$anonfun$tryUnaryDeserializer$3)
* dh$1 (com.azavea.hiveless.serializers.GenericDeserializer$$anon$4)
* d$2 (com.azavea.hiveless.serializers.GenericDeserializer$$anon$2)
*/
// format: on
def expressionEncoderUnaryDeserializer[T: TypeTag: ExpressionEncoder]: UnaryDeserializer[Id, T] =
(arguments, inspectors) => arguments.deserialize[InternalRow](inspectors).as[T]

/** Derivation helper deserializer. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ package object index extends StandardEncoders {
def serialize: CRS => Any = crs => crs.toProj4String.serialize
}

/**
* HSerializer.expressionEncoderSerializer causes serialization issues on DataBricks. TODO: investigate this issue.
*/
/** HSerializer.expressionEncoderSerializer is not used since TypeTags are not Kryo serializable by default. */
implicit def extentSerializer: HSerializer[Extent] = new HSerializer[Extent] {
def dataType: DataType = extentEncoder.schema
def serialize: Extent => InternalRow = _.toInternalRow
Expand All @@ -52,4 +50,8 @@ package object index extends StandardEncoders {
def dataType: DataType = z2IndexEncoder.schema
def serialize: Z2Index => InternalRow = _.toInternalRow
}

/** UnaryDeserializer.expressionEncoderUnaryDeserializer since TypeTags are not Kryo serializable by default. */
implicit def extentUnaryDeserializer: UnaryDeserializer[Id, Extent] =
(arguments, inspectors) => arguments.deserialize[InternalRow](inspectors).as[Extent]
}

0 comments on commit 6eae7b1

Please sign in to comment.