Pass metadata extractors to FileScanRDD [databricks] #10616
Signed-off-by: Raza Jafri <[email protected]>
    metadataColumns: Seq[AttributeReference] = Seq.empty): RDD[InternalRow] = {
  if (relation.isDefined) {
    new FileScanRDD(sparkSession, readFunction, filePartitions, readDataSchema, metadataColumns,
      metadataExtractors = relation.get.fileFormat.fileConstantMetadataExtractors)
If we pass relation only to access fileFormat, which we use only to access fileConstantMetadataExtractors, and only the latter is shim-specific, should we just pass fileFormat as an Option to getFileScanRDD?
  if (relation.isDefined) {
    new FileScanRDD(sparkSession, readFunction, filePartitions, readDataSchema, metadataColumns,
      metadataExtractors = relation.get.fileFormat.fileConstantMetadataExtractors)
  } else {
    new FileScanRDD(sparkSession, readFunction, filePartitions, readDataSchema, metadataColumns)
  }
Typically in Scala, calling .get on an Option is an anti-pattern.
Suggested change:
-  if (relation.isDefined) {
-    new FileScanRDD(sparkSession, readFunction, filePartitions, readDataSchema, metadataColumns,
-      metadataExtractors = relation.get.fileFormat.fileConstantMetadataExtractors)
-  } else {
-    new FileScanRDD(sparkSession, readFunction, filePartitions, readDataSchema, metadataColumns)
-  }
+  new FileScanRDD(sparkSession, readFunction, filePartitions, readDataSchema, metadataColumns,
+    relation.map(_.fileFormat.fileConstantMetadataExtractors).getOrElse(Map.empty))
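For readers unfamiliar with the idiom, here is a minimal, self-contained sketch of the difference between the two forms. RelationStub, FileFormatStub and the Extractors alias are hypothetical stand-ins invented for this illustration, not the real Spark types; only the Option handling mirrors the suggestion.

```scala
// Hypothetical stand-ins for HadoopFsRelation / FileFormat; not the Spark API.
object OptionExtractorsSketch {
  // Assumed shape: extractor name -> function applied to a file path.
  type Extractors = Map[String, String => Any]

  final case class FileFormatStub(fileConstantMetadataExtractors: Extractors)
  final case class RelationStub(fileFormat: FileFormatStub)

  // Original form: isDefined followed by get, with two branches to keep in sync.
  def extractorsWithGet(relation: Option[RelationStub]): Extractors =
    if (relation.isDefined) {
      relation.get.fileFormat.fileConstantMetadataExtractors
    } else {
      Map.empty[String, String => Any]
    }

  // Suggested form: map over the Option and fall back to an empty Map,
  // so there is no unchecked get and only one expression to maintain.
  def extractorsWithMap(relation: Option[RelationStub]): Extractors =
    relation
      .map(_.fileFormat.fileConstantMetadataExtractors)
      .getOrElse(Map.empty[String, String => Any])
}
```

Both functions return the same result for Some and None; the second cannot throw if a later refactoring drops the isDefined check.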
@@ -78,6 +78,7 @@ trait SparkShims {
      readFunction: (PartitionedFile) => Iterator[InternalRow],
      filePartitions: Seq[FilePartition],
      readDataSchema: StructType,
      relation: Option[HadoopFsRelation],
If you move this argument to the last position and give it a default value of None, you will probably have fewer lines to modify.
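A rough sketch of why a trailing defaulted parameter reduces churn. The getFileScanRDD signature below is heavily simplified and RelationStub is a hypothetical placeholder; it is not the actual SparkShims method.

```scala
// Simplified, hypothetical signature; only the parameter placement matters here.
object DefaultArgSketch {
  final case class RelationStub(name: String)

  def getFileScanRDD(
      readDataSchema: Seq[String],
      metadataColumns: Seq[String] = Seq.empty,
      // New parameter placed last with a default, so existing callers still compile.
      relation: Option[RelationStub] = None): String = {
    val schema = readDataSchema.mkString(",")
    val metadata = metadataColumns.mkString(",")
    val source = relation.map(_.name).getOrElse("no relation")
    s"schema=$schema metadata=$metadata source=$source"
  }
}
```

Call sites written before the parameter existed, such as getFileScanRDD(Seq("a")), keep working unchanged, while new call sites can opt in with relation = Some(...).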
Thanks for the review and suggestions. PTAL again
LGTM, but the copyrights need to be updated.
You can either use a pre-commit hook or invoke the script directly:
export SPARK_RAPIDS_AUTO_COPYRIGHTER=ON
git diff origin/branch-24.04..HEAD --name-status | \
awk '/^M\s+/ { print $2}' | \
xargs ./scripts/auto-copyrighter.sh
Update copyright
Update copyright year
LGTM
build
This PR handles a change that was made in Spark: the metadata extractors from the fileFormat are now passed through to the FileScanRDD.
Changes
fixes #8766