[SPARK-15991] SparkContext.hadoopConfiguration should be always the base of hadoop conf created by SessionState

## What changes were proposed in this pull request?
Before this patch, Hadoop conf set directly on SparkContext.hadoopConfiguration after a SparkSession has been created does not affect the Hadoop conf created by SessionState. This patch changes SessionState to always use SparkContext.hadoopConfiguration as the base.

This patch also changes the behavior of the hive-site.xml support added in #12689: with this patch, hive-site.xml is loaded into SparkContext.hadoopConfiguration.
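
A minimal sketch of the behavior this patch guarantees, mirroring the new test (the key and value are illustrative; `sessionState` is `private[sql]`, so like the test this has to run inside the `org.apache.spark.sql` package):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

// A setting applied directly to the global Hadoop conf...
spark.sparkContext.hadoopConfiguration.set("my.special.key", "my-value")

// ...is now picked up by every Hadoop conf derived by the session state,
// because SparkContext.hadoopConfiguration is used as the base.
assert(spark.sessionState.newHadoopConf().get("my.special.key") == "my-value")

spark.stop()
```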

## How was this patch tested?
New test in SparkSessionBuilderSuite.

Author: Yin Huai <[email protected]>

Closes #13711 from yhuai/SPARK-15991.

(cherry picked from commit d9c6628)
Signed-off-by: Shixiong Zhu <[email protected]>
yhuai authored and zsxwing committed Jun 17, 2016
1 parent 8f71388 commit b3678eb
Showing 5 changed files with 28 additions and 17 deletions.
SessionState.scala
@@ -49,7 +49,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
   lazy val conf: SQLConf = new SQLConf
 
   def newHadoopConf(): Configuration = {
-    val hadoopConf = new Configuration(sparkSession.sharedState.hadoopConf)
+    val hadoopConf = new Configuration(sparkSession.sparkContext.hadoopConfiguration)
     conf.getAllConfs.foreach { case (k, v) => if (v ne null) hadoopConf.set(k, v) }
     hadoopConf
   }

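With this change, newHadoopConf() simply copies the global Hadoop conf and overlays the session's SQLConf entries. A self-contained sketch of that layering, using only Hadoop's Configuration copy constructor (names and keys illustrative):

```scala
import org.apache.hadoop.conf.Configuration

// `base` plays the role of SparkContext.hadoopConfiguration.
val base = new Configuration(false)
base.set("shared.key", "from-spark-context")

// `sessionOverrides` stands in for the session's SQLConf entries.
val sessionOverrides = Map("session.key" -> "from-session")

// The copy constructor inherits every entry of `base`; the session's
// entries are then overlaid, as newHadoopConf() does above.
val merged = new Configuration(base)
sessionOverrides.foreach { case (k, v) => merged.set(k, v) }

assert(merged.get("shared.key") == "from-spark-context")
assert(merged.get("session.key") == "from-session")
```
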
SharedState.scala
@@ -43,23 +43,17 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
    */
   val listener: SQLListener = createListenerAndUI(sparkContext)
 
-  /**
-   * The base hadoop configuration which is shared among all spark sessions. It is based on the
-   * default hadoop configuration of Spark, with custom configurations inside `hive-site.xml`.
-   */
-  val hadoopConf: Configuration = {
-    val conf = new Configuration(sparkContext.hadoopConfiguration)
+  {
     val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
     if (configFile != null) {
-      conf.addResource(configFile)
+      sparkContext.hadoopConfiguration.addResource(configFile)
     }
-    conf
   }
 
   /**
    * A catalog that interacts with external systems.
    */
-  lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog(hadoopConf)
+  lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog(sparkContext.hadoopConfiguration)
 
   /**
    * A classloader used to load all user-added jar.
@@ -71,7 +65,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
     // Set the Hive metastore warehouse path to the one we use
     val tempConf = new SQLConf
     sparkContext.conf.getAll.foreach { case (k, v) => tempConf.setConfString(k, v) }
-    val hiveWarehouseDir = hadoopConf.get("hive.metastore.warehouse.dir")
+    val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")
     if (hiveWarehouseDir != null && !tempConf.contains(SQLConf.WAREHOUSE_PATH.key)) {
       // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
       // we will respect the value of hive.metastore.warehouse.dir.

SQLQuerySuite.scala
@@ -2870,8 +2870,4 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       sql(s"SELECT '$literal' AS DUMMY"),
       Row(s"$expected") :: Nil)
   }
-
-  test("SPARK-15887: hive-site.xml should be loaded") {
-    assert(spark.sessionState.newHadoopConf().get("hive.in.test") == "true")
-  }
 }

SparkSessionBuilderSuite.scala
@@ -102,4 +102,24 @@ class SparkSessionBuilderSuite extends SparkFunSuite {
     assert(session.sparkContext.conf.get("key2") == "value2")
     session.stop()
   }
+
+  test("SPARK-15887: hive-site.xml should be loaded") {
+    val session = SparkSession.builder().master("local").getOrCreate()
+    assert(session.sessionState.newHadoopConf().get("hive.in.test") == "true")
+    assert(session.sparkContext.hadoopConfiguration.get("hive.in.test") == "true")
+    session.stop()
+  }
+
+  test("SPARK-15991: Set global Hadoop conf") {
+    val session = SparkSession.builder().master("local").getOrCreate()
+    val mySpecialKey = "my.special.key.15991"
+    val mySpecialValue = "msv"
+    try {
+      session.sparkContext.hadoopConfiguration.set(mySpecialKey, mySpecialValue)
+      assert(session.sessionState.newHadoopConf().get(mySpecialKey) == mySpecialValue)
+    } finally {
+      session.sparkContext.hadoopConfiguration.unset(mySpecialKey)
+      session.stop()
+    }
+  }
 }

HiveSharedState.scala
@@ -36,11 +36,12 @@ private[hive] class HiveSharedState(override val sparkContext: SparkContext)
    */
   // This needs to be a lazy val at here because TestHiveSharedState is overriding it.
   lazy val metadataHive: HiveClient = {
-    HiveUtils.newClientForMetadata(sparkContext.conf, hadoopConf)
+    HiveUtils.newClientForMetadata(sparkContext.conf, sparkContext.hadoopConfiguration)
   }
 
   /**
    * A catalog that interacts with the Hive metastore.
    */
-  override lazy val externalCatalog = new HiveExternalCatalog(metadataHive, hadoopConf)
+  override lazy val externalCatalog =
+    new HiveExternalCatalog(metadataHive, sparkContext.hadoopConfiguration)
 }
