From 84511b548211085998d52db1e32021ae2546b06b Mon Sep 17 00:00:00 2001
From: Yuan
Date: Mon, 27 Jun 2022 17:48:30 +0800
Subject: [PATCH] [NSE-979] fix data source (#993)

* fix data source

Signed-off-by: Yuan Zhou

* fix

Signed-off-by: Yuan Zhou

* Fix ut issue

Co-authored-by: philo
---
 .../sql/execution/datasources/arrow/ArrowFileFormat.scala    | 2 +-
 .../datasources/v2/arrow/ArrowPartitionReaderFactory.scala   | 3 ++-
 .../sql/execution/datasources/arrow/ArrowDataSourceTest.scala | 1 -
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowFileFormat.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowFileFormat.scala
index 155ecc21c..50f8182c5 100644
--- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowFileFormat.scala
+++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowFileFormat.scala
@@ -131,7 +131,7 @@ class ArrowFileFormat extends FileFormat with DataSourceRegister with Serializab
       // todo predicate validation / pushdown
       val parquetFileFields = factory.inspect().getFields.asScala
       val caseInsensitiveFieldMap = mutable.Map[String, String]()
-      val requiredFields = if (sqlConf.caseSensitiveAnalysis) {
+      val requiredFields = if (caseSensitive) {
         new Schema(requiredSchema.map { field =>
           parquetFileFields.find(_.getName.equals(field.name))
             .getOrElse(ArrowUtils.toArrowField(field))
diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowPartitionReaderFactory.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowPartitionReaderFactory.scala
index 0ff3a2d56..9618a185e 100644
--- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowPartitionReaderFactory.scala
+++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowPartitionReaderFactory.scala
@@ -48,6 +48,7 @@ case class ArrowPartitionReaderFactory(
 
   private val batchSize = sqlConf.parquetVectorizedReaderBatchSize
   private val enableFilterPushDown: Boolean = sqlConf.arrowFilterPushDown
+  private val caseSensitive: Boolean = sqlConf.caseSensitiveAnalysis
 
   override def supportColumnarReads(partition: InputPartition): Boolean = true
 
diff --git a/arrow-data-source/standard/src/test/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowDataSourceTest.scala b/arrow-data-source/standard/src/test/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowDataSourceTest.scala
index c6c8ebe7f..e08396d64 100644
--- a/arrow-data-source/standard/src/test/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowDataSourceTest.scala
+++ b/arrow-data-source/standard/src/test/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowDataSourceTest.scala
@@ -306,7 +306,6 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
       "id"
     }
     val df = spark.read
-      .schema(s"$selectColName long")
       .arrow(tempPath.getPath)
       .filter(s"$selectColName <= 2")
     checkAnswer(df, Row(0) :: Row(1) :: Row(2) :: Nil)
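Note: both reader changes apply the same pattern: the session's case-sensitivity
flag is read from sqlConf once, up front, and the schema-matching code then uses
the captured `caseSensitive` val instead of touching `sqlConf` again. The test
change drops the explicit `.schema(...)` call, presumably so the read infers
column names from the file and actually exercises that matching path. Below is a
minimal sketch of why the capture matters; it assumes the usual Spark situation
where the reader closure is serialized and run in executor tasks, and the names
(`MyConf`, `shippable`, `CaptureConfBeforeClosure`) are illustrative, not from
this patch.

    import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

    object CaptureConfBeforeClosure {

      // Stand-in for a driver-side config holder such as SQLConf; deliberately
      // not Serializable, so a closure that captures it cannot be shipped.
      class MyConf {
        val caseSensitiveAnalysis: Boolean = false
      }

      // Returns true iff `obj` survives Java serialization, which is what
      // Spark does to a closure before sending it to executor tasks.
      def shippable(obj: AnyRef): Boolean =
        try {
          new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj)
          true
        } catch {
          case _: NotSerializableException => false
        }

      def main(args: Array[String]): Unit = {
        val sqlConf = new MyConf

        // Before the fix: the closure reads the config holder itself, so the
        // whole (non-serializable) holder is captured.
        val before: String => Boolean =
          name => if (sqlConf.caseSensitiveAnalysis) name == "ID" else name.equalsIgnoreCase("ID")

        // After the fix: capture the Boolean up front; only that primitive
        // value travels with the closure.
        val caseSensitive = sqlConf.caseSensitiveAnalysis
        val after: String => Boolean =
          name => if (caseSensitive) name == "ID" else name.equalsIgnoreCase("ID")

        println(s"captures sqlConf: shippable = ${shippable(before)}") // false
        println(s"captures Boolean: shippable = ${shippable(after)}")  // true
      }
    }

The same reasoning covers ArrowPartitionReaderFactory: evaluating the new
`private val caseSensitive` in the case-class body happens on the driver at
construction time, so the factory serialized to executors carries the session's
actual setting rather than re-reading sqlConf where it may be unavailable or
fall back to defaults.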