Skip to content

Commit

Permalink
[SPARK-40433][SS][PYTHON] Add toJVMRow in PythonSQLUtils to convert p…
Browse files Browse the repository at this point in the history
…ickled PySpark Row to JVM Row

### What changes were proposed in this pull request?

This PR adds toJVMRow in PythonSQLUtils to convert pickled PySpark Row to JVM Row.

Co-authored with HyukjinKwon .

This is a breakdown PR of apache#37863.

### Why are the changes needed?

This change will be leveraged in [SPARK-40434](https://issues.apache.org/jira/browse/SPARK-40434).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

N/A. We will make sure test suites are constructed via E2E manner under [SPARK-40431](https://issues.apache.org/jira/browse/SPARK-40431).

Closes apache#37891 from HeartSaVioR/SPARK-40433.

Lead-authored-by: Jungtaek Lim <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
  • Loading branch information
2 people authored and LuciferYang committed Sep 20, 2022
1 parent fbbaca5 commit 12c82d2
Showing 1 changed file with 35 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,45 @@ import java.net.Socket
import java.nio.channels.Channels
import java.util.Locale

import net.razorvine.pickle.Pickler
import net.razorvine.pickle.{Pickler, Unpickler}

import org.apache.spark.api.python.DechunkedInputStream
import org.apache.spark.internal.Logging
import org.apache.spark.security.SocketAuthServer
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.execution.{ExplainMode, QueryExecution}
import org.apache.spark.sql.execution.arrow.ArrowConverters
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.{DataType, StructType}

private[sql] object PythonSQLUtils extends Logging {
private lazy val internalRowPickler = {
private def withInternalRowPickler(f: Pickler => Array[Byte]): Array[Byte] = {
EvaluatePython.registerPicklers()
new Pickler(true, false)
val pickler = new Pickler(true, false)
val ret = try {
f(pickler)
} finally {
pickler.close()
}
ret
}

private def withInternalRowUnpickler(f: Unpickler => Any): Any = {
EvaluatePython.registerPicklers()
val unpickler = new Unpickler
val ret = try {
f(unpickler)
} finally {
unpickler.close()
}
ret
}

def parseDataType(typeText: String): DataType = CatalystSqlParser.parseDataType(typeText)
Expand Down Expand Up @@ -94,8 +112,18 @@ private[sql] object PythonSQLUtils extends Logging {

def toPyRow(row: Row): Array[Byte] = {
assert(row.isInstanceOf[GenericRowWithSchema])
internalRowPickler.dumps(EvaluatePython.toJava(
CatalystTypeConverters.convertToCatalyst(row), row.schema))
withInternalRowPickler(_.dumps(EvaluatePython.toJava(
CatalystTypeConverters.convertToCatalyst(row), row.schema)))
}

def toJVMRow(
arr: Array[Byte],
returnType: StructType,
deserializer: ExpressionEncoder.Deserializer[Row]): Row = {
val fromJava = EvaluatePython.makeFromJava(returnType)
val internalRow =
fromJava(withInternalRowUnpickler(_.loads(arr))).asInstanceOf[InternalRow]
deserializer(internalRow)
}

def castTimestampNTZToLong(c: Column): Column = Column(CastTimestampNTZToLong(c.expr))
Expand Down

0 comments on commit 12c82d2

Please sign in to comment.