[SPARK-1837] NumericRange should be partitioned in the same way as other... #776
Can one of the admins verify this patch?
var r = nr
for (i <- 0 until numSlices) {
for ((start, end) <- positions(nr.length, numSlices)) {
  val sliceSize = end - start
  slices += r.take(sliceSize).asInstanceOf[Seq[T]]
scala> dr.take(1)
res11: scala.collection.immutable.NumericRange[Double] = NumericRange(1.0)
scala> dr.take(2)
res12: scala.collection.immutable.NumericRange[Double] = NumericRange(1.0)
scala> dr.take(3)
res13: scala.collection.immutable.NumericRange[Double] = NumericRange(1.0, 1.2)
scala> dr.take(4)
res14: scala.collection.immutable.NumericRange[Double] = NumericRange(1.0, 1.2, 1.4, 1.5999999999999999)
scala> lr.take(1)
res15: scala.collection.immutable.NumericRange[Long] = NumericRange(1)
scala> lr.take(2)
res16: scala.collection.immutable.NumericRange[Long] = NumericRange(1, 4)
scala> lr.take(3)
res17: scala.collection.immutable.NumericRange[Long] = NumericRange(1, 4, 7)
scala> lr.take(4)
res18: scala.collection.immutable.NumericRange[Long] = NumericRange(1, 4, 7)
(1D to 2D).by(0.2).take(2) => NumericRange(1.0)
Why? Is this a bug?
Looks like a bug to me. This, in turn, is causing problems in RDD.
scala> sc.parallelize((1D to 2D).by(0.2), 2).collectPartitions
res15: Array[Array[Double]] = Array(Array(1.0, 1.2), Array(1.6, 1.8))
This issue has been reported to Scala and is still open: https://issues.scala-lang.org/browse/SI-8518
Do you want to wait on this to be fixed by Scala, or do you want to work around it for now?
I'd wait for Scala to fix it. That said, I'm open to a workaround (I just don't see one myself).
Alright, maybe we should wait for Scala then. By the way, for your original use case, was the range you wanted always (0 to numElements)? If so you can also try RDD.zipWithIndex.
@mateiz the use case wasn't mine, it was from the reporter of SPARK-1817. Btw, I think this PR can be committed independently of the Scala fix. It fixes the issue for other numeric ranges (e.g., Long), and will also work on Double once the Scala fix is in.
Won't this patch make it lose numbers from Double ranges, whereas the current implementation works?
@mateiz the current implementation would lose elements for all types of numeric ranges (including Long and Double) when we zip a numeric range with other sequences, because we partition numeric ranges differently from other sequences. This patch fixes it by partitioning numeric ranges at exactly the same indexes as we would on other sequences. However, we still depend on take and drop being implemented correctly on numeric ranges for things to work. The Scala bug affects take and drop on Double ranges, but not on other numeric ranges like Long (hence, the unit tests in this patch, which are based on Long ranges, are successful).
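To illustrate the point about cutting at the same indexes, here is a minimal standalone sketch (no Spark required). The object name `SliceDemo` and the helpers `sliceSeq` and `sliceRange` are hypothetical names chosen for this example, and the boundary arithmetic is an assumption about what `positions` computes rather than a copy of the patch. It uses a Long range, since the discussion above says take and drop behave correctly there.

```scala
import scala.collection.immutable.NumericRange

object SliceDemo {
  // Slice boundaries: slice i covers indexes [start, end).
  // Assumed formula for illustration; not taken verbatim from the patch.
  def positions(length: Long, numSlices: Int): Seq[(Int, Int)] =
    (0 until numSlices).map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }

  // Generic sequences: cut directly at the computed indexes.
  def sliceSeq[T](seq: Seq[T], numSlices: Int): Seq[Seq[T]] =
    positions(seq.length, numSlices).map { case (start, end) =>
      seq.slice(start, end)
    }

  // Numeric ranges: walk the range with take/drop, using the same boundaries,
  // so each slice stays a compact range instead of a materialized list.
  def sliceRange(nr: NumericRange[Long], numSlices: Int): Seq[Seq[Long]] = {
    var r = nr
    positions(nr.length, numSlices).map { case (start, end) =>
      val sliceSize = end - start
      val slice = r.take(sliceSize)
      r = r.drop(sliceSize)
      slice
    }
  }
}
```

Because both helpers derive their cuts from the same `positions`, slice i of a range and slice i of any other sequence of equal length have the same size, which is what an element-wise zip of the two relies on.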
Okay, that makes sense then; I didn't realize that we were already using drop and take. In that case we should merge this patch as is and maybe create a JIRA for Double ranges so people see it's a known issue. Made one other small comment on the patch.
Jenkins, this is ok to test
Merged build triggered.
Merged build started.
Merged build finished. All automated tests passed.
@@ -117,6 +117,15 @@ private object ParallelCollectionRDD {
    if (numSlices < 1) {
      throw new IllegalArgumentException("Positive number of slices required")
    }
    // Sequences need to be sliced at same positions for operations
    // like RDD.zip() to behave as expected
    def positions(length: Long, numSlices: Int) = {
This needs an explicit return type (e.g. : Seq[(Int, Int)])
Actually it would be better if this returned an Iterator, so that it doesn't materialize the whole sequence. You can do (0 until numSlices).iterator.map(...).
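A sketch of what that suggestion could look like, with the explicit return type requested earlier in this thread. The wrapper object `Positions` is a hypothetical name used only to make the snippet self-contained, and the body of the `map` is an assumed boundary formula rather than the code that was merged.

```scala
object Positions {
  // Lazily yields (start, end) index pairs; slice i covers [start, end).
  // Nothing is computed until the iterator is consumed.
  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
    (0 until numSlices).iterator.map { i =>
      // i * length is evaluated as a Long, which avoids Int overflow
      // before the division brings the value back into Int range.
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }
}
```

A caller that loops once over the pairs, as in `for ((start, end) <- positions(nr.length, numSlices))`, works unchanged with an Iterator. A caller that needs to traverse the boundaries twice would have to call `positions` again or convert the result with `toList`.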
This needs an explicit return type (e.g. : Seq[(Int, Int)])
For binary compatibility?
This is just our style throughout the code. It makes it easier to avoid compatibility-breaking changes.
Updated patch based on @mateiz comments.
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15392/
Jenkins, retest this please
Merged build triggered.
Merged build started.
Merged build finished. All automated tests passed.
@mateiz could you take another look at this when you get a chance? SPARK-1817 has been marked as resolved, but the fix for the original issue depends on this patch. Thx.
Oh, sorry, I forgot to merge this after testing it. Jenkins, retest this please.
Merged build triggered.
Merged build started.
Merged build finished. All automated tests passed.
Alright, merged this. Thanks!
…her... ... sequences Author: Kan Zhang <[email protected]> Closes apache#776 from kanzhang/SPARK-1837 and squashes the following commits: e48f018 [Kan Zhang] [SPARK-1837] code refactoring 67c33b5 [Kan Zhang] minor change 403f9b1 [Kan Zhang] [SPARK-1837] NumericRange should be partitioned in the same way as other sequences