From 84a966811695f73497cc80dda6b2e53d0df5051f Mon Sep 17 00:00:00 2001 From: Ken Takagiwa Date: Sat, 2 Aug 2014 15:58:24 -0700 Subject: [PATCH] tried to restart callback server --- python/pyspark/java_gateway.py | 5 ++++- python/pyspark/streaming/context.py | 8 ++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 888c850779ff8..d32beef9b860c 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -102,7 +102,10 @@ def run(self): EchoOutputThread(proc.stdout).start() # Connect to the gateway - gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=False, start_callback_server=True) + # If start_callback_server is True, it looks like callback server is not killed + # process is hang up and test case does not move forward. + #gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=False, start_callback_server=True) + gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=False, start_callback_server=False) # Import the classes used by PySpark java_import(gateway.jvm, "org.apache.spark.SparkConf") diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index a4900191d1730..04737243f3192 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -15,6 +15,8 @@ # limitations under the License. # +import time + from pyspark.conf import SparkConf from pyspark.files import SparkFiles from pyspark.java_gateway import launch_gateway @@ -60,6 +62,12 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, @param duration: A L{Duration} Duration for SparkStreaming """ + + # launch call back server + if not gateway: + gateway = launch_gateway() +# gateway.restart_callback_server() + # Create the Python Sparkcontext self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome, pyFiles=pyFiles, environment=environment, batchSize=batchSize,