diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index c3fef42d118bd..db5b97f8472d1 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -23,6 +23,7 @@ import platform from subprocess import Popen, PIPE from threading import Thread + from py4j.java_gateway import java_import, JavaGateway, GatewayClient @@ -108,9 +109,6 @@ def run(self): java_import(gateway.jvm, "org.apache.spark.SparkConf") java_import(gateway.jvm, "org.apache.spark.api.java.*") java_import(gateway.jvm, "org.apache.spark.api.python.*") - java_import(gateway.jvm, "org.apache.spark.streaming.*") - java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*") - java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*") java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") java_import(gateway.jvm, "org.apache.spark.sql.SQLContext") java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext") diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 88e0cbbede1be..a647c9ec734df 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -22,6 +22,7 @@ from pyspark.streaming.dstream import DStream from py4j.java_collections import ListConverter +from py4j.java_gateway import java_import __all__ = ["StreamingContext"] @@ -72,7 +73,7 @@ def __init__(self, sparkContext, duration): should be set, either through the named parameters here or through C{conf}. @param sparkContext: L{SparkContext} object. - @param duration: A L{Duration} object or seconds for SparkStreaming. + @param duration: seconds for SparkStreaming. """ self._sc = sparkContext @@ -89,6 +90,9 @@ def _start_callback_server(self): gw._python_proxy_port = gw._callback_server.port # update port with real port def _initialize_context(self, sc, duration): + java_import(self._jvm, "org.apache.spark.streaming.*") + java_import(self._jvm, "org.apache.spark.streaming.api.java.*") + java_import(self._jvm, "org.apache.spark.streaming.api.python.*") return self._jvm.JavaStreamingContext(sc._jsc, self._jduration(duration)) def _jduration(self, seconds): @@ -217,7 +221,6 @@ def union(self, *dstreams): raise ValueError("should have at least one DStream to union") if len(dstreams) == 1: return dstreams[0] - self._check_serialzers(dstreams) first = dstreams[0] jrest = ListConverter().convert([d._jdstream for d in dstreams[1:]], SparkContext._gateway._gateway_client)