Skip to content

Commit

Permalink
implemented reduce and count function in Dstream
Browse files Browse the repository at this point in the history
  • Loading branch information
giwa committed Sep 20, 2014
1 parent b349649 commit 3c45cd2
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
2 changes: 2 additions & 0 deletions examples/src/main/python/streaming/network_wordcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,7 @@
reduced_lines = mapped_lines.reduceByKey(add)

reduced_lines.pyprint()
count_lines = mapped_lines.count()
count_lines.pyprint()
ssc.start()
ssc.awaitTermination()
27 changes: 17 additions & 10 deletions python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,23 @@ def count(self):
"""
"""
pass
#TODO: make sure count implementation, thiis different from what pyspark does
#return self._mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))
# TODO: make sure count implementation, this different from what pyspark does
return self._mapPartitions(lambda i: [sum(1 for _ in i)])._sum()

def _sum(self):
"""
"""
pass
#return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

def print_(self):
"""
Since print is reserved name for python, we cannot make a print method function.
Since print is reserved name for python, we cannot define a print method function.
This function prints serialized data in RDD in DStream because Scala and Java cannot
deserialized pickled python object. Please use DStream.pyprint() instead to print result.
deserialized pickled python object. Please use DStream.pyprint() instead to print results.
Call DStream.print().
"""
#hack to call print function in DStream
# a hack to call print function in DStream
getattr(self._jdstream, "print")()

def filter(self, f):
Expand Down Expand Up @@ -79,17 +77,23 @@ def _mapPartitionsWithIndex(self, f, preservesPartitioning=False):
"""
return PipelinedDStream(self, f, preservesPartitioning)

def reduce(self, func):
"""
"""
return self.map(lambda x: (None, x)).reduceByKey(func, 1).map(lambda x: x[1])

def reduceByKey(self, func, numPartitions=None):
"""
Merge the value for each key using an associative reduce function.
This will also perform the merging locally on each mapper before
sending resuls to reducer, similarly to a "combiner" in MapReduce.
sending results to reducer, similarly to a "combiner" in MapReduce.
Output will be hash-partitioned with C{numPartitions} partitions, or
the default parallelism level if C{numPartitions} is not specified.
"""
return self.combineByKey(lambda x:x, func, func, numPartitions)
return self.combineByKey(lambda x: x, func, func, numPartitions)

def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
numPartitions = None):
Expand All @@ -99,6 +103,7 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
"""
if numPartitions is None:
numPartitions = self._defaultReducePartitions()

def combineLocally(iterator):
combiners = {}
for x in iterator:
Expand All @@ -116,6 +121,7 @@ def combineLocally(iterator):
return combiners.iteritems()
locally_combined = self._mapPartitions(combineLocally)
shuffled = locally_combined.partitionBy(numPartitions)

def _mergeCombiners(iterator):
combiners = {}
for (k, v) in iterator:
Expand All @@ -124,6 +130,7 @@ def _mergeCombiners(iterator):
else:
combiners[k] = mergeCombiners(combiners[k], v)
return combiners.iteritems()

return shuffled._mapPartitions(_mergeCombiners)

def partitionBy(self, numPartitions, partitionFunc=None):
Expand Down

0 comments on commit 3c45cd2

Please sign in to comment.