Commit e8a08e2

Code review comments.

ScrapCodes committed Apr 2, 2014
1 parent 49e6ba7 commit e8a08e2
Showing 1 changed file with 40 additions and 29 deletions.

python/pyspark/rdd.py (69 changes: 40 additions & 29 deletions)
@@ -30,7 +30,6 @@
 from threading import Thread
 import warnings
 import heapq
-import bisect

 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
@@ -95,55 +94,70 @@ def __exit__(self, type, value, tb):
 class MaxHeapQ(object):
     """
     An implementation of MaxHeap.
+    >>> import pyspark.rdd
+    >>> heap = pyspark.rdd.MaxHeapQ(5)
+    >>> [heap.insert(i) for i in range(10)]
+    [None, None, None, None, None, None, None, None, None, None]
+    >>> sorted(heap.getElements())
+    [0, 1, 2, 3, 4]
+    >>> heap = pyspark.rdd.MaxHeapQ(5)
+    >>> [heap.insert(i) for i in range(9, -1, -1)]
+    [None, None, None, None, None, None, None, None, None, None]
+    >>> sorted(heap.getElements())
+    [0, 1, 2, 3, 4]
+    >>> heap = pyspark.rdd.MaxHeapQ(1)
+    >>> [heap.insert(i) for i in range(9, -1, -1)]
+    [None, None, None, None, None, None, None, None, None, None]
+    >>> heap.getElements()
+    [0]
     """
-    def __init__(self):
+
+    def __init__(self, maxsize):
         # we start from q[1], this makes calculating children as trivial as 2 * k
         self.q = [0]
+        self.maxsize = maxsize

     def _swim(self, k):
         while (k > 1) and (self.q[k/2] < self.q[k]):
             self._swap(k, k/2)
             k = k/2

     def _swap(self, i, j):
         t = self.q[i]
         self.q[i] = self.q[j]
         self.q[j] = t

     def _sink(self, k):
-        N=len(self.q)-1
-        while 2*k <= N:
-            j = 2*k
+        N = self.size()
+        while 2 * k <= N:
+            j = 2 * k
             # Here we test if both children are greater than parent
             # if not swap with larger one.
-            if j<N and self.q[j] < self.q[j+1]:
-                j = j+1
+            if j < N and self.q[j] < self.q[j + 1]:
+                j = j + 1
             if(self.q[k] > self.q[j]):
                 break
             self._swap(k, j)
             k = j

+    def size(self):
+        return len(self.q) - 1
+
     def insert(self, value):
-        self.q.append(value)
-        self._swim(len(self.q) - 1)
+        if (self.size()) < self.maxsize:
+            self.q.append(value)
+            self._swim(self.size())
+        else:
+            self._replaceRoot(value)

-    def getQ(self):
+    def getElements(self):
         return self.q[1:]

-    def replaceRoot(self, value):
+    def _replaceRoot(self, value):
         if(self.q[1] > value):
             self.q[1] = value
             self._sink(1)

     def delMax(self):
         r = self.q[1]
         self.q[1] = self.q[len(self.q) - 1]
         self.q.pop()
         self._sink(1)
         return r

 class RDD(object):
     """
     A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
@@ -778,15 +792,12 @@ def takeOrdered(self, num, key=None):
"""

def topNKeyedElems(iterator, key_=None):
q = MaxHeapQ()
q = MaxHeapQ(num)
for k in iterator:
if not (key_ == None):
if key_ != None:
k = (key_(k), k)
if (len(q.q) -1) < num:
q.insert(k)
else:
q.replaceRoot(k)
yield q.getQ()
q.insert(k)
yield q.getElements()

def merge(a, b):
return next(topNKeyedElems(a + b))
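Taken together, the review changes move the size bounding out of takeOrdered and into the heap itself: MaxHeapQ now takes a maxsize at construction, insert() either grows the heap or replaces the root, and the public getQ/replaceRoot pair becomes getElements/_replaceRoot. For intuition, the same "keep the num smallest" behaviour can be sketched with the standard-library heapq module (a standalone illustration for plain numeric values, not the patch's implementation; smallest_n is a hypothetical name):

    import heapq

    def smallest_n(iterator, num):
        # heapq is a min-heap, so negated values are stored; the root -heap[0]
        # is then the largest element kept, mirroring MaxHeapQ's root q[1].
        heap = []
        for item in iterator:
            if len(heap) < num:
                heapq.heappush(heap, -item)     # insert()'s grow-and-swim branch
            elif item < -heap[0]:
                heapq.heapreplace(heap, -item)  # _replaceRoot() followed by _sink()
        return sorted(-x for x in heap)

    print(smallest_n(range(9, -1, -1), 5))  # prints [0, 1, 2, 3, 4]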
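The reduce side is untouched in spirit: each partition yields its top-num list, and merge simply re-runs topNKeyedElems over the concatenation of two such lists. A plain-list illustration of why that combining step is sound (top_n is a hypothetical stand-in for one partition pass, without the heap):

    def top_n(items, n):
        # Hypothetical stand-in for topNKeyedElems on one partition:
        # returns the n smallest elements in sorted order.
        return sorted(items)[:n]

    part_a = top_n([10, 1, 9, 3], 3)  # [1, 3, 9]
    part_b = top_n([2, 7, 0, 5], 3)   # [0, 2, 5]

    # merge(a, b) reapplies the same selection to a + b; any element of the
    # global top 3 must already be in its own partition's top 3.
    print(top_n(part_a + part_b, 3))  # prints [0, 1, 2]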

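With the patch applied, takeOrdered reads as follows from a PySpark shell (a usage sketch assuming a live SparkContext sc; the key argument orders by the transformed value while the original elements are returned):

    # Run inside bin/pyspark, where `sc` is the SparkContext.
    rdd = sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7])

    # The 6 smallest elements, ascending.
    print(rdd.takeOrdered(6))                    # prints [1, 2, 3, 4, 5, 6]

    # The 6 largest, by ordering on the negated value.
    print(rdd.takeOrdered(6, key=lambda x: -x))  # prints [10, 9, 7, 6, 5, 4]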