Skip to content

Commit

Permalink
be safe
Browse files Browse the repository at this point in the history
  • Loading branch information
Davies Liu committed Mar 31, 2016
1 parent 8dc1adf commit dd71ba9
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private[spark] case class ChainedPythonFunctions(funcs: Seq[PythonFunction])
private[spark] object PythonRunner {
def apply(func: PythonFunction, bufferSize: Int, reuse_worker: Boolean): PythonRunner = {
new PythonRunner(
- Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuse_worker, false, Seq(Seq(0)))
+ Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuse_worker, false, Array(Array(0)))
}
}

Expand All @@ -101,7 +101,7 @@ private[spark] class PythonRunner(
bufferSize: Int,
reuse_worker: Boolean,
isUDF: Boolean,
- argOffsets: Seq[Seq[Int]])
+ argOffsets: Array[Array[Int]])
extends Logging {

require(funcs.length == argOffsets.length, "numArgs should have the same length as funcs")
Expand Down
4 changes: 2 additions & 2 deletions python/pyspark/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from pyspark.broadcast import Broadcast, _broadcastRegistry
from pyspark.files import SparkFiles
from pyspark.serializers import write_with_length, write_int, read_long, \
- write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer, AutoBatchedSerializer
+ write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer, BatchedSerializer
from pyspark import shuffle

pickleSer = PickleSerializer()
Expand Down Expand Up @@ -101,7 +101,7 @@ def read_udfs(pickleSer, infile):
mapper = eval(mapper_str, udfs)

func = lambda _, it: map(mapper, it)
- ser = AutoBatchedSerializer(PickleSerializer())
+ ser = BatchedSerializer(PickleSerializer(), 100)
# profiling is not supported for UDF
return func, None, ser, ser

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ case class BatchPythonEvaluation(udfs: Seq[PythonUDF], output: Seq[Attribute], c
dataTypes += e.dataType
allInputs.length - 1
}
- }
- }
+ }.toArray
+ }.toArray
val projection = newMutableProjection(allInputs, child.output)()

// Input iterator to Python: input rows are grouped so we send them in batches to Python.
// For each row, add it to the queue.
- val inputIterator = iter.grouped(1024).map { inputRows =>
+ val inputIterator = iter.grouped(100).map { inputRows =>
val toBePickled = inputRows.map { inputRow =>
queue.add(inputRow)
val row = projection(inputRow)
Expand Down

0 comments on commit dd71ba9

Please sign in to comment.