
[SPARK-22387][SQL] Propagate session configs to data source read/write options #19861

Closed · wants to merge 14 commits
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;

import java.util.List;

/**
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* propagate session configs with chosen key-prefixes to the particular data source.
Review comment (@cloud-fan, Contributor, Dec 13, 2017): propagate session configs with the specified key-prefix to all data source operations in this session

*/
@InterfaceStability.Evolving
public interface ConfigSupport {
Review comment (Contributor): nit: SessionConfigSupport

/**
* Create a list of key-prefixes, all session configs that match at least one of the prefixes
* will be propagated to the data source options.
*/
List<String> getConfigPrefixes();
Review comment (Contributor): we need to think about the current use cases and validate this API. E.g. the CSV and JSON data sources both accept an option columnNameOfCorruptRecord, or the session config spark.sql.columnNameOfCorruptRecord. From this we get the following:

  1. mostly, a session config maps to an existing option.
  2. session configs are always prefixed with spark.sql; we should not ask the data source to always specify it.
  3. do we really need to support more than one prefix?

}
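
To make the proposed mix-in concrete, here is a minimal, hypothetical Scala sketch (not part of this PR) of a data source opting into config propagation with a single prefix. The class name is made up, and the prefix simply echoes the columnNameOfCorruptRecord example from the review comment above.

// Hypothetical sketch: a DataSourceV2 implementation that mixes in ConfigSupport.
// Every session config whose key starts with the returned prefix would be copied
// into the DataSourceV2Options handed to this source's reader/writer.
import java.util.{Arrays => JArrays, List => JList}

import org.apache.spark.sql.sources.v2.{ConfigSupport, DataSourceV2}

class CorruptRecordAwareSource extends DataSourceV2 with ConfigSupport {
  override def getConfigPrefixes: JList[String] =
    JArrays.asList("spark.sql.columnNameOfCorruptRecord")
}
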
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala (16 changes: 13 additions & 3 deletions)
@@ -32,8 +32,9 @@ import org.apache.spark.sql.execution.datasources.{DataSource, FailureSafeParser}
import org.apache.spark.sql.execution.datasources.csv._
import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ConfigSupport
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

@@ -169,6 +170,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
option("path", path).load(Seq.empty: _*) // force invocation of `load(...varargs...)`
}

import DataSourceV2ConfigSupport._

/**
* Loads input in as a `DataFrame`, for data sources that support multiple paths.
* Only works if the source is a HadoopFsRelationProvider.
@@ -184,9 +187,16 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {

val cls = DataSource.lookupDataSource(source)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
val options = new DataSourceV2Options(extraOptions.asJava)
val dataSource = cls.newInstance()
val options = dataSource match {
case cs: ConfigSupport =>
val confs = withSessionConfig(cs, sparkSession.sessionState.conf)
new DataSourceV2Options((confs ++ extraOptions).asJava)
Review comment (Member): What happens if they have duplicate names?

Reply (Contributor, PR author): Good catch! Should the confs in the extraOptions have a higher priority? WDYT @cloud-fan?

Reply (Contributor): Yes, extraOptions needs higher priority.

case _ =>
new DataSourceV2Options(extraOptions.asJava)
}

val reader = (cls.newInstance(), userSpecifiedSchema) match {
val reader = (dataSource, userSpecifiedSchema) match {
case (ds: ReadSupportWithSchema, Some(schema)) =>
ds.createReader(schema, options)

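The thread above asks what happens when a propagated session config and an entry in extraOptions share a key. A small, self-contained Scala sketch (not part of the diff, with made-up keys) of the relevant Map semantics: with confs ++ extraOptions, the right-hand operand wins on duplicate keys, so the code as written already gives extraOptions the higher priority the reviewers agree on.

object OptionPrecedenceSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative only: which value survives when a propagated session config
    // and a user-supplied option share the same key.
    val sessionConfs = Map("spark.sql.some.option" -> "fromSessionConf")
    val extraOptions = Map("spark.sql.some.option" -> "fromExtraOptions")

    // Scala's ++ keeps the right-hand map's value for duplicate keys, so placing
    // extraOptions last (confs ++ extraOptions) lets the per-read option win.
    val merged = sessionConfs ++ extraOptions
    assert(merged("spark.sql.some.option") == "fromExtraOptions")
  }
}
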
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import scala.collection.JavaConverters._
import scala.collection.immutable

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.ConfigSupport

private[sql] object DataSourceV2ConfigSupport {

/**
* Helper method to filter session configs with config key that matches at least one of the given
* prefixes.
*
* @param cs the config key-prefixes that should be filtered.
* @param conf the session conf
* @return an immutable map that contains all the session configs that should be propagated to
* the data source.
*/
def withSessionConfig(
cs: ConfigSupport,
conf: SQLConf): immutable.Map[String, String] = {
Review comment (Contributor): in Scala, Map by default refers to immutable.Map

val prefixes = cs.getConfigPrefixes
require(prefixes != null, "The config key-prefixes cann't be null.")
Review comment (Contributor): double n ("cann't" should be "can't")

conf.getAllConfs.filterKeys { confKey =>
prefixes.asScala.exists(confKey.startsWith(_))
}
}
}
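
For reference, a self-contained Scala sketch (not part of the PR) of the filtering shape used by withSessionConfig above; the config keys and the prefix list are invented for illustration.

object PrefixFilterSketch {
  def main(args: Array[String]): Unit = {
    val sessionConfs = Map(
      "spark.sql.parquet.compression.codec" -> "snappy",
      "spark.sql.shuffle.partitions" -> "200")
    val prefixes = Seq("spark.sql.parquet")

    // Same shape as the filterKeys call above: keep only the configs whose key
    // starts with at least one of the given prefixes.
    val propagated = sessionConfs.filterKeys(key => prefixes.exists(p => key.startsWith(p))).toMap
    assert(propagated == Map("spark.sql.parquet.compression.codec" -> "snappy"))
  }
}
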
@@ -24,6 +24,8 @@ import test.org.apache.spark.sql.sources.v2._
import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ConfigSupport
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{Filter, GreaterThan}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.test.SharedSQLContext
@@ -43,6 +45,21 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
}
}

test("simple implementation with config support") {
Review comment (Contributor): this doesn't need to be here; just add a new test suite for the new utils

withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "false",
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key -> "true",
SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "32",
SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM.key -> "10000") {
val cs = classOf[DataSourceV2WithConfig].newInstance().asInstanceOf[ConfigSupport]
val confs = DataSourceV2ConfigSupport.withSessionConfig(cs, SQLConf.get)
assert(confs.size == 3)
assert(confs.keySet.filter(_.startsWith("spark.sql.parquet")).size == 2)
assert(confs.keySet.filter(
_.startsWith("spark.sql.sources.parallelPartitionDiscovery.threshold")).size == 1)
assert(confs.keySet.filter(_.startsWith("not.exist.prefix")).size == 0)
}
}

test("advanced implementation") {
Seq(classOf[AdvancedDataSourceV2], classOf[JavaAdvancedDataSourceV2]).foreach { cls =>
withClue(cls.getName) {
@@ -179,7 +196,14 @@ class SimpleReadTask(start: Int, end: Int) extends ReadTask[Row] with DataReader[Row] {
override def close(): Unit = {}
}

class DataSourceV2WithConfig extends SimpleDataSourceV2 with ConfigSupport {

override def getConfigPrefixes: JList[String] = {
java.util.Arrays.asList(
"spark.sql.parquet",
"spark.sql.sources.parallelPartitionDiscovery.threshold")
}
}
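
Finally, a hedged end-to-end sketch (not in the PR, assuming the propagation wired up in DataFrameReader above) of how a source like DataSourceV2WithConfig would see a matching session config at read time; the config key and local-mode setup are illustrative.

// Hypothetical usage: a session config matching one of DataSourceV2WithConfig's
// prefixes is expected to be copied into the DataSourceV2Options passed to
// createReader when the source is loaded through DataFrameReader.
import org.apache.spark.sql.SparkSession

object ConfigPropagationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    spark.conf.set("spark.sql.parquet.compression.codec", "gzip")

    spark.read
      .format(classOf[DataSourceV2WithConfig].getName)
      .load()
      .show()

    spark.stop()
  }
}
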

class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
