From bcdec333146151da4777bd4df3b53f81483ad816 Mon Sep 17 00:00:00 2001
From: giwa
Date: Wed, 13 Aug 2014 21:04:26 -0700
Subject: [PATCH] WIP: solved partitioned and None is not recognized

---
 python/pyspark/streaming/context.py | 20 +++++++++++++++++++-
 python/pyspark/streaming/dstream.py | 16 ++++++++++++++++
 python/pyspark/streaming_tests.py   | 23 +++++++++++++----------
 3 files changed, 48 insertions(+), 11 deletions(-)

diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index eee298badcbad..32b52f74e16f0 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -154,7 +154,7 @@ def _testInputStream(self, test_inputs, numSlices=None):
 
             # Make sure we distribute data evenly if it's smaller than self.batchSize
             if "__len__" not in dir(test_input):
-                c = list(test_input)    # Make it a list so we can compute its length
+                test_input = list(test_input)    # Make it a list so we can compute its length
             batchSize = min(len(test_input) // numSlices, self._sc._batchSize)
             if batchSize > 1:
                 serializer = BatchedSerializer(self._sc._unbatched_serializer,
@@ -162,6 +162,7 @@
             else:
                 serializer = self._sc._unbatched_serializer
             serializer.dump_stream(test_input, tempFile)
+            tempFile.close()
             tempFiles.append(tempFile.name)
 
         jtempFiles = ListConverter().convert(tempFiles, SparkContext._gateway._gateway_client)
@@ -169,3 +170,20 @@
                                                         jtempFiles,
                                                         numSlices).asJavaDStream()
         return DStream(jinput_stream, self, PickleSerializer())
+
+
+    def _testInputStream2(self, test_inputs, numSlices=None):
+        """
+        This is inspired by the QueueStream implementation. Given a list of inputs, it creates
+        one RDD per input and returns a DStream that contains those RDDs.
+        """
+        test_rdds = list()
+        for test_input in test_inputs:
+            test_rdd = self._sc.parallelize(test_input, numSlices)
+            print test_rdd.glom().collect()
+            test_rdds.append(test_rdd._jrdd)
+
+        jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
+        jinput_stream = self._jvm.PythonTestInputStream2(self._jssc, jtest_rdds).asJavaDStream()
+
+        return DStream(jinput_stream, self, BatchedSerializer(PickleSerializer()))
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 02f35fac47ac0..240b2983b5b5d 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -246,6 +246,8 @@ def takeAndPrint(rdd, time):
             taken = rdd.take(11)
             print "-------------------------------------------"
             print "Time: %s" % (str(time))
+            print rdd.glom().collect()
+            print "-------------------------------------------"
             print "-------------------------------------------"
             for record in taken[:10]:
                 print record
@@ -301,6 +303,20 @@ def get_output(rdd, time):
         self.foreachRDD(get_output)
 
 
+# TODO: implement groupByKey
+# TODO: implement union
+# TODO: implement cache
+# TODO: implement persist
+# TODO: implement repartition
+# TODO: implement saveAsTextFile
+# TODO: implement cogroup
+# TODO: implement join
+# TODO: implement countByValue
+# TODO: implement leftOuterJoin
+# TODO: implement rightOuterJoin
+
+
+
 class PipelinedDStream(DStream):
     def __init__(self, prev, func, preservesPartitioning=False):
         if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():
diff --git a/python/pyspark/streaming_tests.py b/python/pyspark/streaming_tests.py
index 25ea350ca425f..e346bc227fe46 100644
--- a/python/pyspark/streaming_tests.py
+++ b/python/pyspark/streaming_tests.py
@@ -71,8 +71,9 @@ class TestBasicOperationsSuite(PySparkStreamingTestCase):
     """
     def setUp(self):
         PySparkStreamingTestCase.setUp(self)
-        StreamOutput.result = list()
         self.timeout = 10  # seconds
+        self.numInputPartitions = 2
+        self.result = list()
 
     def tearDown(self):
         PySparkStreamingTestCase.tearDown(self)
@@ -137,6 +138,8 @@ def test_reduceByKey(self):
         test_input = [["a", "a", "b"], ["", ""], []]
 
         def test_func(dstream):
+            print "reduceByKey"
+            dstream.map(lambda x: (x, 1)).pyprint()
             return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add)
         expected_output = [[("a", 2), ("b", 1)], [("", 2)], []]
         output = self._run_stream(test_input, test_func, expected_output)
@@ -168,9 +171,8 @@ def test_glom(self):
         numSlices = 2
 
         def test_func(dstream):
-            dstream.pyprint()
             return dstream.glom()
-        expected_output = [[[1,2], [3,4]],[[5,6], [7,8]],[[9,10], [11,12]]]
+        expected_output = [[[1,2], [3,4]], [[5,6], [7,8]], [[9,10], [11,12]]]
         output = self._run_stream(test_input, test_func, expected_output, numSlices)
         self.assertEqual(expected_output, output)
 
@@ -180,20 +182,21 @@ def test_mapPartitions(self):
         numSlices = 2
 
         def test_func(dstream):
-            dstream.pyprint()
-            return dstream.mapPartitions(lambda x: reduce(operator.add, x))
-        expected_output = [[3, 7],[11, 15],[19, 23]]
+            def f(iterator): yield sum(iterator)
+            return dstream.mapPartitions(f)
+        expected_output = [[3, 7], [11, 15], [19, 23]]
         output = self._run_stream(test_input, test_func, expected_output, numSlices)
         self.assertEqual(expected_output, output)
 
     def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
         """Start stream and return the output"""
         # Generate input stream with user-defined input
-        test_input_stream = self.ssc._testInputStream(test_input, numSlices)
+        numSlices = numSlices or self.numInputPartitions
+        test_input_stream = self.ssc._testInputStream2(test_input, numSlices)
         # Apply test function to stream
         test_stream = test_func(test_input_stream)
         # Add job to get output from stream
-        test_stream._test_output(StreamOutput.result)
+        test_stream._test_output(self.result)
         self.ssc.start()
 
         start_time = time.time()
@@ -205,9 +208,9 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
                 break
             self.ssc.awaitTermination(50)
            # check if the output is the same length as the expected output
-            if len(expected_output) == len(StreamOutput.result):
+            if len(expected_output) == len(self.result):
                 break
-        return StreamOutput.result
+        return self.result
 
 if __name__ == "__main__":
     unittest.main()
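
Note (not part of the patch): the sketch below shows how the new _testInputStream2 helper is meant to be driven, mirroring what _run_stream above does. It assumes an already-configured StreamingContext named ssc (created by PySparkStreamingTestCase.setUp, which is outside this diff); the input values, the map transformation, and the 10-second timeout are illustrative only, and only calls that appear in this patch (_testInputStream2, _test_output, start, awaitTermination) are used.

    import time

    test_input = [[1, 2, 3], [4, 5, 6]]            # one list per batch interval
    expected_output = [[2, 4, 6], [8, 10, 12]]

    # Each input list becomes one RDD (parallelized over numSlices partitions),
    # and the resulting DStream replays those RDDs one per batch.
    input_stream = ssc._testInputStream2(test_input, numSlices=2)
    doubled = input_stream.map(lambda x: x * 2)

    result = list()
    doubled._test_output(result)                   # appends each batch's output to result

    ssc.start()
    start_time = time.time()
    while len(result) < len(expected_output):
        if time.time() - start_time > 10:          # give up after the test timeout
            break
        ssc.awaitTermination(50)                   # poll, as _run_stream does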
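The TODO list added to dstream.py names several missing operations. As one data point, a countByValue along the lines of the batch RDD API could be expressed with operations the Python DStream already has (map and reduceByKey); the helper name and its per-batch semantics here are an assumption, not something this patch implements.

    import operator

    def count_by_value(dstream):
        # Hypothetical stand-in for the TODO DStream.countByValue: within each
        # batch, emit (value, 1) pairs and sum the counts per distinct value.
        return dstream.map(lambda value: (value, 1)).reduceByKey(operator.add)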
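For reference, the test_mapPartitions change replaces reduce(operator.add, x) with a generator because mapPartitions expects its function to return an iterable of output elements per partition, not a bare value. A plain-Python illustration of the per-partition result the test asserts (the [[1, 2], [3, 4]] partitioning of one batch is assumed for the example):

    def f(iterator):
        # Same function as in test_mapPartitions: one summed value per partition.
        yield sum(iterator)

    partitions = [[1, 2], [3, 4]]                  # assumed partitioning of one batch
    per_partition_sums = [next(f(iter(p))) for p in partitions]
    assert per_partition_sums == [3, 7]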