diff --git a/examples/src/main/python/streaming/network_wordcount.py b/examples/src/main/python/streaming/network_wordcount.py index 2bbb36a6b787e..f6fba4488e238 100644 --- a/examples/src/main/python/streaming/network_wordcount.py +++ b/examples/src/main/python/streaming/network_wordcount.py @@ -19,5 +19,7 @@ reduced_lines = mapped_lines.reduceByKey(add) reduced_lines.pyprint() + count_lines = mapped_lines.count() + count_lines.pyprint() ssc.start() ssc.awaitTermination() diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index 45a591db5a416..0049c1e7a0d5c 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -22,25 +22,23 @@ def count(self): """ """ - pass - #TODO: make sure count implementation, thiis different from what pyspark does - #return self._mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1)) + # TODO: make sure count implementation, this different from what pyspark does + return self._mapPartitions(lambda i: [sum(1 for _ in i)])._sum() def _sum(self): """ """ - pass - #return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add) + return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add) def print_(self): """ - Since print is reserved name for python, we cannot make a print method function. + Since print is reserved name for python, we cannot define a print method function. This function prints serialized data in RDD in DStream because Scala and Java cannot - deserialized pickled python object. Please use DStream.pyprint() instead to print result. + deserialized pickled python object. Please use DStream.pyprint() instead to print results. Call DStream.print(). """ - #hack to call print function in DStream + # a hack to call print function in DStream getattr(self._jdstream, "print")() def filter(self, f): @@ -79,17 +77,23 @@ def _mapPartitionsWithIndex(self, f, preservesPartitioning=False): """ return PipelinedDStream(self, f, preservesPartitioning) + def reduce(self, func): + """ + + """ + return self.map(lambda x: (None, x)).reduceByKey(func, 1).map(lambda x: x[1]) + def reduceByKey(self, func, numPartitions=None): """ Merge the value for each key using an associative reduce function. This will also perform the merging locally on each mapper before - sending resuls to reducer, similarly to a "combiner" in MapReduce. + sending results to reducer, similarly to a "combiner" in MapReduce. Output will be hash-partitioned with C{numPartitions} partitions, or the default parallelism level if C{numPartitions} is not specified. """ - return self.combineByKey(lambda x:x, func, func, numPartitions) + return self.combineByKey(lambda x: x, func, func, numPartitions) def combineByKey(self, createCombiner, mergeValue, mergeCombiners, numPartitions = None): @@ -99,6 +103,7 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners, """ if numPartitions is None: numPartitions = self._defaultReducePartitions() + def combineLocally(iterator): combiners = {} for x in iterator: @@ -116,6 +121,7 @@ def combineLocally(iterator): return combiners.iteritems() locally_combined = self._mapPartitions(combineLocally) shuffled = locally_combined.partitionBy(numPartitions) + def _mergeCombiners(iterator): combiners = {} for (k, v) in iterator: @@ -124,6 +130,7 @@ def _mergeCombiners(iterator): else: combiners[k] = mergeCombiners(combiners[k], v) return combiners.iteritems() + return shuffled._mapPartitions(_mergeCombiners) def partitionBy(self, numPartitions, partitionFunc=None):