From 5bfcbfdbdce31d640f9bdf6e874b7f916e89e7ce Mon Sep 17 00:00:00 2001
From: lazyman
Date: Mon, 6 Apr 2015 23:26:34 +0800
Subject: [PATCH] move spark.sql.hive.verifyPartitionPath to SQLConf, fix Scala
 style

---
 sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 6 ++++++
 .../main/scala/org/apache/spark/sql/hive/TableReader.scala | 6 +++---
 .../org/apache/spark/sql/hive/QueryPartitionSuite.scala    | 2 +-
 3 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 4815620c6fe57..ee641bdfeb2d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -39,6 +39,8 @@ private[spark] object SQLConf {
   val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
   val PARQUET_USE_DATA_SOURCE_API = "spark.sql.parquet.useDataSourceApi"
 
+  val HIVE_VERIFY_PARTITIONPATH = "spark.sql.hive.verifyPartitionPath"
+
   val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
   val BROADCAST_TIMEOUT = "spark.sql.broadcastTimeout"
 
@@ -119,6 +121,10 @@ private[sql] class SQLConf extends Serializable {
   private[spark] def parquetUseDataSourceApi =
     getConf(PARQUET_USE_DATA_SOURCE_API, "true").toBoolean
 
+  /** When true, prunes partition paths that do not exist on the filesystem. */
+  private[spark] def verifyPartitionPath =
+    getConf(HIVE_VERIFY_PARTITIONPATH, "true").toBoolean
+
   /** When true the planner will use the external sort, which may spill to disk. */
   private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "false").toBoolean
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index b451713279381..b871c58e545f7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -147,7 +147,7 @@ class HadoopTableReader(
   def verifyPartitionPath(
       partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
       Map[HivePartition, Class[_ <: Deserializer]] = {
-    if (!sc.getConf("spark.sql.hive.verifyPartitionPath", "true").toBoolean) {
+    if (!sc.conf.verifyPartitionPath) {
       partitionToDeserializer
     } else {
       var existPathSet = collection.mutable.Set[String]()
@@ -158,9 +158,9 @@ class HadoopTableReader(
             val pathPattern = new Path(pathPatternStr)
             val fs = pathPattern.getFileSystem(sc.hiveconf)
             val matches = fs.globStatus(pathPattern)
-            matches.map(fileStatus => existPathSet += fileStatus.getPath.toString)
+            matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
           }
-          // convert  /demo/data/year/month/day  to  /demo/data/**/**/**/
+          // convert  /demo/data/year/month/day  to  /demo/data/*/*/*/
           def getPathPatternByPath(parNum: Int, tempPath: Path): String = {
             var path = tempPath
             for (i <- (1 to parNum)) path = path.getParent
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index 8b90bfb19fde8..83f97128c5e83 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -61,4 +61,4 @@ class QueryPartitionSuite extends QueryTest {
     sql("DROP TABLE table_with_partition")
     sql("DROP TABLE createAndInsertTest")
   }
-}
\ No newline at end of file
+}
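
Note: with this patch the flag is read through SQLConf (sc.conf.verifyPartitionPath)
instead of a raw string lookup. A minimal Scala sketch of how the option is toggled
from user code, assuming the Spark 1.x API; the master URL, app name, and table name
below are hypothetical:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.hive.HiveContext

    val sc = new SparkContext("local[2]", "verify-partition-path-demo")
    val hiveContext = new HiveContext(sc)

    // Default is "true": partition directories that no longer exist on the
    // filesystem are pruned before the scan instead of failing the query.
    hiveContext.setConf("spark.sql.hive.verifyPartitionPath", "true")
    hiveContext.sql("SELECT * FROM some_partitioned_table").collect()

    // Set to "false" to skip the extra globStatus() calls when every
    // partition directory is known to exist.
    hiveContext.setConf("spark.sql.hive.verifyPartitionPath", "false")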