[SPARK-15689][SQL] data source v2 read path #19136
Conversation
 * 2. propagate information upward to Spark, e.g., report statistics, report ordering, etc.
 * Spark first applies all operator push down optimizations which this data source supports. Then
 * Spark collects information this data source provides for further optimizations. Finally Spark
 * issues the scan request and does the actual data reading.
TODO: this is not true now, as we push down operators at the planning phase. We need to do some refactoring and move it to the optimizer phase.
This would be really nice imho.
Test build #81412 has finished for PR 19136 at commit
Force-pushed from 543a40b to a824d44.
Test build #81441 has finished for PR 19136 at commit
retest this please
Test build #81466 has finished for PR 19136 at commit
Thanks for pinging me. I left comments on the older PR, since other discussion was already there. If you'd prefer comments here, just let me know.
Force-pushed from a824d44 to 89cbfb7.
/**
 * A variant of `DataSourceV2` which requires users to provide a schema when reading data. A data
 * source can inherit both `DataSourceV2` and `SchemaRequiredDataSourceV2` if it supports both schema
 * inference and user-specified schemas.
cc @rdblue for the new API of schema inference.
Test build #81490 has finished for PR 19136 at commit
 */
@Experimental
@InterfaceStability.Unstable
public List<ReadTask<UnsafeRow>> createUnsafeRowReadTasks() {
I really like the new API's flexibility to implement the different types of support. Considering UnsafeRow is unstable, would it be possible to move createUnsafeRowReadTasks to a different interface? That would let a data source implement two variants, one with Row and another with UnsafeRow, and make it easily configurable based on the Spark version.
 * Adds one more entry to the options.
 * This should only be called by Spark, not data source implementations.
 */
public void addOption(String key, String value) {
The check added for addOption protects against modifying the options passed to the data source, but a data source can still add new options by accident. I think it might be safer to pass a DataSourceV2Options that is unmodifiable by the data source.
good point, I'll make it immutable.
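As a side note for readers, here is a minimal sketch of the immutable shape being agreed on (the class name and details are illustrative only, not the PR's actual implementation): copy the options once at construction and expose only read accessors, so neither Spark nor the data source can mutate them afterwards.

import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Illustrative only: a case-insensitive options holder that cannot be modified after
// construction, which is the behavior being requested for DataSourceV2Options.
public final class ImmutableOptionsSketch {
  private final Map<String, String> map;

  public ImmutableOptionsSketch(Map<String, String> original) {
    Map<String, String> copy = new HashMap<>();
    // normalize keys once so lookups are case-insensitive
    original.forEach((k, v) -> copy.put(k.toLowerCase(Locale.ROOT), v));
    this.map = Collections.unmodifiableMap(copy);
  }

  public String get(String key) {
    return map.get(key.toLowerCase(Locale.ROOT));
  }
}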
Force-pushed from ee5faf1 to 182b89d.
 */
@Experimental
@InterfaceStability.Unstable
public interface UnsafeRowScan extends DataSourceV2Reader {
cc @j-baker for the new unsafe row scan API. Programmatically, the unsafe row scan should be in the base class and the normal row scan in the child class. However, conceptually for a developer, the normal row scan is the basic interface and should be in the base class; the unsafe row scan is more of an add-on and should be in the child class.
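A sketch of how the mix-in reads from the implementer's side, using the interfaces quoted in this thread. The Row-based method name createReadTasks and the package locations are assumptions made for illustration (only DataSourceV2Reader's import is shown verbatim elsewhere in this conversation), so treat the signatures as approximate:

import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.sources.v2.reader.ReadTask;
import org.apache.spark.sql.sources.v2.reader.UnsafeRowScan;
import org.apache.spark.sql.types.StructType;

// UnsafeRowScan extends DataSourceV2Reader (see the excerpt above), so implementing the
// mix-in opts this reader into the UnsafeRow path while keeping the basic Row-based API.
public class MyReader implements UnsafeRowScan {

  @Override
  public StructType readSchema() {
    return new StructType().add("i", "int");
  }

  @Override
  public List<ReadTask<Row>> createReadTasks() {
    // placeholder: the Row-based partitions of this source
    return Collections.emptyList();
  }

  @Override
  public List<ReadTask<UnsafeRow>> createUnsafeRowReadTasks() {
    // placeholder: the UnsafeRow-based partitions Spark will actually use
    return Collections.emptyList();
  }
}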
 * task will always run on these locations. Implementations should make sure that it can
 * be run on any location.
 */
default String[] preferredLocations() {
What format are these strings expected to be in? If Spark will be placing this ReadTask onto an executor that is a preferred location, the format will need to be a documented part of the API.
Are there levels of preference, or only binary? I'm thinking node vs rack vs datacenter for on-prem clusters, or instance vs AZ vs region for cloud clusters.
These have previously only been ip/hostnames. To match the RDD definition I think we would have to continue with that.
This API matches RDD.preferredLocations directly; I'll add more documentation here.
can we have a class Host which represents this? Just makes the API more clear.
hmmm, do you mean create a Host class which only has a string field?
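A hypothetical sketch of that suggestion, just to make the trade-off concrete (this class does not exist in the PR): a wrapper whose only job is to make the expected string format part of the type.

// Hypothetical: a value type that documents "this string is a host name or IP address",
// matching the semantics of RDD.preferredLocations.
public final class Host {
  private final String name;

  public Host(String name) {
    this.name = name;
  }

  public String name() {
    return name;
  }

  @Override
  public String toString() {
    return name;
  }
}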
 * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to report
 * statistics to Spark.
 */
public interface StatisticsSupport {
Some data sources have per-column statistics, like how many bytes a column has or its min/max (e.g. things required for CBO).
Should that be a separate interface from this one?
I'd like to put column stats in a separate interface, because we already separate basic stats and column stats in ANALYZE TABLE.
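Purely for illustration, one possible shape for such a separate mix-in (none of these names exist in this PR; they are made up to show the split between basic and column stats):

import java.util.Map;
import java.util.OptionalLong;

// Hypothetical: column-level statistics kept apart from the basic Statistics interface,
// mirroring how ANALYZE TABLE separates basic stats from column stats.
interface ColumnStatisticsSupport {
  Map<String, ColumnStatisticsSketch> columnStatistics();
}

// Hypothetical per-column estimates; all optional, since a source may know only some of them.
interface ColumnStatisticsSketch {
  OptionalLong distinctCount();
  OptionalLong nullCount();
  OptionalLong maxSizeInBytes();
}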
Test build #81506 has finished for PR 19136 at commit
Test build #81510 has finished for PR 19136 at commit
 *
 * @param schema the full schema of this data source reader. Full schema usually maps to the
 *               physical schema of the underlying storage of this data source reader, e.g.
 *               parquet files, JDBC tables, etc, while this reader may not read data with full
Maybe update the doc here, since JDBC sources and Parquet files probably shouldn't implement this. CSV and JSON are the examples that come to mind for sources that require a schema.
object DataSourceV2Relation {
  def apply(reader: DataSourceV2Reader): DataSourceV2Relation = {
    new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
Is this the right schema? The docs for readSchema say it is the result of pushdown and projection, which doesn't seem appropriate for a Relation. Does relation represent a table that can be filtered and projected, or does it represent a single read? At least in the Hive read path, it's a table.
On the other hand, it is much better to compute stats on a relation that's already filtered.
I left it as a TODO as it needs some refactoring of the optimizer. For now DataSourceV2Relation represents a data source without any optimization: we do these optimizations during planning. This is also a problem for data source v1, and that's why we implement partition pruning as an optimizer rule instead of inside the data source: we need to update the stats.
Are you saying that partition pruning isn't delegated to the data source in this interface?
I was just looking into how the data source should provide partition data, or at least fields that are the same for all rows in a ReadTask. It would be nice to have a way to pass those up instead of materializing them in each UnsafeRow.
In data source V2, we will delegate partition pruning to the data source, although we need to do some refactoring to make it happen.

> I was just looking into how the data source should provide partition data, or at least fields that are the same for all rows in a ReadTask. It would be nice to have a way to pass those up instead of materializing them in each UnsafeRow.

This can be achieved by the columnar reader. Think about a data source having a data column i and a partition column j: the returned columnar batch has 2 column vectors, for i and j. Column vector i is a normal one that contains all the values of column i within this batch, while column vector j is a constant vector that only contains a single value.
I think we should add a way to provide partition values outside of the columnar reader. It wouldn't be too difficult to add a method on ReadTask that returns them, then create a joined row in the scan exec. Otherwise, this requires a lot of wasted memory for a scan.
I think users can write a special ReadTask to do it, but we can't save memory by doing this. When an operator (the scan operator) transfers data to another operator, the data must be UnsafeRows. So even if users return a joined row in ReadTask, Spark needs to convert it to UnsafeRow.
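For concreteness, one hypothetical shape for the suggestion above (nothing like this exists in this PR; all names are made up): the task exposes the values that are constant for all of its rows, and the scan exec joins them on once per task instead of the source materializing them per row.

import org.apache.spark.sql.types.StructType;

// Hypothetical mix-in for a ReadTask: per-task constant (e.g. partition) columns that the
// scan exec could append to every row produced by this task.
interface SupportsConstantColumns {
  // schema of the constant columns for this task
  StructType constantSchema();

  // one value per field of constantSchema(), shared by every row produced by this task
  Object[] constantValues();
}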
case r: CatalystFilterPushDownSupport =>
  r.pushCatalystFilters(filters.toArray)

case r: FilterPushDownSupport =>
Looks like CatalystFilterPushDownSupport and FilterPushDownSupport are exclusive? But we can't prevent users from implementing them both?
Yea, we can't prevent users from implementing them both, and we will pick CatalystFilterPushDownSupport over FilterPushDownSupport. Let me document it.
Can FilterPushDownSupport be an interface which extends CatalystFilterPushDownSupport and provides a default impl of pruning the catalyst filter? Like, this code can just go there as a method:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
import static java.util.stream.Collectors.toList;

interface FilterPushDownSupport extends CatalystFilterPushDownSupport {

  List<Filter> pushFilters(List<Filter> filters);

  default List<Expression> pushCatalystFilters(List<Expression> filters) {
    Map<Filter, Expression> translatedMap = new HashMap<>();
    List<Expression> nonconvertiblePredicates = new ArrayList<>();

    // Translate each catalyst expression into a public Filter where possible.
    // (In Spark, DataSourceStrategy.translateFilter returns a Scala Option; adapt as needed.)
    for (Expression catalystFilter : filters) {
      Optional<Filter> translatedFilter = DataSourceStrategy.translateFilter(catalystFilter);
      if (translatedFilter.isPresent()) {
        translatedMap.put(translatedFilter.get(), catalystFilter);
      } else {
        nonconvertiblePredicates.add(catalystFilter);
      }
    }

    // Push the translated filters, then map whatever the source could not handle back to
    // the original expressions and return them together with the untranslatable ones.
    List<Filter> unhandledFilters = pushFilters(new ArrayList<>(translatedMap.keySet()));
    return Stream.concat(
        nonconvertiblePredicates.stream(),
        unhandledFilters.stream().map(translatedMap::get))
      .collect(toList());
  }
}
and we can trivially ignore the interface confusion (it's truly confusing if you can implement two interfaces)
like, we might as well not document it if the code can document it
good idea!
By doing so, do we still need to match both CatalystFilterPushDownSupport and FilterPushDownSupport here?
After some attempts, I went back to 2 individual interfaces. The reasons are: a) CatalystFilterPushDownSupport is an unstable interface, and it looks weird to let a stable interface extend an unstable one; b) the logic that converts expressions to public filters belongs to Spark internals, and we may change it in the future, so we should not put that code in a public interface. We would risk breaking compatibility for this interface.
// Match original case of attributes.
// TODO: nested fields pruning
val requiredColumns = (projectSet ++ filterSet).toSeq.map(attrMap)
Do we need to request columns that are only referenced by pushed filters?
It seems reasonable to only request the ones that will be used, or that have residuals after pushing filters.
import java.io.Closeable;

/**
 * A data reader returned by a read task and is responsible for outputting data for an RDD
Nit: an -> a
 * 1. push operators downward to the data source, e.g., column pruning, filter push down, etc.
 * 2. propagate information upward to Spark, e.g., report statistics, report ordering, etc.
 * 3. special scans like columnar scan, unsafe row scan, etc. Note that a data source reader can
 *    at most implement one special scan.
at most implement one -> implement at most one
 * 3. special scans like columnar scan, unsafe row scan, etc. Note that a data source reader can
 *    at most implement one special scan.
 *
 * Spark first applies all operator push down optimizations which this data source supports. Then
push down -> push-down
StructType readSchema();

/**
 * Returns a list of read tasks, each task is responsible for outputting data for one RDD
, each -> . Each
/**
 * Returns a list of read tasks, each task is responsible for outputting data for one RDD
 * partition, which means the number of tasks returned here is same as the number of RDD
, which means -> . That means
/**
 * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to push down
 * arbitrary expressions as predicates to the data source.
Note that, this is an experimental and unstable interface
/**
 * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to only read
 * required columns/nested fields during scan.
-> the required
/**
 * Apply column pruning w.r.t. the given requiredSchema.
 *
 * Implementation should try its best to prune unnecessary columns/nested fields, but it's also
the unnecessary
public interface FilterPushDownSupport {

  /**
   * Push down filters, returns unsupported filters.
Pushes down filters, and returns unsupported filters.
 * statistics to Spark.
 */
public interface StatisticsSupport {
  Statistics getStatistics();
Will the returned stats be adjusted by the data sources based on the operator push-down?
It should, but we need some refactoring of the optimizer, see #19136 (comment)
/**
 * Push down filters, returns unsupported filters.
 */
Expression[] pushCatalystFilters(Expression[] filters);
Any chance this could push Java lists? They're just more idiomatic in a Java interface.
Java lists are not friendly to Scala implementations :)
 * An interface to represent statistics for a data source.
 */
public interface Statistics {
  long sizeInBytes();
OptionalLong for sizeInBytes? It's not obvious that sizeInBytes is well defined for e.g. JDBC datasources, but row count can generally be easily estimated from the query plan.
like, I get that it's non-optional at the moment, but it's odd that we have a method that the normal implementor will have to replace with
public long sizeInBytes() {
return Long.MAX_VALUE;
}
and now is a good time to fix it :)
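A sketch of the shape being asked for; a later revision of this PR does import java.util.OptionalLong (see the excerpt further down), and numRows is included here only as the other estimate mentioned above, so treat the exact method set as approximate:

import java.util.OptionalLong;

// Optional estimates instead of sentinel values like Long.MAX_VALUE: a source that cannot
// estimate its size simply returns OptionalLong.empty().
public interface Statistics {
  OptionalLong sizeInBytes();
  OptionalLong numRows();
}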
/**
 * Returns the option value to which the specified key is mapped, case-insensitively,
 * or {@code null} if there is no mapping for the key.
Can we return Optional<String> here? JDK maintainers wish they could return Optional on Map.
 * Returns the option value to which the specified key is mapped, case-insensitively,
 * or {@code defaultValue} if there is no mapping for the key.
 */
public String getOrDefault(String key, String defaultValue) {
If the above returns Optional, you probably don't need this method.
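A small call-site sketch of why the Optional shape removes the need for getOrDefault; a plain Map stands in for the options class so the snippet stays self-contained, and the option name and default are made up:

import java.util.Map;
import java.util.Optional;

class OptionsCallSiteSketch {
  // With an Optional-returning accessor, getOrDefault collapses into orElse at the call site.
  static int batchSize(Map<String, String> options) {
    return Optional.ofNullable(options.get("batchSize"))  // hypothetical option name
        .map(Integer::parseInt)
        .orElse(1024);                                     // hypothetical default
  }
}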
Force-pushed from 1e86d5c to abcc606.
Test build #81717 has finished for PR 19136 at commit
Test build #81722 has finished for PR 19136 at commit
Test build #81723 has finished for PR 19136 at commit
import java.util.OptionalLong;

/**
 * An interface to represent statistics for a data source.
link back to SupportsReportStatistics
Test build #81773 has finished for PR 19136 at commit
any more comments? is it ready to go?
 * constructor.
 *
 * Note that this is an empty interface, data source implementations should mix-in at least one of
 * the plug-in interfaces like `ReadSupport`. Otherwise it's just a dummy data source which is
use an actual link ...
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;

/**
 * A mix-in interface for `DataSourceV2`. Users can implement this interface to provide data reading
Users -> data source implementers
Actually a better one is "Data sources can implement"
 * source can implement both `ReadSupport` and `ReadSupportWithSchema` if it supports both schema
 * inference and user-specified schema.
 */
public interface ReadSupportWithSchema {
I still find ReadSupport vs ReadSupportWithSchema pretty confusing. But let's address that separately.
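For readers comparing the two, the difference boils down to who supplies the schema. A sketch of the pair as discussed in this thread, with signatures reconstructed from the surrounding excerpts and the options class's package assumed, so treat them as approximate:

import org.apache.spark.sql.sources.v2.DataSourceV2Options;       // package assumed
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.types.StructType;

// Schema is inferred by the source itself.
interface ReadSupport {
  DataSourceV2Reader createReader(DataSourceV2Options options);
}

// Schema is supplied by the user, e.g. for CSV/JSON sources that cannot infer one cheaply.
interface ReadSupportWithSchema {
  DataSourceV2Reader createReader(StructType schema, DataSourceV2Options options);
}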
/**
 * An interface to represent statistics for a data source, which is returned by
 * `SupportsReportStatistics`.
also use @link
class DataSourceRDD(
    sc: SparkContext,
    @transient private val generators: java.util.List[ReadTask[UnsafeRow]])
Why is this called generators?
LGTM. Still some feedback that can be addressed later. We should also document all the APIs as Evolving.
Test build #81809 has finished for PR 19136 at commit
thank you all for the review, merging to master!
As we discussed in apache#19136 (comment), we should push down operators to the data source before planning, so that the data source can report statistics more accurately. This PR also includes some cleanup for the read path. Existing tests. Author: Wenchen Fan <[email protected]> Closes apache#19424 from cloud-fan/follow.
What changes were proposed in this pull request?
This PR adds the infrastructure for data source v2 and implements the features Spark already has in data source v1, i.e. column pruning, filter push down, catalyst expression filter push down, InternalRow scan, schema inference, and data size reporting. The write path is excluded to avoid making this PR grow too big, and will be added in a follow-up PR.
How was this patch tested?
new tests
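To tie the thread together, here is a minimal sketch of what a trivial source could look like under this read path. It reuses the names quoted in this conversation (DataSourceV2, ReadSupport, DataSourceV2Options, DataSourceV2Reader, ReadTask, DataReader); createReadTasks and the exact package locations are assumptions, so treat this as an approximation of the API described in the thread rather than the merged version.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.sources.v2.reader.ReadTask;
import org.apache.spark.sql.types.StructType;

// A toy source that produces two partitions of (i, j) pairs and ignores all options.
public class SimpleRangeSource implements DataSourceV2, ReadSupport {

  @Override
  public DataSourceV2Reader createReader(DataSourceV2Options options) {
    return new Reader();
  }

  static class Reader implements DataSourceV2Reader {
    @Override
    public StructType readSchema() {
      return new StructType().add("i", "int").add("j", "int");
    }

    @Override
    public List<ReadTask<Row>> createReadTasks() {
      // two RDD partitions: [0, 5) and [5, 10)
      return Arrays.<ReadTask<Row>>asList(new RangeTask(0, 5), new RangeTask(5, 10));
    }
  }

  // One object plays both roles for brevity: the serializable task description and the
  // per-partition reader created from it on the executor.
  static class RangeTask implements ReadTask<Row>, DataReader<Row> {
    private final int start;
    private final int end;
    private int current;

    RangeTask(int start, int end) {
      this.start = start;
      this.end = end;
      this.current = start - 1;
    }

    @Override
    public DataReader<Row> createReader() {
      // each executor gets a fresh reader over [start, end)
      return new RangeTask(start, end);
    }

    @Override
    public boolean next() {
      current += 1;
      return current < end;
    }

    @Override
    public Row get() {
      return RowFactory.create(current, -current);
    }

    @Override
    public void close() { }
  }
}

Registered by its full class name, a source like this would be loaded with something along the lines of spark.read.format("com.example.SimpleRangeSource").load(), assuming the DataFrameReader wiring added in this PR; the package and class names here are made up.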