[SPARK-22387][SQL] Propagate session configs to data source read/write options #19861
Conversation
Test build #84377 has finished for PR 19861 at commit

cc @cloud-fan
 * @return an immutable map that contains all the session configs that should be propagated to
 *         the data source.
 */
def withSessionConfig(
These helper functions need to be moved to the org.apache.spark.sql.execution.datasources.v2 package, since they will be called by the SQL API code path. Another, more straightforward option is to provide this via `ConfigSupport`. WDYT? cc @cloud-fan
val options = dataSource match {
  case cs: ConfigSupport =>
    val confs = withSessionConfig(cs, sparkSession.sessionState.conf)
    new DataSourceV2Options((confs ++ extraOptions).asJava)
What happens if they have duplicate names?
Good catch! Should the confs in `extraOptions` have higher priority? WDYT @cloud-fan?
Yea, `extraOptions` needs higher priority.
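For illustration, a minimal sketch (plain Java maps standing in for the Spark classes; the class and key names are made up) of why the `confs ++ extraOptions` merge in the diff above gives `extraOptions` higher priority: entries inserted last overwrite duplicate keys.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class OptionPrecedence {
    // Merge session configs with per-operation options. Keys in extraOptions
    // overwrite session-config keys because they are inserted last, which
    // mirrors Scala's `confs ++ extraOptions` semantics.
    static Map<String, String> merge(Map<String, String> sessionConfs,
                                     Map<String, String> extraOptions) {
        Map<String, String> merged = new LinkedHashMap<>(sessionConfs);
        merged.putAll(extraOptions);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> confs = new LinkedHashMap<>();
        confs.put("path", "/from/session");
        Map<String, String> extra = new LinkedHashMap<>();
        extra.put("path", "/from/query");
        System.out.println(merge(confs, extra).get("path")); // prints /from/query
    }
}
```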
 * Create a list of key-prefixes; all session configs that match at least one of the prefixes
 * will be propagated to the data source options.
 */
List<String> getConfigPrefixes();
We need to think about the current use cases and validate this API. E.g. the CSV and JSON data sources both accept an option `columnNameOfCorruptRecord`, or the session config `spark.sql.columnNameOfCorruptRecord`. We get the following information:
- mostly, a session config maps to an existing option.
- session configs are always prefixed with `spark.sql`; we should not ask the data source to always specify it.
- do we really need to support more than one prefix?
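To make the concern concrete, here is a hypothetical sketch of prefix-based filtering under the proposed `getConfigPrefixes()` API (plain Java maps stand in for `SQLConf`; the prefix and config values are invented). Note how every useful prefix would start with `spark.sql.`, which is the review point above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MultiPrefixDemo {
    // Keep only the session configs whose keys match at least one of the
    // declared key-prefixes, as the early getConfigPrefixes() API implies.
    static Map<String, String> filterByPrefixes(Map<String, String> conf,
                                                List<String> prefixes) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : conf.entrySet()) {
            for (String p : prefixes) {
                if (e.getKey().startsWith(p)) {
                    out.put(e.getKey(), e.getValue());
                    break;
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.sql.columnNameOfCorruptRecord", "_corrupt");
        conf.put("spark.sql.shuffle.partitions", "10");
        // The data source has to spell out the full "spark.sql."-prefixed key.
        List<String> prefixes = Arrays.asList("spark.sql.columnNameOfCorruptRecord");
        System.out.println(filterByPrefixes(conf, prefixes));
    }
}
```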
Test build #84433 has finished for PR 19861 at commit

Test build #84563 has finished for PR 19861 at commit

Test build #84565 has finished for PR 19861 at commit

Test build #84603 has finished for PR 19861 at commit

retest this please

Test build #84607 has finished for PR 19861 at commit
    source: String,
    conf: SQLConf): immutable.Map[String, String] = {
  val prefixes = cs.getConfigPrefixes
  require(prefixes != null, "The config key-prefixes cann't be null.")
double n
require(prefixes != null, "The config key-prefixes cann't be null.")
val mapping = cs.getConfigMapping.asScala
val validOptions = cs.getValidOptions
require(validOptions != null, "The valid options list cann't be null.")
double n
Test build #84712 has finished for PR 19861 at commit

retest this please

Test build #84745 has finished for PR 19861 at commit

retest this please

Test build #84776 has finished for PR 19861 at commit
 * propagate session configs with chosen key-prefixes to the particular data source.
 */
@InterfaceStability.Evolving
public interface ConfigSupport {
nit: SessionConfigSupport
/**
 * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
 * propagate session configs with chosen key-prefixes to the particular data source.
propagate session configs with the specified key-prefix to all data source operations in this session
 * `spark.datasource.$name`, turn `spark.datasource.$name.xxx -> yyy` into
 * `xxx -> yyy`, and propagate them to all data source operations in this session.
 */
String name();
how about `keyPrefix`?
Test build #84941 has finished for PR 19861 at commit
val keyPrefix = cs.keyPrefix()
require(keyPrefix != null, "The data source config key prefix can't be null.")

val pattern = Pattern.compile(s"^spark\\.datasource\\.$keyPrefix\\.(.*)")
nit: `(.*)` -> `(.+)`, just to forbid some corner case like `spark.datasource.$keyPrefix.`
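The effect of the suggested change can be sketched with plain Java regex (the `myds` prefix is invented for illustration): `(.*)` accepts a key with nothing after the prefix dot, while `(.+)` rejects it.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PrefixPatternDemo {
    public static void main(String[] args) {
        String keyPrefix = "myds"; // hypothetical data source key prefix
        Pattern greedy = Pattern.compile("^spark\\.datasource\\." + keyPrefix + "\\.(.*)");
        Pattern nonEmpty = Pattern.compile("^spark\\.datasource\\." + keyPrefix + "\\.(.+)");

        String cornerCase = "spark.datasource.myds.";   // no option name after the prefix
        String normal = "spark.datasource.myds.path";

        // (.*) matches the corner case with an empty capture group...
        System.out.println(greedy.matcher(cornerCase).matches());   // true
        // ...while (.+) rejects it, as the review suggests.
        System.out.println(nonEmpty.matcher(cornerCase).matches()); // false

        Matcher m = nonEmpty.matcher(normal);
        if (m.matches()) {
            System.out.println(m.group(1)); // path
        }
    }
}
```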
Test build #84952 has finished for PR 19861 at commit

retest this please

Test build #84961 has finished for PR 19861 at commit

retest this please

SparkR test is pretty flaky recently...

Test build #84965 has finished for PR 19861 at commit

retest this please

Test build #84988 has finished for PR 19861 at commit

retest this please

Test build #84995 has finished for PR 19861 at commit

retest this please

Test build #85049 has finished for PR 19861 at commit

retest this please

Test build #85060 has finished for PR 19861 at commit

thanks, merging to master!
@jiangxb1987, @cloud-fan, what was the use case you needed to add this for?

With

@jiangxb1987, I understand what this does. I just wanted an example use case where it was necessary. What was the motivating use case?

@rdblue This is also what we already have for built-in data sources, e.g.

Basically we want per-query options, which can be specified via

Thanks for the example, @cloud-fan.
…e options Introduce a new interface `SessionConfigSupport` for `DataSourceV2`, it can help to propagate session configs with the specified key-prefix to all data source operations in this session. Add new test suite `DataSourceV2UtilsSuite`. Author: Xingbo Jiang <[email protected]> Closes apache#19861 from jiangxb1987/datasource-configs.
What changes were proposed in this pull request?

Introduce a new interface `SessionConfigSupport` for `DataSourceV2`; it can help to propagate session configs with the specified key-prefix to all data source operations in this session.

How was this patch tested?

Added a new test suite, `DataSourceV2UtilsSuite`.
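Putting the pieces of the discussion together, here is a hedged, self-contained sketch of how a data source would opt in to session config propagation. The interface shape follows the PR (`keyPrefix()` returning the segment after `spark.datasource.`), but `MySource`, the `myds` prefix, and the config values are all hypothetical, and plain Java maps stand in for `SQLConf` and `DataSourceV2Options`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Shape of the mix-in interface introduced by this PR (simplified).
interface SessionConfigSupport {
    String keyPrefix();
}

// Hypothetical data source that picks up spark.datasource.myds.* configs.
class MySource implements SessionConfigSupport {
    @Override
    public String keyPrefix() {
        return "myds";
    }
}

public class SessionConfigDemo {
    // Extract `spark.datasource.$keyPrefix.xxx -> yyy` entries as `xxx -> yyy`,
    // mirroring the helper discussed in this PR (with the (.+) fix applied).
    static Map<String, String> withSessionConfig(SessionConfigSupport cs,
                                                 Map<String, String> sessionConf) {
        Pattern pattern =
            Pattern.compile("^spark\\.datasource\\." + cs.keyPrefix() + "\\.(.+)");
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : sessionConf.entrySet()) {
            Matcher m = pattern.matcher(e.getKey());
            if (m.matches()) {
                result.put(m.group(1), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.datasource.myds.path", "/tmp/data");
        conf.put("spark.sql.shuffle.partitions", "10"); // not propagated
        System.out.println(withSessionConfig(new MySource(), conf)); // {path=/tmp/data}
    }
}
```

Per the earlier discussion in the thread, any `extraOptions` supplied on a specific read/write would then be merged on top of this map, taking priority over the session configs.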