diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 587d5c7dd0fee..ff38e9a2903ff 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -86,7 +86,7 @@ private[spark] case class ChainedPythonFunctions(funcs: Seq[PythonFunction])
 private[spark] object PythonRunner {
   def apply(func: PythonFunction, bufferSize: Int, reuse_worker: Boolean): PythonRunner = {
     new PythonRunner(
-      Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuse_worker, false, Seq(Seq(0)))
+      Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuse_worker, false, Array(Array(0)))
   }
 }
 
@@ -101,7 +101,7 @@ private[spark] class PythonRunner(
     bufferSize: Int,
     reuse_worker: Boolean,
     isUDF: Boolean,
-    argOffsets: Seq[Seq[Int]])
+    argOffsets: Array[Array[Int]])
   extends Logging {
 
   require(funcs.length == argOffsets.length, "numArgs should have the same length as funcs")
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 849516c8106be..575ec5ba3ffa9 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -29,7 +29,7 @@
 from pyspark.broadcast import Broadcast, _broadcastRegistry
 from pyspark.files import SparkFiles
 from pyspark.serializers import write_with_length, write_int, read_long, \
-    write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer, AutoBatchedSerializer
+    write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer, BatchedSerializer
 from pyspark import shuffle
 
 pickleSer = PickleSerializer()
@@ -101,7 +101,7 @@ def read_udfs(pickleSer, infile):
     mapper = eval(mapper_str, udfs)
 
     func = lambda _, it: map(mapper, it)
-    ser = AutoBatchedSerializer(PickleSerializer())
+    ser = BatchedSerializer(PickleSerializer(), 100)
     # profiling is not supported for UDF
     return func, None, ser, ser
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchPythonEvaluation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchPythonEvaluation.scala
index 180d2f375d01f..a537ed3bc4199 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchPythonEvaluation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchPythonEvaluation.scala
@@ -86,13 +86,13 @@ case class BatchPythonEvaluation(udfs: Seq[PythonUDF], output: Seq[Attribute], c
           dataTypes += e.dataType
           allInputs.length - 1
         }
-      }
-    }
+      }.toArray
+    }.toArray
     val projection = newMutableProjection(allInputs, child.output)()
 
     // Input iterator to Python: input rows are grouped so we send them in batches to Python.
     // For each row, add it to the queue.
-    val inputIterator = iter.grouped(1024).map { inputRows =>
+    val inputIterator = iter.grouped(100).map { inputRows =>
       val toBePickled = inputRows.map { inputRow =>
         queue.add(inputRow)
         val row = projection(inputRow)
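
Reviewer note (not part of the patch): the .toArray changes above make argOffsets an Array[Array[Int]] that holds, for each UDF, the offsets of its arguments inside the single de-duplicated projection pickled to the Python worker. Below is a minimal standalone sketch of that offset bookkeeping, using hypothetical string column names in place of Catalyst Expressions; it is only an illustration, not code from this patch.

import scala.collection.mutable.ArrayBuffer

object ArgOffsetsSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical inputs: each inner Seq lists one UDF's arguments by column name.
    val udfArgs: Seq[Seq[String]] = Seq(Seq("a", "b"), Seq("b", "c"))

    // Flattened, de-duplicated list of all inputs sent to Python once per row.
    val allInputs = new ArrayBuffer[String]

    // For each UDF argument, reuse the existing offset if the input was already added,
    // otherwise append it; the result has the Array(Array(...)) shape used above.
    val argOffsets: Array[Array[Int]] = udfArgs.map { argList =>
      argList.map { e =>
        val idx = allInputs.indexOf(e)
        if (idx >= 0) idx else { allInputs += e; allInputs.length - 1 }
      }.toArray
    }.toArray

    println(allInputs.mkString(", "))                          // a, b, c
    println(argOffsets.map(_.mkString(",")).mkString(" | "))   // 0,1 | 1,2
  }
}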