Skip to content

Commit

Permalink
[SPARK-12511] [PYSPARK] [STREAMING] Make sure PythonDStream.registerS…
Browse files Browse the repository at this point in the history
…erializer is called only once

There is an issue that Py4J's PythonProxyHandler.finalize blocks forever. (py4j/py4j#184)

Py4j will create a PythonProxyHandler in Java for "transformer_serializer" when calling "registerSerializer". If we call "registerSerializer" twice, the second PythonProxyHandler will override the first one, then the first one will be GCed and trigger "PythonProxyHandler.finalize". To avoid that, we should not call"registerSerializer" more than once, so that "PythonProxyHandler" in Java side won't be GCed.

Author: Shixiong Zhu <[email protected]>

Closes #10514 from zsxwing/SPARK-12511.

(cherry picked from commit 6cfe341)
Signed-off-by: Davies Liu <[email protected]>
  • Loading branch information
zsxwing authored and davies committed Jan 5, 2016
1 parent f31d0fd commit 83fe5cf
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 10 deletions.
33 changes: 25 additions & 8 deletions python/pyspark/streaming/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,28 @@ def _ensure_initialized(cls):

# register serializer for TransformFunction
# it happens before creating SparkContext when loading from checkpointing
cls._transformerSerializer = TransformFunctionSerializer(
SparkContext._active_spark_context, CloudPickleSerializer(), gw)
if cls._transformerSerializer is None:
transformer_serializer = TransformFunctionSerializer()
transformer_serializer.init(
SparkContext._active_spark_context, CloudPickleSerializer(), gw)
# SPARK-12511 streaming driver with checkpointing unable to finalize leading to OOM
# There is an issue that Py4J's PythonProxyHandler.finalize blocks forever.
# (https://github.com/bartdag/py4j/pull/184)
#
# Py4j will create a PythonProxyHandler in Java for "transformer_serializer" when
# calling "registerSerializer". If we call "registerSerializer" twice, the second
# PythonProxyHandler will override the first one, then the first one will be GCed and
# trigger "PythonProxyHandler.finalize". To avoid that, we should not call
# "registerSerializer" more than once, so that "PythonProxyHandler" in Java side won't
# be GCed.
#
# TODO Once Py4J fixes this issue, we should upgrade Py4j to the latest version.
transformer_serializer.gateway.jvm.PythonDStream.registerSerializer(
transformer_serializer)
cls._transformerSerializer = transformer_serializer
else:
cls._transformerSerializer.init(
SparkContext._active_spark_context, CloudPickleSerializer(), gw)

@classmethod
def getOrCreate(cls, checkpointPath, setupFunc):
Expand All @@ -116,16 +136,13 @@ def getOrCreate(cls, checkpointPath, setupFunc):
gw = SparkContext._gateway

# Check whether valid checkpoint information exists in the given path
if gw.jvm.CheckpointReader.read(checkpointPath).isEmpty():
ssc_option = gw.jvm.StreamingContextPythonHelper().tryRecoverFromCheckpoint(checkpointPath)
if ssc_option.isEmpty():
ssc = setupFunc()
ssc.checkpoint(checkpointPath)
return ssc

try:
jssc = gw.jvm.JavaStreamingContext(checkpointPath)
except Exception:
print("failed to load StreamingContext from checkpoint", file=sys.stderr)
raise
jssc = gw.jvm.JavaStreamingContext(ssc_option.get())

# If there is already an active instance of Python SparkContext use it, or create a new one
if not SparkContext._active_spark_context:
Expand Down
3 changes: 1 addition & 2 deletions python/pyspark/streaming/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,10 @@ class TransformFunctionSerializer(object):
it uses this class to invoke Python, which returns the serialized function
as a byte array.
"""
def __init__(self, ctx, serializer, gateway=None):
def init(self, ctx, serializer, gateway=None):
self.ctx = ctx
self.serializer = serializer
self.gateway = gateway or self.ctx._gateway
self.gateway.jvm.PythonDStream.registerSerializer(self)
self.failure = None

def dumps(self, id):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -897,3 +897,15 @@ object StreamingContext extends Logging {
result
}
}

private class StreamingContextPythonHelper {

/**
* This is a private method only for Python to implement `getOrCreate`.
*/
def tryRecoverFromCheckpoint(checkpointPath: String): Option[StreamingContext] = {
val checkpointOption = CheckpointReader.read(
checkpointPath, new SparkConf(), SparkHadoopUtil.get.conf, false)
checkpointOption.map(new StreamingContext(null, _, null))
}
}

0 comments on commit 83fe5cf

Please sign in to comment.