-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-24230][SQL] Fix SpecificParquetRecordReaderBase with dictionary filters. #21295
[SPARK-24230][SQL] Fix SpecificParquetRecordReaderBase with dictionary filters. #21295
Conversation
…ers. Filtered blocks were causing the expected total value count to be too high, which led to an error when there were fewer than expected row groups to process.
Test build #90472 has finished for PR 21295 at commit
|
Test build #90474 has finished for PR 21295 at commit
|
@gatorsmile and @dongjoon-hyun, can you take a look at this? It is a commit I missed for the Parquet 1.10.0 update. |
@@ -879,6 +879,18 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext | |||
} | |||
} | |||
} | |||
|
|||
test("SPARK-24230: filter row group using dictionary") { | |||
withSQLConf(("parquet.filter.dictionary.enabled", "true")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a valid way to control this configuration? It seems to pass with false
, too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the default, so it is possible that it isn't getting passed to Parquet correctly. I can debug it at some point to find out why it passes with false
. I did make sure that the test case fails without the fix, so we know it should be correctly using dictionary filtering. Well, that or there were other cases that had the same problem and this hits one of those.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there is someone who leads this correctly, it's you, @rdblue . :)
I knew that this is the default of parquet. With the patch of SpecificParquetRecordReaderBase.java
or not, we should not add no-op
invalid conf line like withSQLConf
here. It's misleading for the whole Spark community for the future. Please debug and add the correct test case here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is misleading. The dictionary filter needs to be on and there's no guarantee from Parquet that the default will continue to be true.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That was my point, too. This configuration is needed, but this code doesn't do anything for that. To use this configuration correctly, we need to fix it first. We should not have no-op code like this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are unable to pass the parquet-specific parameter through withSQLConf
. Below shows the way to pass the parquet option.
withTable("t1") {
spark.createDataFrame((0 until 100).map(i => ((i * 2) % 20, s"data-$i"))).write
.option("parquet.filter.dictionary.enabled", false).saveAsTable("t1")
checkAnswer(sql("SELECT _2 FROM t1 WHERE t1._1 = 5"), Seq.empty)
}
Could you help investigate why we still hit the error [without the fix] when we set parquet.filter.dictionary.enabled
to false
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using SQLConf is OK, Spark will put all SQL configs to Hadoop conf, which will be accessed by parquet writer at the executor side.
I'm also curious about why turning parquet.filter.dictionary.enabled
off can't avoid this bug.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like the problem is a bug in Parquet. It is using the stats property instead of the dictionary property. This is minor because there is almost no reason to turn either one off now that we've built more confidence in the filters.
@@ -147,7 +147,8 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont | |||
this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString); | |||
this.reader = new ParquetFileReader( | |||
configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns()); | |||
for (BlockMetaData block : blocks) { | |||
// use the blocks from the reader in case some do not match filters and will not be read |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you be more specific by mentioning the corresponding Parquet JIRA issue or versions (1.10.0)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what you mean here. This is a problem entirely in Spark because Spark is reaching into Parquet internals for its vectorized support. There's no Parquet issue to reference.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I mean is this patch is logically okay, but only valid for master branch, Spark 2.4 with Parquet 1.10.0
. For example, the test case will pass in branch-2.3
without this patch because it uses Parquet 1.8.X. As you mentioned, it would be great if we had included this patch in #21070.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, it is fine and more correct for this to be ported to older versions. I doubt it will because it is unnecessary though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks correct to me, too. However, this comment isn't clear.
- If the comment is correct only in Parquet 1.10.0, please fix the comment.
- If the comment is correct in general, the failure should occur in Apache Spark 2.3.X (with old Parquet). Why don't we fix that in 2.3.1? This was my original suggestion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we will need to backport this to the 2.3.x line. No rush to make it for 2.3.1 though, since dictionary filtering is off by default and this isn't a correctness problem.
My understanding is that this doesn't affect 2.3 at all? |
So far, I guess so, @vanzin . |
@@ -225,7 +226,8 @@ protected void initialize(String path, List<String> columns) throws IOException | |||
this.sparkSchema = new ParquetToSparkSchemaConverter(config).convert(requestedSchema); | |||
this.reader = new ParquetFileReader( | |||
config, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns()); | |||
for (BlockMetaData block : blocks) { | |||
// use the blocks from the reader in case some do not match filters and will not be read | |||
for (BlockMetaData block : reader.getRowGroups()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is an existing issue, does your test case fail on Spark 2.3 too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dictionary filtering is off by default in 1.8.x. It was enabled after we built confidence in its correctness in 1.9.x.
We should backport this fix to 2.3.x also, but the only downside to not having it is that dictionary filtering will throw an exception when it is enabled. So the feature just isn't available.
Thanks for looking at this, everyone. Sorry for the delay in updating it, I'm currently out on paternity leave and don't have a lot of time. I'll get an update pushed sometime soon though. |
Thanks for your investigation! Also, congratulations! |
@rdblue congrats! All my concerns have been addressed, I think it's ready to merge, also cc @michal-databricks |
retest this please |
Congratulation, @rdblue ! :) |
Test build #91080 has finished for PR 21295 at commit
|
retest this please |
Test build #91086 has finished for PR 21295 at commit
|
retest this please |
Test build #91097 has finished for PR 21295 at commit
|
thanks, merging to master/2.3! |
…y filters. ## What changes were proposed in this pull request? I missed this commit when preparing #21070. When Parquet is able to filter blocks with dictionary filtering, the expected total value count to be too high in Spark, leading to an error when there were fewer than expected row groups to process. Spark should get the row groups from Parquet to pick up new filter schemes in Parquet like dictionary filtering. ## How was this patch tested? Using in production at Netflix. Added test case for dictionary-filtered blocks. Author: Ryan Blue <[email protected]> Closes #21295 from rdblue/SPARK-24230-fix-parquet-block-tracking. (cherry picked from commit 3469f5c) Signed-off-by: Wenchen Fan <[email protected]>
Thanks for merging, @cloud-fan, and thanks for the reviews everyone! I've opened PARQUET-1309 to track the Parquet fix for the properties. |
What changes were proposed in this pull request?
I missed this commit when preparing #21070.
When Parquet is able to filter blocks with dictionary filtering, the expected total value count to be too high in Spark, leading to an error when there were fewer than expected row groups to process. Spark should get the row groups from Parquet to pick up new filter schemes in Parquet like dictionary filtering.
How was this patch tested?
Using in production at Netflix. Added test case for dictionary-filtered blocks.