Skip to content

Commit

Permalink
Fix serialization error in UserDefinedGenerator.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed May 29, 2015
1 parent 85bba9d commit 8033d4c
Showing 1 changed file with 21 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

package org.apache.spark.sql.catalyst.expressions

import java.io.{ObjectInputStream, IOException}

import scala.collection.Map

import org.apache.spark.sql.catalyst.{CatalystTypeConverters, trees}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

/**
* An expression that produces zero or more rows given a single input row.
Expand Down Expand Up @@ -71,11 +74,24 @@ case class UserDefinedGenerator(
children: Seq[Expression])
extends Generator {

private[this] val inputRow: InterpretedProjection = new InterpretedProjection(children)
private[this] val convertToScala: (Row) => Row = {
val inputSchema = StructType(children.map(e => StructField(e.simpleString, e.dataType, true)))
CatalystTypeConverters.createToScalaConverter(inputSchema)
}.asInstanceOf[(Row => Row)]
@transient private[this] var inputRow: InterpretedProjection = _
@transient private[this] var convertToScala: (Row) => Row = _

private def initializeConverters(): Unit = {
inputRow = new InterpretedProjection(children)
convertToScala = {
val inputSchema = StructType(children.map(e => StructField(e.simpleString, e.dataType, true)))
CatalystTypeConverters.createToScalaConverter(inputSchema)
}.asInstanceOf[(Row => Row)]
}

initializeConverters()

@throws(classOf[IOException])
private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
ois.defaultReadObject()
initializeConverters()
}

override def eval(input: Row): TraversableOnce[Row] = {
// TODO(davies): improve this
Expand Down

0 comments on commit 8033d4c

Please sign in to comment.