diff --git a/bin/pyspark b/bin/pyspark
index 5142411e36974..118e6851af7a0 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -87,11 +87,7 @@ export PYSPARK_SUBMIT_ARGS
 if [[ -n "$SPARK_TESTING" ]]; then
   unset YARN_CONF_DIR
   unset HADOOP_CONF_DIR
-  if [[ -n "$PYSPARK_DOC_TEST" ]]; then
-    exec "$PYSPARK_PYTHON" -m doctest $1
-  else
-    exec "$PYSPARK_PYTHON" $1
-  fi
+  exec "$PYSPARK_PYTHON" $1
   exit
 fi
 
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 034a90110af76..19cdbe679fd35 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -293,7 +293,7 @@ private class PythonException(msg: String, cause: Exception) extends RuntimeExce
  * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
  * This is used by PySpark's shuffle operations.
  */
-private[spark] class PairwiseRDD(prev: RDD[Array[Byte]]) extends
+private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
   RDD[(Long, Array[Byte])](prev) {
   override def getPartitions = prev.partitions
   override def compute(split: Partition, context: TaskContext) =
diff --git a/examples/src/main/python/streaming/network_wordcount.py b/examples/src/main/python/streaming/network_wordcount.py
index 633e63172bad6..e3b6248c82a12 100644
--- a/examples/src/main/python/streaming/network_wordcount.py
+++ b/examples/src/main/python/streaming/network_wordcount.py
@@ -14,7 +14,7 @@
     counts = lines.flatMap(lambda line: line.split(" "))\
         .map(lambda word: (word, 1))\
         .reduceByKey(lambda a, b: a+b)
-    counts.pyprint()
+    counts.pprint()
 
     ssc.start()
     ssc.awaitTermination()
diff --git a/examples/src/main/python/streaming/wordcount.py b/examples/src/main/python/streaming/wordcount.py
index c794711845af0..8c08ff0c89850 100644
--- a/examples/src/main/python/streaming/wordcount.py
+++ b/examples/src/main/python/streaming/wordcount.py
@@ -15,7 +15,7 @@
     counts = lines.flatMap(lambda line: line.split(" "))\
         .map(lambda x: (x, 1))\
         .reduceByKey(lambda a, b: a+b)
-    counts.pyprint()
+    counts.pprint()
 
     ssc.start()
     ssc.awaitTermination()
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index ccbca67656c8d..9aa3db7ccf1dd 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -256,3 +256,8 @@ def _start_update_server():
     thread.daemon = True
     thread.start()
     return server
+
+
+if __name__ == "__main__":
+    import doctest
+    doctest.testmod()
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 94bebc310bad6..e666dd9800256 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -526,3 +526,8 @@ def write_int(value, stream):
 def write_with_length(obj, stream):
     write_int(len(obj), stream)
     stream.write(obj)
+
+
+if __name__ == "__main__":
+    import doctest
+    doctest.testmod()
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index acd9f27c46cbe..2653e75ccbc54 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -410,7 +410,7 @@ def fullOuterJoin(self, other, numPartitions=None):
         return self.transformWith(lambda a, b: a.fullOuterJoin(b, numPartitions), other)
 
     def _jtime(self, timestamp):
-        """ convert datetime or unix_timestamp into Time
+        """ Convert datetime or unix_timestamp into Time
         """
         if isinstance(timestamp, datetime):
             timestamp = time.mktime(timestamp.timetuple())
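Note: the two example diffs above only rename the output operator, pyprint() -> pprint(); the rest of the word count pipeline is untouched. For reference, a minimal self-contained sketch of that pipeline, assuming the import path used by tests.py in this patch and a plain-seconds batch interval (the Seconds wrapper import is removed below); host, port and app name are placeholders:

# Sketch only: mirrors the network_wordcount.py example after the rename.
from pyspark import SparkContext
from pyspark.streaming.context import StreamingContext

sc = SparkContext(appName="NetworkWordCountSketch")
ssc = StreamingContext(sc, 1)  # assumed: batch interval given as plain seconds

lines = ssc.socketTextStream("localhost", 9999)  # placeholder host/port
counts = lines.flatMap(lambda line: line.split(" "))\
              .map(lambda word: (word, 1))\
              .reduceByKey(lambda a, b: a + b)
counts.pprint()  # renamed from pyprint() in this patch

ssc.start()
ssc.awaitTermination()
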
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 0ef205754bb58..c547971cd7741 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -29,7 +29,6 @@
 
 from pyspark.context import SparkContext
 from pyspark.streaming.context import StreamingContext
-from pyspark.streaming.duration import Seconds
 
 
 class PySparkStreamingTestCase(unittest.TestCase):
@@ -46,11 +45,6 @@ def setUp(self):
     def tearDown(self):
         self.ssc.stop()
 
-    @classmethod
-    def tearDownClass(cls):
-        # Make sure tp shutdown the callback server
-        SparkContext._gateway._shutdown_callback_server()
-
     def _test_func(self, input, func, expected, sort=False):
         """
         @param input: dataset for the test. This should be list of lists.
diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py
index 02b51dc472c51..885411ed63936 100644
--- a/python/pyspark/streaming/util.py
+++ b/python/pyspark/streaming/util.py
@@ -64,3 +64,8 @@ def rddToFileName(prefix, suffix, time):
         return prefix + "-" + str(time)
     else:
         return prefix + "-" + str(time) + "." + suffix
+
+
+if __name__ == "__main__":
+    import doctest
+    doctest.testmod()
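streaming/util.py above gets the same __main__ doctest hook that this patch adds to accumulators.py and serializers.py, which is what lets bin/pyspark drop the PYSPARK_DOC_TEST special case: each module now runs its own doctests when executed directly. A minimal sketch of the pattern, with a made-up square() function standing in for the real module contents:

# Hypothetical module illustrating the doctest hook added in this patch.
def square(x):
    """Return x squared.

    >>> square(3)
    9
    """
    return x * x


if __name__ == "__main__":
    import doctest
    # Executing the file directly now runs its doctests, so the launcher no
    # longer needs to invoke "python -m doctest" for selected modules.
    doctest.testmod()
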
diff --git a/python/run-tests b/python/run-tests
index 5aa9212c8adc1..e8796838c22c1 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -48,6 +48,39 @@ function run_test() {
     fi
 }
 
+function run_core_tests() {
+    run_test "pyspark/conf.py"
+    run_test "pyspark/context.py"
+    run_test "pyspark/broadcast.py"
+    run_test "pyspark/accumulators.py"
+    run_test "pyspark/serializers.py"
+    run_test "pyspark/shuffle.py"
+    run_test "pyspark/rdd.py"
+    run_test "pyspark/tests.py"
+}
+
+function run_sql_tests() {
+    run_test "pyspark/sql.py"
+}
+
+function run_mllib_tests() {
+    run_test "pyspark/mllib/util.py"
+    run_test "pyspark/mllib/linalg.py"
+    run_test "pyspark/mllib/classification.py"
+    run_test "pyspark/mllib/clustering.py"
+    run_test "pyspark/mllib/random.py"
+    run_test "pyspark/mllib/recommendation.py"
+    run_test "pyspark/mllib/regression.py"
+    run_test "pyspark/mllib/stat.py"
+    run_test "pyspark/mllib/tree.py"
+    run_test "pyspark/mllib/tests.py"
+}
+
+function run_streaming_tests() {
+    run_test "pyspark/streaming/util.py"
+    run_test "pyspark/streaming/tests.py"
+}
+
 echo "Running PySpark tests. Output is in python/unit-tests.log."
 
 export PYSPARK_PYTHON="python"
@@ -60,30 +93,10 @@ fi
 echo "Testing with Python version:"
 $PYSPARK_PYTHON --version
 
-run_test "pyspark/rdd.py"
-run_test "pyspark/context.py"
-run_test "pyspark/conf.py"
-run_test "pyspark/sql.py"
-# These tests are included in the module-level docs, and so must
-# be handled on a higher level rather than within the python file.
-export PYSPARK_DOC_TEST=1
-run_test "pyspark/broadcast.py"
-run_test "pyspark/accumulators.py"
-run_test "pyspark/serializers.py"
-unset PYSPARK_DOC_TEST
-run_test "pyspark/shuffle.py"
-run_test "pyspark/tests.py"
-run_test "pyspark/mllib/classification.py"
-run_test "pyspark/mllib/clustering.py"
-run_test "pyspark/mllib/linalg.py"
-run_test "pyspark/mllib/random.py"
-run_test "pyspark/mllib/recommendation.py"
-run_test "pyspark/mllib/regression.py"
-run_test "pyspark/mllib/stat.py"
-run_test "pyspark/mllib/tests.py"
-run_test "pyspark/mllib/tree.py"
-run_test "pyspark/mllib/util.py"
-run_test "pyspark/streaming/tests.py"
+#run_core_tests
+#run_sql_tests
+#run_mllib_tests
+run_streaming_tests
 
 # Try to test with PyPy
 if [ $(which pypy) ]; then
@@ -91,20 +104,10 @@ if [ $(which pypy) ]; then
     echo "Testing with PyPy version:"
     $PYSPARK_PYTHON --version
 
-    run_test "pyspark/rdd.py"
-    run_test "pyspark/context.py"
-    run_test "pyspark/conf.py"
-    run_test "pyspark/sql.py"
-    # These tests are included in the module-level docs, and so must
-    # be handled on a higher level rather than within the python file.
-    export PYSPARK_DOC_TEST=1
-    run_test "pyspark/broadcast.py"
-    run_test "pyspark/accumulators.py"
-    run_test "pyspark/serializers.py"
-    unset PYSPARK_DOC_TEST
-    run_test "pyspark/shuffle.py"
-    run_test "pyspark/tests.py"
-    run_test "pyspark/streaming/tests.py"
+    run_core_tests
+    run_sql_tests
+    run_mllib_tests
+    run_streaming_tests
 fi
 
 if [[ $FAILED == 0 ]]; then