From c60233aaf0c7dcde64ea122f7bcffa38b44e4b5c Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Thu, 27 Mar 2014 16:14:11 -0700
Subject: [PATCH] Start investigating moving to iterators for python API like
 the Java/Scala one. tl;dr: We will have to write our own iterator since the
 default one doesn't pickle well

---
 python/pyspark/join.py | 2 +-
 python/pyspark/rdd.py  | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/join.py b/python/pyspark/join.py
index 5f4294fb1b777..9feb4362dc469 100644
--- a/python/pyspark/join.py
+++ b/python/pyspark/join.py
@@ -88,5 +88,5 @@ def dispatch(seq):
                 vbuf.append(v)
             elif n == 2:
                 wbuf.append(v)
-        return (vbuf, wbuf)
+        return (iter(vbuf), iter(wbuf))
     return vs.union(ws).groupByKey(numPartitions).mapValues(dispatch)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index fb27863e07f55..99693a4bff75f 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1133,7 +1133,7 @@ def mergeCombiners(a, b):
             return a + b
 
         return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
-                                 numPartitions)
+                                 numPartitions).mapValues(lambda x: iter(x))
 
     # TODO: add tests
     def flatMapValues(self, f):
@@ -1180,7 +1180,7 @@ def cogroup(self, other, numPartitions=None):
 
         >>> x = sc.parallelize([("a", 1), ("b", 4)])
         >>> y = sc.parallelize([("a", 2)])
-        >>> sorted(x.cogroup(y).collect())
+        >>> sorted(list(x.cogroup(y).collect()))
        [('a', ([1], [2])), ('b', ([4], []))]
        """
        return python_cogroup(self, other, numPartitions)
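
The tl;dr in the commit message is the crux of this WIP: values produced by groupByKey()/cogroup() have to cross process boundaries via pickle, and iterator objects generally do not survive that trip. Generators fail to pickle on every Python version, and plain list iterators (what iter(vbuf) returns) also failed under the Python 2 / cPickle stack PySpark used at the time. The sketch below is a minimal, standalone illustration of the problem and of one possible shape for "our own iterator"; the PicklableIterable name is hypothetical and not part of this patch or of PySpark.

import pickle

# 1) The problem: iterators generally do not pickle. A generator fails on
#    every Python version; under Python 2's cPickle, even iter([...]) failed.
try:
    pickle.dumps(x for x in [1, 2, 3])
    print("pickled a generator (unexpected)")
except TypeError as e:
    print("cannot pickle a generator: %s" % e)

# 2) One possible "write our own iterator" shape: a small wrapper that is
#    iterable, like the Java/Scala API's Iterable values, but pickles by
#    materializing its contents into a list. (Hypothetical name, defined
#    at module level so pickle can find it by reference.)
class PicklableIterable(object):
    def __init__(self, data):
        self.data = list(data)

    def __iter__(self):
        return iter(self.data)

    def __len__(self):
        return len(self.data)

roundtripped = pickle.loads(pickle.dumps(PicklableIterable([1, 2, 3])))
print(list(roundtripped))  # [1, 2, 3]

With a wrapper like this, user code could still write "for v in values:" while the values serialize cleanly. Note that as written, the patch's cogroup doctest would still not pass: collect() is exactly where the pickling of the iter(vbuf)/iter(wbuf) values bites, and even if it succeeded, the expected output [('a', ([1], [2])), ('b', ([4], []))] would show iterator objects rather than lists, which is presumably why the subject reads "Start investigating".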