Skip to content

Commit

Permalink
[SPARK-24230][SQL] Fix SpecificParquetRecordReaderBase with dictionar…
Browse files Browse the repository at this point in the history
…y filters.

## What changes were proposed in this pull request?

I missed this commit when preparing #21070.

When Parquet is able to filter blocks with dictionary filtering, the expected total value count to be too high in Spark, leading to an error when there were fewer than expected row groups to process. Spark should get the row groups from Parquet to pick up new filter schemes in Parquet like dictionary filtering.

## How was this patch tested?

Using in production at Netflix. Added test case for dictionary-filtered blocks.

Author: Ryan Blue <[email protected]>

Closes #21295 from rdblue/SPARK-24230-fix-parquet-block-tracking.

(cherry picked from commit 3469f5c)
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
rdblue authored and cloud-fan committed May 24, 2018
1 parent 068c4ae commit f48d624
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
this.reader = new ParquetFileReader(
configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
for (BlockMetaData block : blocks) {
// use the blocks from the reader in case some do not match filters and will not be read
for (BlockMetaData block : reader.getRowGroups()) {
this.totalRowCount += block.getRowCount();
}

Expand Down Expand Up @@ -225,7 +226,8 @@ protected void initialize(String path, List<String> columns) throws IOException
this.sparkSchema = new ParquetToSparkSchemaConverter(config).convert(requestedSchema);
this.reader = new ParquetFileReader(
config, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
for (BlockMetaData block : blocks) {
// use the blocks from the reader in case some do not match filters and will not be read
for (BlockMetaData block : reader.getRowGroups()) {
this.totalRowCount += block.getRowCount();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,18 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
}

test("SPARK-24230: filter row group using dictionary") {
withSQLConf(("parquet.filter.dictionary.enabled", "true")) {
// create a table with values from 0, 2, ..., 18 that will be dictionary-encoded
withParquetTable((0 until 100).map(i => ((i * 2) % 20, s"data-$i")), "t") {
// search for a key that is not present so the dictionary filter eliminates all row groups
// Fails without SPARK-24230:
// java.io.IOException: expecting more rows but reached last block. Read 0 out of 50
checkAnswer(sql("SELECT _2 FROM t WHERE t._1 = 5"), Seq.empty)
}
}
}
}

object TestingUDT {
Expand Down

0 comments on commit f48d624

Please sign in to comment.