
[SPARK-2871] [PySpark] add RDD.lookup(key) #2093

Closed · wants to merge 6 commits into apache:master from davies:lookup
Conversation

@davies (Contributor) commented Aug 22, 2014

RDD.lookup(key)

    Return the list of values in the RDD for key `key`. This operation
    is done efficiently if the RDD has a known partitioner by only
    searching the partition that the key maps to.

    >>> l = range(1000)
    >>> rdd = sc.parallelize(zip(l, l), 10)
    >>> rdd.lookup(42)  # slow
    [42]
    >>> sorted = rdd.sortByKey()
    >>> sorted.lookup(42)  # fast
    [42]

It also cleans up the code in RDD.py and fixes several bugs (related to preservesPartitioning).
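
A minimal sketch of the idea behind lookup() (illustrative only, not necessarily the exact code merged in this PR; `lookup` is written as a free function here, and it assumes the RDD exposes a Python-side partitioner callable and that SparkContext.runJob is available from Python):

    # Sketch: look up a key in an RDD of (key, value) pairs.
    def lookup(rdd, key):
        values = rdd.filter(lambda kv: kv[0] == key).values()
        if rdd.partitioner is not None:
            # Known partitioner: run the job only on the partition
            # that this key maps to.
            part = rdd.partitioner(key)
            return rdd.ctx.runJob(values, lambda it: list(it), [part])
        # Unknown partitioner: fall back to scanning every partition.
        return values.collect()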

@SparkQA commented Aug 22, 2014

QA tests have started for PR 2093 at commit eb1305d.

  • This patch merges cleanly.

@SparkQA commented Aug 22, 2014

QA tests have finished for PR 2093 at commit eb1305d.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mattf (Contributor) commented Aug 23, 2014

are you planning to add tests for this?

@davies (Contributor, Author) commented Aug 23, 2014

The doc tests should cover all the code paths; do we still need more tests?

@SparkQA commented Aug 23, 2014

QA tests have started for PR 2093 at commit be0e8ba.

  • This patch merges cleanly.

@SparkQA commented Aug 23, 2014

QA tests have finished for PR 2093 at commit be0e8ba.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mattf (Contributor) commented Aug 23, 2014

> The doc tests should cover all the code paths; do we still need more tests?

it's worth including a lookup for 1000 or 1234, which won't be found
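
For illustration, a doctest along these lines would cover the missing-key case (a sketch only; 1024 is just an example key outside the range, and the exact test added in the PR may differ):

    >>> l = range(1000)
    >>> rdd = sc.parallelize(zip(l, l), 10)
    >>> rdd.lookup(1024)               # key not present, every partition scanned
    []
    >>> rdd.sortByKey().lookup(1024)   # key not present, one partition scanned
    []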

@davies (Contributor, Author) commented Aug 23, 2014

@mattf I have added a test case for it, thanks.

I did a lot of refactoring in this PR, so please re-review it, thanks.

@SparkQA commented Aug 23, 2014

QA tests have started for PR 2093 at commit 0f1bce8.

  • This patch merges cleanly.

@SparkQA commented Aug 23, 2014

QA tests have finished for PR 2093 at commit 0f1bce8.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

    >>> sc.parallelize([]).reduce(add)
    Traceback (most recent call last):
    ...
    ValueError: Can not reduce() of empty RDD
A reviewer (Contributor) commented on the diff above:

Minor nit, but I'd drop the 'of' and just say "Cannot reduce() empty RDD"

@JoshRosen (Contributor) commented:

This PR is great. I like how you systematically addressed all of the bugs in preservesPartitioning; the minor cleanups are good, too.

It looks like there's a merge conflict due to me merging another one of your patches. Once you fix that and address my comments, I'll merge this into master (and hopefully branch-1.1, since the partitioner preservation could be a huge performance win).
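
To make the performance point concrete, here is a small example of why preserving the partitioner matters (standard PySpark API calls — partitionBy, mapPartitions, map; the variable names are made up for illustration and this is not code from the PR):

    # Partition the pairs by key once.
    pairs = sc.parallelize(zip(range(1000), range(1000))).partitionBy(10)

    # mapPartitions with preservesPartitioning=True keeps the partitioner,
    # so a later lookup() or join() can avoid a full shuffle/scan.
    doubled = pairs.mapPartitions(
        lambda it: ((k, v * 2) for k, v in it), preservesPartitioning=True)

    # A plain map() may change the keys, so the partitioner is dropped and
    # a later lookup() has to search every partition.
    doubled_no_part = pairs.map(lambda kv: (kv[0], kv[1] * 2))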

@davies (Contributor, Author) commented Aug 24, 2014

Jenkins, test this please.

@SparkQA commented Aug 24, 2014

QA tests have started for PR 2093 at commit 2871b80.

  • This patch merges cleanly.


@SparkQA commented Aug 24, 2014

QA tests have finished for PR 2093 at commit 2871b80.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class BoundedFloat(float):
    • class JoinedRow2 extends Row
    • class JoinedRow3 extends Row
    • class JoinedRow4 extends Row
    • class JoinedRow5 extends Row
    • class GenericRow(protected[sql] val values: Array[Any]) extends Row
    • abstract class MutableValue extends Serializable
    • final class MutableInt extends MutableValue
    • final class MutableFloat extends MutableValue
    • final class MutableBoolean extends MutableValue
    • final class MutableDouble extends MutableValue
    • final class MutableShort extends MutableValue
    • final class MutableLong extends MutableValue
    • final class MutableByte extends MutableValue
    • final class MutableAny extends MutableValue
    • final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableRow
    • case class CountDistinct(expressions: Seq[Expression]) extends PartialAggregate
    • case class CollectHashSet(expressions: Seq[Expression]) extends AggregateExpression
    • case class CollectHashSetFunction(
    • case class CombineSetsAndCount(inputSet: Expression) extends AggregateExpression
    • case class CombineSetsAndCountFunction(
    • case class CountDistinctFunction(
    • case class MaxOf(left: Expression, right: Expression) extends Expression
    • case class NewSet(elementType: DataType) extends LeafExpression
    • case class AddItemToSet(item: Expression, set: Expression) extends Expression
    • case class CombineSets(left: Expression, right: Expression) extends BinaryExpression
    • case class CountSet(child: Expression) extends UnaryExpression

@mattf (Contributor) commented Aug 24, 2014

i'm never a fan of code reformatting, whitespace changes, or refactoring at the same time as functional changes, e.g. `if (k, v)` => `if k, v`; `v if k not in m else func(m[k], v)` => `func(m[k], v) if k in m else v`; MaxHeapQ; `if not x: return None` => `if x: return y`.

while all those are good changes, it's good practice (and future you, me, and others will thank you) to bundle those changes with their own justification, for instance -

question: why did you change to using count() instead of collect() to force evaluation?

@davies (Contributor, Author) commented Aug 24, 2014

@mattf While I was scanning the whole file line by line to find all the issues related to preservesPartitioning, I reformatted some lines at the same time when they did not look nice to me. That is a completely personal judgement, so maybe it does not make sense to others.

It's not a good idea to do this kind of reformatting in a PR; I was also thinking of doing it as a separate PR, or not doing it at all if there is no compelling reason.

Should I remove these unrelated changes?

@davies (Contributor, Author) commented Aug 24, 2014

count() is supposed to be cheaper than collect(); we call count() instead of collect() to trigger the computation in Scala/Java, and it's better to keep the same style in Python.

But in PySpark, count() depends on collect(), which will dump the results to disk and load them back into Python. This may change in the future, so that count() returns just a number from the JVM.

Right now there is no strong reason to change collect() to count(); should I revert it?
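
For context, both calls below force evaluation of a lazy transformation; the difference is only what comes back to the driver (an illustrative throwaway snippet, not code from this PR):

    rdd = sc.parallelize(range(100), 4).map(lambda x: x * 2)
    rdd.count()     # triggers the job, returns a single number
    rdd.collect()   # triggers the job, ships every element to the driver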

@SparkQA commented Aug 24, 2014

QA tests have started for PR 2093 at commit 1789cd4.

  • This patch merges cleanly.

@SparkQA commented Aug 24, 2014

QA tests have finished for PR 2093 at commit 1789cd4.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main "$
    • $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main "$

@JoshRosen (Contributor) commented:

@mattf I agree with you that it's generally a good idea to separate functional vs. cosmetic changes (I've weighed in against several PRs that only perform minor code formatting changes, especially ones that touch tens or hundreds of files).

Ideally, this PR would have only made the necessary changes for lookup(), since that makes things easier to review. However, I think we do want to eventually make most of the other changes, so I don't mind trying to review those changes here (the alternative is having to review another PR, which creates more work for me now that I've already reviewed the code here).

@JoshRosen (Contributor) commented:

@ScrapCodes Can you explain the original motivation for MaxHeapQ? Is there a reason why you used it instead of heapq.nsmallest? If there's a corner-case that the old code supported but that this PR breaks, can you submit a failing test?

@ScrapCodes (Member) commented:

Simply because heapq is a min-heap implementation, and to track the "top N" I needed a max heap. #97 has some discussion. Let me see if I can come up with something.
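
For reference, the usual way to get max-heap behaviour out of the min-heap heapq module is to negate the values; below is a sketch of a bounded "keep the N smallest" structure built that way (not the MaxHeapQ code itself; keep_n_smallest is a made-up helper name):

    import heapq

    def keep_n_smallest(iterable, n):
        # Store negated values, so heap[0] is always the largest kept value.
        heap = []
        for x in iterable:
            if len(heap) < n:
                heapq.heappush(heap, -x)
            elif -x > heap[0]:              # x is smaller than the current max
                heapq.heapreplace(heap, -x)
        return sorted(-v for v in heap)

    keep_n_smallest([5, 1, 9, 3, 7], 3)     # [1, 3, 5]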

@ScrapCodes (Member) commented:

Hey, so I think heapq.nsmallest does the same thing, and it should be correct to use it. From a quick look at its source, I was doubtful about its performance, since it has a sort call in each nsmallest. For larger N, "I think" MaxHeapQ will perform better. What do you think?

@davies (Contributor, Author) commented Aug 26, 2014

Only when N (top) > S (total) will nsmallest() sort; it's faster than a heap because it's implemented in C.

For N << S, nsmallest() will also be faster than MaxHeapQ, because it's implemented in C (_heapq).

nsmallest() is stable: for equal keys, the values keep the same order as in the original RDD.

nsmallest(n, it, key=None) / nlargest(n, it, key=None) have been available since Python 2.5.
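
As a sketch of how takeOrdered-style top-N can be expressed with nsmallest (illustrative only; take_ordered is a made-up name and this is not necessarily the exact code in this PR):

    import heapq

    def take_ordered(rdd, num, key=None):
        # Take the N smallest of each partition, then merge the
        # per-partition results on the driver.
        def merge(a, b):
            return heapq.nsmallest(num, a + b, key=key)
        return rdd.mapPartitions(
            lambda it: [heapq.nsmallest(num, it, key=key)]).reduce(merge)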

@davies (Contributor, Author) commented Aug 26, 2014

btw, MaxHeapQ is actually a min heap, so why do we call it MaxHeapQ?

@ScrapCodes (Member) commented:

> Only when N (top) > S (total) will nsmallest() sort; it's faster than a heap because it's implemented in C.

My point was that a max-heap implementation does not need to sort. For most values it may just replace the top, or at worst do log N swaps. But I think that even though the sort is a C-based implementation, it would still be N log N (at best). My Python knowledge is definitely limited; I will be happy to be corrected.

> btw, MaxHeapQ is actually a min heap, so why do we call it MaxHeapQ?

From what I have learnt, a max heap keeps the largest element at the top. Is that wrong?

@davies (Contributor, Author) commented Aug 26, 2014

If N (top) > S (total), sort() will be S log S, and MaxHeapQ keeps all the elements, so it's also S log S (log S per swap); they have the same algorithmic CPU cost (a heap is also an algorithm used for sorting).

@ScrapCodes Sorry, I misunderstood it. You are right, it's a max heap.

@davies (Contributor, Author) commented Aug 26, 2014

Some quick benchmarks:

    >>> from pyspark.rdd import MaxHeapQ
    >>> import heapq, random, timeit
    >>> l = range(1 << 13)
    >>> random.shuffle(l)
    >>> def take1():
    ...     q = MaxHeapQ(100)
    ...     for i in l:
    ...         q.insert(i)
    ...     return q.getElements()
    >>> def take2():
    ...     return heapq.nsmallest(100, l)
    >>> # for S >> N
    >>> print timeit.timeit("take1()", "from __main__ import *", number=100)
    0.748146057129
    >>> print timeit.timeit("take2()", "from __main__ import *", number=100)
    0.142593860626
    >>> # for N > S
    >>> l = range(80)
    >>> random.shuffle(l)
    >>> print timeit.timeit("take1()", "from __main__ import *", number=1000)
    0.156821012497
    >>> print timeit.timeit("take2()", "from __main__ import *", number=1000)
    0.00907206535339

Whether S < N or S > N, nsmallest() is much faster than MaxHeapQ.

@davies (Contributor, Author) commented Aug 26, 2014

In PyPy 2.3 the result is reversed: MaxHeapQ is 3x faster than nsmallest(). There they are both implemented in pure Python, and nsmallest() does more work than MaxHeapQ because it keeps the result stable.

Even if nsmallest() did not do a stable sort, MaxHeapQ would still be 30% faster than nsmallest(), because nsmallest() will try to call __le__ if there is no __lt__ (while MaxHeapQ will fail if the object has no __gt__).

BTW, I admire how well PyPy does at optimizing these algorithms.

@ScrapCodes (Member) commented:

Looks like I understand what you mean now. I am still curious to benchmark the takeOrdered function with both approaches. There is definitely a 3x difference between the C and Python implementations (w/o PyPy). Can we use PyPy with Spark?

@mattf (Contributor) commented Aug 26, 2014

> @mattf While I was scanning the whole file line by line to find all the issues related to preservesPartitioning, I reformatted some lines at the same time when they did not look nice to me. That is a completely personal judgement, so maybe it does not make sense to others.
>
> It's not a good idea to do this kind of reformatting in a PR; I was also thinking of doing it as a separate PR, or not doing it at all if there is no compelling reason.
>
> Should I remove these unrelated changes?

if it were up to me, i'd say yes. it's not though, so i'll go with the flow.

i'm still trying to get a feel for what the spark community likes in its PRs and JIRAs.

@mattf (Contributor) commented Aug 26, 2014

> count() is supposed to be cheaper than collect(); we call count() instead of collect() to trigger the computation in Scala/Java, and it's better to keep the same style in Python.
>
> But in PySpark, count() depends on collect(), which will dump the results to disk and load them back into Python. This may change in the future, so that count() returns just a number from the JVM.
>
> Right now there is no strong reason to change collect() to count(); should I revert it?

thank you for the explanation; it wasn't clear from the code. my preference is for isolated changes, so i'd suggest reverting and doing it separately. however, others may not agree, so i'd say at least add a comment about why count() is used -- someone might come along and change it back to collect() without knowing they shouldn't.

@davies (Contributor, Author) commented Aug 26, 2014

@mattf I have only recently started working on Spark; I am trying to follow the same process as everyone else, and I have made some mistakes along the way. I hope to do better, thanks.

There are many things that need to be done right now (especially for the 1.1 release); quality and process are both things we should take care of.

Could we merge this? Maybe it can catch the last train for 1.1.

@davies (Contributor, Author) commented Aug 26, 2014

@ScrapCodes I have run PySpark with PyPy successfully; I will send out a PR and some benchmarks later.

@mattf (Contributor) commented Aug 26, 2014

> Could we merge this? Maybe it can catch the last train for 1.1.

i understand, but not my call.

i'll probably have a stronger opinion in coming weeks :smile:

@ScrapCodes (Member) commented:

@davies Thanks, just saw your patch.

@JoshRosen (Contributor) commented:

This looks good to me. Thanks for discussing MaxHeapQ and verifying that its behavior is the same as nsmallest.

I'm going to merge this into master for now. Thanks!

@asfgit closed this in 4fa2fda on Aug 27, 2014
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
RDD.lookup(key)

        Return the list of values in the RDD for key `key`. This operation
        is done efficiently if the RDD has a known partitioner by only
        searching the partition that the key maps to.

        >>> l = range(1000)
        >>> rdd = sc.parallelize(zip(l, l), 10)
        >>> rdd.lookup(42)  # slow
        [42]
        >>> sorted = rdd.sortByKey()
        >>> sorted.lookup(42)  # fast
        [42]

It also cleans up the code in RDD.py and fixes several bugs (related to preservesPartitioning).

Author: Davies Liu <[email protected]>

Closes apache#2093 from davies/lookup and squashes the following commits:

1789cd4 [Davies Liu] `f` in foreach could be generator or not.
2871b80 [Davies Liu] Merge branch 'master' into lookup
c6390ea [Davies Liu] address all comments
0f1bce8 [Davies Liu] add test case for lookup()
be0e8ba [Davies Liu] fix preservesPartitioning
eb1305d [Davies Liu] add RDD.lookup(key)
@davies deleted the lookup branch on September 15, 2014 22:18