
FileSourceScanExec can have logicalRelation parameter on some distributions (NVIDIA#190)

* FileSourceScanExec can have logicalRelation parameter on some distributions

* Update sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala

Co-authored-by: Jason Lowe <[email protected]>

Co-authored-by: Thomas Graves <[email protected]>
Co-authored-by: Jason Lowe <[email protected]>
3 people authored Jun 17, 2020
1 parent 5029b37 commit ee583e9
Showing 1 changed file with 22 additions and 9 deletions.
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{DataSourceScanExec, ExplainUtils, FileSourceScanExec}
-import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation}
+import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -44,14 +44,27 @@ case class GpuFileSourceScanExec(
override val tableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec with GpuExec {

-  private[this] val wrapped: FileSourceScanExec = FileSourceScanExec(
-    relation,
-    output,
-    requiredSchema,
-    partitionFilters,
-    optionalBucketSet,
-    dataFilters,
-    tableIdentifier)
+  private[this] val wrapped: FileSourceScanExec = {
+    val tclass = classOf[org.apache.spark.sql.execution.FileSourceScanExec]
+    val constructors = tclass.getConstructors()
+    if (constructors.size > 1) {
+      throw new IllegalStateException(s"Only expected 1 constructor for FileSourceScanExec")
+    }
+    val constructor = constructors(0)
+    val instance = if (constructor.getParameterCount() == 8) {
+      // Some distributions of Spark modified FileSourceScanExec to take an additional parameter
+      // that is the logicalRelation. We don't know what it's used for exactly, but we haven't
+      // run into any issues in testing using the one we create here.
+      @transient val logicalRelation = LogicalRelation(relation)
+      constructor.newInstance(relation, output, requiredSchema, partitionFilters,
+        optionalBucketSet, dataFilters, tableIdentifier,
+        logicalRelation).asInstanceOf[FileSourceScanExec]
+    } else {
+      constructor.newInstance(relation, output, requiredSchema, partitionFilters,
+        optionalBucketSet, dataFilters, tableIdentifier).asInstanceOf[FileSourceScanExec]
+    }
+    instance
+  }

override lazy val outputPartitioning: Partitioning = wrapped.outputPartitioning

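For readers who want the reflection trick in isolation, here is a minimal standalone sketch. Variant, buildVariant, ConstructorArityDemo, and the "extra" argument are hypothetical names invented for illustration and are not part of the plugin; the sketch only shows the shape of the patch above: look up the single public constructor, branch on its parameter count, and invoke it via newInstance.

import java.lang.reflect.Constructor

// Hypothetical stand-in for a class whose constructor arity differs across
// distributions; the real target in the patch above is FileSourceScanExec.
case class Variant(name: String, path: String)

object ConstructorArityDemo {
  def buildVariant(name: String, path: String): Variant = {
    val constructors: Array[Constructor[_]] = classOf[Variant].getConstructors
    if (constructors.length > 1) {
      throw new IllegalStateException("Only expected 1 constructor for Variant")
    }
    val constructor = constructors(0)
    // Pass an extra argument only when the constructor declares a slot for it.
    // Variant has a 2-parameter constructor, so the else branch runs here; the
    // patch makes the same check with 7 vs 8 parameters on FileSourceScanExec.
    val args: Array[AnyRef] =
      if (constructor.getParameterCount == 3) Array[AnyRef](name, path, "extra")
      else Array[AnyRef](name, path)
    constructor.newInstance(args: _*).asInstanceOf[Variant]
  }
}

In the patch itself, a stock Apache Spark build takes the 7-parameter branch, while the 8-parameter branch only fires on distributions that added the logicalRelation field.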
