Skip to content

Commit

Permalink
broke min/max out into separate transaction, added to rdd.py
Browse files Browse the repository at this point in the history
  • Loading branch information
dwmclary committed Mar 14, 2014
1 parent 1e7056d commit ed67136
Showing 1 changed file with 18 additions and 59 deletions.
77 changes: 18 additions & 59 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,24 @@ def func(iterator):
# TODO: aggregate


def max(self):
    """
    Return the largest element in this RDD.

    The result is obtained from the RDD's StatsCounter, which gathers
    min/max (and other summary statistics) in a single pass.

    >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
    43.0
    """
    summary = self.stats()
    return summary.max()

def min(self):
    """
    Find the minimum item in this RDD.

    The value is taken from the RDD's StatsCounter, which computes
    summary statistics in a single pass over the data.

    >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
    1.0
    """
    # Fixed: docstring previously said "maximum" (copy-paste from max()).
    return self.stats().min()

def sum(self):
"""
Add up the elements in this RDD.
Expand Down Expand Up @@ -612,65 +630,6 @@ def sampleVariance(self):
"""
return self.stats().sampleVariance()

def _getBuckets(self, bucketCount):
#use the statscounter as a quick way of getting max and min
mm_stats = self.stats()
min = mm_stats.min()
max = mm_stats.max()

increment = (max-min)/bucketCount
buckets = range(min,min)
if increment != 0:
buckets = range(min,max, increment)

return {"min":min, "max":max, "buckets":buckets}

def histogram(self, bucketCount, buckets=None):
    """
    Compute a histogram of the data using ``bucketCount`` number of buckets
    evenly spaced between the min and max of the RDD.

    If ``buckets`` is given explicitly it must be a sorted sequence of at
    least two boundary values; ``bucketCount`` is then ignored. Returns a
    dict mapping ``(lower, upper)`` bucket bounds to element counts.

    >>> sc.parallelize([1,49, 23, 100, 12, 13, 20, 22, 75, 50]).histogram(3)
    defaultdict(<type 'int'>, {(67, inf): 2, (1, 33): 6, (34, 66): 2})
    """
    # Renamed from min/max: the originals shadowed the builtins.
    lowerBound = float("-inf")
    upperBound = float("inf")
    if not buckets:
        b = self._getBuckets(bucketCount)
        buckets = b["buckets"]
        lowerBound = b["min"]
        upperBound = b["max"]

    if len(buckets) < 2:
        raise ValueError("requires more than 1 bucket")
    # (Removed: unused `evenBuckets` flag — it was set but never read.)

    # Count elements of one partition into per-bucket totals.
    def histogramPartition(iterator):
        counters = defaultdict(int)
        for obj in iterator:
            k = bisect_right(buckets, obj)
            if 0 < k < len(buckets):
                # Inside the boundary list: bucket is [buckets[k-1], buckets[k]-1].
                # NOTE(review): the -1 upper bound assumes integer data.
                key = (buckets[k - 1], buckets[k] - 1)
            elif k == len(buckets):
                # Above the last boundary: open-ended top bucket.
                key = (buckets[k - 1], upperBound)
            else:  # k == 0
                # Below the first boundary: open-ended bottom bucket.
                key = (lowerBound, buckets[0] - 1)
            counters[key] += 1
        yield counters

    # Merge two per-partition counter dicts.
    def mergeCounters(d1, d2):
        # BUG FIX: previously keys present only in d2 were silently dropped,
        # undercounting whenever partitions hit different buckets.
        for k, v in d2.items():
            d1[k] += v  # d1 is a defaultdict(int), so missing keys start at 0
        return d1

    return self.mapPartitions(histogramPartition).reduce(mergeCounters)


def countByValue(self):
"""
Return the count of each unique value in this RDD as a dictionary of
Expand Down

0 comments on commit ed67136

Please sign in to comment.