From 636090ac5323cdde6c72d48336b716693a80e010 Mon Sep 17 00:00:00 2001
From: giwa
Date: Mon, 18 Aug 2014 13:24:17 -0700
Subject: [PATCH] added sparkContext as input parameter in StreamingContext

---
 python/pyspark/streaming/context.py | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index e380626aa080b..3f455a3e06072 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -34,7 +34,7 @@ class StreamingContext(object):
 
     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                  environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
-                 gateway=None, duration=None):
+                 gateway=None, sparkContext=None, duration=None):
         """
         Create a new StreamingContext. At least the master and app name and duration
         should be set, either through the named parameters here or through C{conf}.
@@ -55,14 +55,18 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         @param conf: A L{SparkConf} object setting Spark properties.
         @param gateway: Use an existing gateway and JVM, otherwise a new JVM
                will be instatiated.
-        @param duration: A L{Duration} Duration for SparkStreaming
+        @param sparkContext: L{SparkContext} object.
+        @param duration: A L{Duration} object for SparkStreaming.
 
         """
-        # Create the Python Sparkcontext
-        self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome,
-                                pyFiles=pyFiles, environment=environment, batchSize=batchSize,
-                                serializer=serializer, conf=conf, gateway=gateway)
+        if sparkContext is None:
+            # Create the Python SparkContext
+            self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome,
+                                    pyFiles=pyFiles, environment=environment, batchSize=batchSize,
+                                    serializer=serializer, conf=conf, gateway=gateway)
+        else:
+            self._sc = sparkContext
 
         # Start py4j callback server.
         # Callback sever is need only by SparkStreming; therefore the callback sever