[SPARK-9689][SQL]Fix bug of not invalidate the cache for InsertIntoHadoopFsRelation #8023
Conversation
cc @liancheng

(a, f) =>
  toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f, t.paths, confBroadcast))) :: Nil
(a, f) => {
  t.refresh()
Refresh the HadoopFsRelation right before building the RDD.
Ah, this is really not acceptable. We can't afford a refresh for every data source table scan... I missed those two similar refresh() calls when merging #7696. I'm trying to figure out a more reasonable fix for this.
After rethinking this, the call can be removed, as InsertIntoHadoopFsRelation will refresh the file status for us; we don't need to do that here any more.
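The division of responsibility described above (the insert path refreshes once after writing, so the scan path does not have to refresh on every read) can be sketched with simplified stand-in classes. SimpleFsRelation and the mutable.Map "file system" below are illustrative inventions for this thread, not Spark's actual HadoopFsRelation or InsertIntoHadoopFsRelation APIs:

```scala
// Hypothetical, simplified sketch: a relation that caches its file listing,
// and an insert operation that refreshes the relation after writing, so
// individual scans no longer need to call refresh() themselves.
import scala.collection.mutable

class SimpleFsRelation(fs: mutable.Map[String, String]) {
  // Cached file listing, populated lazily and invalidated by refresh().
  private var cachedFiles: Option[Seq[String]] = None

  def inputFiles: Seq[String] = cachedFiles match {
    case Some(files) => files
    case None =>
      val files = fs.keys.toSeq.sorted
      cachedFiles = Some(files)
      files
  }

  // Drop the stale listing; the next inputFiles call re-lists the directory.
  def refresh(): Unit = cachedFiles = None
}

object InsertIntoSimpleFsRelation {
  def run(relation: SimpleFsRelation, fs: mutable.Map[String, String],
          path: String, data: String): Unit = {
    fs(path) = data      // write the new file
    relation.refresh()   // refresh once here, instead of on every scan
  }
}
```

The point of the sketch is only the placement of refresh(): scans pay for a directory listing at most once, and the write path is responsible for invalidating it.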
no logs?
retest this please
Test build #40156 has finished for PR 8023 at commit
Test build #40174 has finished for PR 8023 at commit
This fix probably causes some other failures; I will look at this tomorrow.
@@ -92,14 +92,20 @@ private[sql] case class LogicalRDD(
)
}

private[sql] object PhysicalRDD {
  def apply(output: Seq[Attribute], rdd: RDD[InternalRow]): PhysicalRDD = {
This can be made another constructor of PhysicalRDD.
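For illustration, the two alternatives (the patch's companion-object apply versus the reviewer's auxiliary-constructor suggestion) can be contrasted with simplified stand-in types. PhysicalRDDLike and Attr below are hypothetical, not Spark's actual classes:

```scala
// Toy stand-ins for Attribute and PhysicalRDD, showing that a companion
// apply and an auxiliary constructor give equivalent call-site ergonomics.
case class Attr(name: String)

class PhysicalRDDLike(val output: Seq[Attr], val rddId: Int, val extraInfo: String) {
  // Auxiliary constructor: the reviewer's suggestion, defaulting extraInfo.
  def this(output: Seq[Attr], rddId: Int) = this(output, rddId, "ExistingRDD")
}

object PhysicalRDDLike {
  // Companion apply: what the patch adds; same defaulting, no `new` needed.
  def apply(output: Seq[Attr], rddId: Int): PhysicalRDDLike =
    new PhysicalRDDLike(output, rddId, "ExistingRDD")
}
```

Either form avoids repeating the default at every call site; the auxiliary constructor keeps the logic inside the class itself.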
051c31e to 94d6804
@@ -565,6 +565,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
    filters: Array[Filter],
    inputPaths: Array[String],
    broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
  refresh()
@liancheng It seems refreshing the file status is unavoidable; let's do that right before getting the input files.
Yeah, I agree. Basically it's impossible to:
- Create a temporary JSON table pointing to path P
- Change the contents by arbitrary means without notifying the temporary table
- Read the table again and expect to get updated contents

In the old JSON relation implementation, the refreshing logic was done by TextInputFormat.listStatus(), while the new JSONRelation relies on HadoopFsRelation. We can use SqlNewHadoopRDD and override the input format there to inject the FileStatus cache, to avoid extra refreshing costs. Similar to what we did in ParquetRelation.
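A minimal sketch of the FileStatus-cache idea above, using invented names (CachedListingFormat, FileStatusLike) rather than Spark's actual SqlNewHadoopRDD or Hadoop InputFormat machinery: listings are served from an injected cache, and the directory is only re-listed when refresh() is explicitly called:

```scala
// Hypothetical sketch: an "input format" whose listStatus() is backed by a
// cache, so repeated scans don't pay for a directory listing each time.
import scala.collection.mutable

final case class FileStatusLike(path: String, length: Long)

class CachedListingFormat(listDirectory: () => Seq[FileStatusLike]) {
  private val cache = mutable.ArrayBuffer.empty[FileStatusLike]
  private var populated = false

  // Serve from the cache; list the directory only on the first call.
  def listStatus(): Seq[FileStatusLike] = {
    if (!populated) refresh()
    cache.toList
  }

  // Explicitly re-list the directory and repopulate the cache.
  def refresh(): Unit = {
    cache.clear()
    cache ++= listDirectory()
    populated = true
  }
}
```

With this shape, the cost of staleness is confined to the cache: callers that can tolerate a slightly stale listing never touch the file system, and callers that just wrote data call refresh() once.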
I agree we'd better provide our own InputFormat; at least we'd minimize the directories being refreshed. But it probably requires lots of code changes, can we do that in a separate PR?
And we also need to refresh the partition directory before pruning the partitions; we probably need to think further about how to fix that as well, in the following PR(s).
Yeah, I'll probably work on this later this week; it can be relatively tricky to handle...
no logs?
retest this please
Test build #40389 has finished for PR 8023 at commit
@@ -183,6 +183,16 @@ private[sql] case class InMemoryRelation(
    batchStats).asInstanceOf[this.type]
}

private[sql] def withChild(newChild: SparkPlan): this.type = {
@yhuai @liancheng After double-checking the source code, the Spark plan of InMemoryRelation is the PhysicalRDD, which holds a data source scanning RDD instance as its property. That's why we don't pick up the latest files under the path when the recache method is called: the RDD is already materialized and never changes. This PR re-creates the SparkPlan from the logical plan, and the DataSourceStrategy will rebuild the RDD based on the latest files.
See:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala#L99
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L312
I've actually tried some other approaches for the fix:
- Update PhysicalRDD to take an RDD builder, instead of the RDD itself, as its property; however, this failed because it widely impacts existing code.
- Create a customized RDD which takes the path as a parameter (instead of the file status); however, this requires lots of interface changes in HadoopFsRelation, as inputFiles: Array[FileStatus] is widely used by buildScan, and in particular the partition pruning is done in the DataSourceStrategy, not the HadoopFsRelation.
I think this has been addressed in 2.0 with the introduction of refreshByPath.
We didn't refresh the cache (CacheManager) in InsertIntoHadoopFsRelation. However, even after adding the refresh operation, I also noticed that the Spark plan is immutable (in CachedData); this is probably a bug if the underlying files are changed externally (added/deleted). So I make the PhysicalRDD mutable, and always create a new RDD whenever the doExecute() function is called.
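The difference between an immutable cached plan and the behaviour this PR aims for can be sketched with toy classes. StaleScan and RebuildingScan are hypothetical names, and a plain Seq[String] stands in for an RDD; this is only an illustration of the re-creation-on-execute idea, not Spark's actual implementation:

```scala
// Toy contrast: a plan node that materialized its rows once at plan time
// versus one that rebuilds them on every execution.

class StaleScan(rows: Seq[String]) {
  // Pre-fix behaviour: the row set was captured once, so files added or
  // deleted afterwards are never observed by later executions.
  def doExecute(): Seq[String] = rows
}

class RebuildingScan(buildRows: () => Seq[String]) {
  // Fix's behaviour: re-run the builder on every call, so each execution
  // reflects the latest state of the underlying source.
  def doExecute(): Seq[String] = buildRows()
}
```

The trade-off is the one discussed in the review thread: rebuilding on every execution always sees fresh data, but pays the listing/refresh cost each time, which is why the reviewers wanted the refresh confined to the write path or a cache.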