Skip to content

Commit

Permalink
code review
Browse files Browse the repository at this point in the history
  • Loading branch information
ScrapCodes committed Apr 3, 2014
1 parent 2b1124d commit 35f86ba
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,7 @@ def takeOrdered(self, num, key=None):
>>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
[1, 2, 3, 4, 5, 6]
>>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
[(-10, 10), (-9, 9), (-7, 7), (-6, 6), (-5, 5), (-4, 4)]
[10, 9, 7, 6, 5, 4]
"""

def topNKeyedElems(iterator, key_=None):
Expand All @@ -799,10 +799,15 @@ def topNKeyedElems(iterator, key_=None):
q.insert(k)
yield q.getElements()

def unKey(x, key_=None):
    """Drop the sort keys from a collected result when a key function was used.

    :param x: list of results. When ``key_`` is given, every entry must be
        indexable and the item at index 1 is kept (presumably the original
        element of a ``(key, element)`` pair -- confirm against the
        producer of these entries).
    :param key_: the key function that was used, or ``None`` when the
        elements were never keyed.
    :return: ``x`` itself when ``key_`` is ``None``; otherwise a new list
        holding ``entry[1]`` for every entry of ``x``.
    """
    # Identity test rather than ``!=``: a key object that defines its own
    # ``__ne__`` must not be able to make this check misfire.
    if key_ is not None:
        return [entry[1] for entry in x]
    return x

def merge(a, b):
    """Combine two partial results into one.

    The two inputs are concatenated with ``+`` and passed, without a key
    function, to ``topNKeyedElems``; the first value that generator
    produces is returned.
    """
    combined = a + b
    reduced = topNKeyedElems(combined)
    return next(reduced)

return sorted(self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge))
result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge)
return sorted(unKey(result, key), key=key)


def take(self, num):
Expand Down

0 comments on commit 35f86ba

Please sign in to comment.