[SPARK-19426][SQL] Custom coalesce for Dataset #16766
Conversation
```scala
 * Returns a new RDD that has exactly `numPartitions` partitions.
 */
case class CoalesceLogical(numPartitions: Int, partitionCoalescer: Option[PartitionCoalescer],
    child: LogicalPlan)
```
Could you follow the styles documented in https://github.com/databricks/scala-style-guide?
```
@@ -823,6 +825,17 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
 }

/**
 * Returns a new RDD that has exactly `numPartitions` partitions.
 */
case class CoalesceLogical(numPartitions: Int, partitionCoalescer: Option[PartitionCoalescer],
```
`CoalesceLogical` -> `Coalesce`?
The main reason is that there is already a `Coalesce` expression class.
Could you please also add a few test cases? For example,
I'd second that. I'd be interested to know if this implementation changes behavior for
```scala
    CoalesceLogical(numPartitions, partitionCoalescer, logicalPlan)
  }

  def coalesce(numPartitions: Int): Dataset[T] = coalesce(numPartitions, None)
```
Please also add a function description, like we did for the other functions in Dataset.scala?
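For illustration, a documented overload might look roughly like this. This is a sketch only: `withTypedPlan` and the `PartitionCoalesce` node name are assumptions drawn from the surrounding discussion, not the merged code.

```scala
/**
 * Returns a new Dataset that has exactly `numPartitions` partitions, without
 * triggering a shuffle.
 *
 * A `PartitionCoalescer` can optionally be supplied to customize how the
 * existing partitions are grouped together, similar to `RDD.coalesce`.
 */
def coalesce(numPartitions: Int, partitionCoalescer: Option[PartitionCoalescer]): Dataset[T] =
  withTypedPlan {
    PartitionCoalesce(numPartitions, partitionCoalescer, logicalPlan)
  }
```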
```scala
// before:
case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode {
// after:
case class CoalesceExec(numPartitions: Int, child: SparkPlan,
    partitionCoalescer: Option[PartitionCoalescer]
) extends UnaryExecNode {
```
The same indent issue here.
```scala
case class CoalesceExec(
    numPartitions: Int,
    child: SparkPlan,
    partitionCoalescer: Option[PartitionCoalescer]) extends UnaryExecNode {
```
Do you guys have a .scalafmt.conf that applies all of this? That would make things cleaner.
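For what it's worth, a minimal `.scalafmt.conf` approximating the two rules raised in this review (100-column lines, 4-space indentation at definition sites) might look like the fragment below. The exact keys depend on the scalafmt version, so treat it as a sketch rather than Spark's actual config:

```
maxColumn = 100
continuationIndent.defnSite = 4
continuationIndent.callSite = 2
```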
```scala
import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
import scala.collection.mutable.ArrayBuffer
```
Is this import unused?
```
@@ -823,6 +825,17 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
 }

/**
 * Returns a new RDD that has exactly `numPartitions` partitions.
 */
case class CoalesceLogical(numPartitions: Int, partitionCoalescer: Option[PartitionCoalescer],
```
The name still looks inconsistent with the others. How about `PartitionCoalesce`?
That sounds good.
```
@@ -823,6 +825,17 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
 }

/**
 * Returns a new RDD that has exactly `numPartitions` partitions.
```
This description is not right.
```scala
 */
case class CoalesceLogical(numPartitions: Int, partitionCoalescer: Option[PartitionCoalescer],
    child: LogicalPlan)
  extends UnaryNode {
```
Style issues:
```scala
case class PartitionCoalesce(
    numPartitions: Int,
    partitionCoalescer: Option[PartitionCoalescer],
    child: LogicalPlan) extends UnaryNode {
```
```
@@ -19,9 +19,8 @@ package org.apache.spark.sql.execution

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
```
Add it back?
```scala
val data = (1 to 1000).map(i => ClassData(i.toString, i))
data.toDS().repartition(10).write.format("csv").save(path.toString)

val ds = spark.read.format("csv").load(path.toString).as[ClassData]
```
```
cannot resolve '`a`' given input columns: [_c0, _c1];
```
Oh right, CSV doesn't write headers by default.
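One way around the `_c0`/`_c1` column names (illustrative only; this may not be how the test was ultimately fixed) is to write and read the CSV with the `header` option, plus `inferSchema` so the numeric field is not read back as a string:

```scala
// Write with a header row so column names survive the round trip.
data.toDS().repartition(10).write
  .option("header", "true")
  .format("csv")
  .save(path.toString)

// Read the header back and infer column types before converting to ClassData.
val ds = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .format("csv")
  .load(path.toString)
  .as[ClassData]
```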
```scala
// Similar to the implementation of `test("custom RDD coalescer")` from [[RDDSuite]] we first
// write out to disk, to ensure that our splits are in fact [[FileSplit]] instances.
val data = (1 to 1000).map(i => ClassData(i.toString, i))
data.toDS().repartition(10).write.format("csv").save(path.toString)
```
Use `withTempPath` to generate the path?
```scala
after {
  Utils.deleteRecursively(path)
}
```
No need to do this if you use `withTempPath`. This is an example.
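A sketch of the suggested pattern: `withTempPath` comes from Spark's `SQLTestUtils` and deletes the directory when the body finishes, so the explicit `after { ... }` cleanup block becomes unnecessary. The assertions are placeholders for whatever the coalesce test checks:

```scala
withTempPath { path =>
  val data = (1 to 1000).map(i => ClassData(i.toString, i))
  data.toDS().repartition(10).write.format("csv").save(path.toString)

  val ds = spark.read.format("csv").load(path.toString)
  // ... run the coalesce under test and assert on ds ...
}
// No manual Utils.deleteRecursively(path) needed: withTempPath cleans up.
```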
Ah thanks, I had looked at the writer tests.
```scala
  assert(splitSizeSum <= maxSplitSize)
})
assert(totalPartitionCount == 10)
```
Nit: Remove this empty line.
@felixcheung This does not touch any of the coalesce internals. It only allows setting a partitionCoalescer, similar to what is already available in rdd.coalesce.
Sorry for the late reply. @mariusvniekerk Could you please update the PR?
ok to test
Test build #77969 has finished for PR 16766 at commit
Let me rebase this. I don't currently have a clean way of testing this on Windows.
Could you run the following four commands to do a local test in your environment?
Force-pushed from d4bde0b to 00b2a7a.
Test build #78212 has finished for PR 16766 at commit
Test build #78213 has finished for PR 16766 at commit
Test build #78218 has finished for PR 16766 at commit
Actual javadoc errors are as below:
```
[error] /home/jenkins/workspace/SparkPullRequestBuilder/sql/core/target/java/org/apache/spark/sql/Dataset.java:2222: error: reference not found
[error]  * A {@link PartitionCoalescer} can also be supplied allowing the behavior of the partitioning to be
[error]    ^
[error] /home/jenkins/workspace/SparkPullRequestBuilder/sql/core/target/java/org/apache/spark/sql/Dataset.java:2223: error: reference not found
[error]  * customized similar to {@link RDD.coalesce}.
[error]
```
```
@@ -2603,12 +2603,27 @@ class Dataset[T] private[sql](
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * A [[PartitionCoalescer]] can also be supplied allowing the behavior of the partitioning to be
 * customized similar to [[RDD.coalesce]].
```
I think it should be `[[org.apache.spark.rdd.RDD#coalesce]]`.
```
@@ -2603,12 +2603,27 @@ class Dataset[T] private[sql](
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * A [[PartitionCoalescer]] can also be supplied allowing the behavior of the partitioning to be
```
Sounds like this trait can't be resolved when the Java docs are generated. Simply wrapping it in backticks would be fine.
Hi @mariusvniekerk, would you be able to fix the javadoc errors?
cc @maropu Do you want to take this over?
@gatorsmile Sure, I'll do. Thanks!
What changes were proposed in this pull request?
This adds support for using the `PartitionCoalescer` features added in #11865 (SPARK-14042) to the Dataset API.
How was this patch tested?
Manual tests.
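For context, a custom coalescer plugged into this API would implement the `PartitionCoalescer` trait from `org.apache.spark.rdd` (added in SPARK-14042). The sketch below is illustrative only: `RoundRobinCoalescer` is a hypothetical example, and the two-argument `Dataset.coalesce` call is the overload this PR proposes, not existing API:

```scala
import org.apache.spark.rdd.{PartitionCoalescer, PartitionGroup, RDD}

// A toy coalescer that assigns parent partitions round-robin to at most
// `maxPartitions` groups. Realistic coalescers (e.g. for FileSplit locality)
// would instead group by preferred location or split size.
class RoundRobinCoalescer extends PartitionCoalescer with Serializable {
  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
    val numGroups = math.min(maxPartitions, parent.partitions.length)
    val groups = Array.fill(numGroups)(new PartitionGroup())
    parent.partitions.zipWithIndex.foreach { case (part, i) =>
      groups(i % numGroups).partitions += part
    }
    groups
  }
}

// Hypothetical call site using the overload proposed in this PR:
// ds.coalesce(4, Some(new RoundRobinCoalescer))
```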