[Format] Passing column statistics through Arrow C data interface #38837
Comments
There's no current standard convention. There are a few ways such statistics could be sent, though, such as via a struct array with one row and a field for each statistic, or various other configurations. Passing such statistics would be beyond the scope of the current C Data interface IMHO |
There is a related discussion at duckdb/duckdb#4636 (reply in thread) |
I would argue that it makes sense to include statistical information in the C data interface. Efficient execution of complex queries requires statistical information, and I believe that most Arrow Producers possess this information. Therefore, it should be somehow passed. An alternative I can think of for the C-Data interface is to wrap the statistical information in top-level objects (e.g., ArrowRecordBatch in Python), but that approach is quite cumbersome and would necessitate specific implementations for every client API. |
Just to provide a clearer example of how statistics can be useful for query optimization: in DuckDB, join ordering is currently determined using heuristics based on table cardinalities, with future plans to enhance this with sample statistics. Statistics affect not only join ordering: even the choice of the probe side in a hash join is determined by the expected cardinalities. One example of a query that is affected by join ordering is Q21 of TPC-H. However, the plan for it is too big to share easily in a GitHub Discussion. To give a simpler example of how cardinalities affect this, I've created two tables.
My example query is a simple inner join of these two tables, and we calculate the sum of t.i:

SELECT SUM(t.i) from t inner join t_2 on (t_2.k = t.i)

Because the optimizer doesn't have any statistics from the Arrow side, it will basically pick the probe side depending on the order in which the tables are presented in the query. Swapping the tables gives:

SELECT SUM(t.i) from t_2 inner join t on (t_2.k = t.i)

This results in a slightly different plan, yet with significant differences in performance. As depicted in the screenshot of executing both queries, choosing the incorrect probe side for this query already results in a performance difference of an order of magnitude. For more complex queries, the variations in execution time could be not only larger but also more difficult to trace.

For reference, the code I used for this example:

```python
import duckdb
import time
import statistics

con = duckdb.connect()
# Create table with 10^8 rows
con.execute("CREATE TABLE t as SELECT * FROM RANGE(0, 100000000) tbl(i)")
# Create table with 10 rows
con.execute("CREATE TABLE t_2 as SELECT * FROM RANGE(0, 10) tbl(k)")

query_slow = '''
SELECT SUM(t.i) from t inner join t_2 on (t_2.k = t.i);
'''
query_fast = '''
SELECT SUM(t.i) from t_2 inner join t on (t_2.k = t.i);'''

# Export both tables as Arrow tables
t = con.execute("FROM t").fetch_arrow_table()
t_2 = con.execute("FROM t_2").fetch_arrow_table()

# con_2 has no tables of its own: t and t_2 in the queries resolve to the
# Arrow tables above through DuckDB's replacement scans
con_2 = duckdb.connect()

print("DuckDB Arrow - Query Slow")
print(con_2.execute("EXPLAIN " + query_slow).fetchall()[0][1])
execution_times = []
for _ in range(5):
    start_time = time.time()
    con_2.execute(query_slow)
    end_time = time.time()
    execution_times.append(end_time - start_time)
median_time = statistics.median(execution_times)
print(median_time)

print("DuckDB Arrow - Query Fast")
print(con_2.execute("EXPLAIN " + query_fast).fetchall()[0][1])
execution_times = []
for _ in range(5):
    start_time = time.time()
    con_2.execute(query_fast)
    end_time = time.time()
    execution_times.append(end_time - start_time)
median_time = statistics.median(execution_times)
print(median_time)
```
|
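To make the build/probe choice above concrete, here is a minimal sketch in plain Python (no DuckDB or Arrow involved). The function names `choose_sides` and `hash_join_sum` are hypothetical, the tables are stand-in `range` objects much smaller than in the benchmark, and the join assumes unique keys on the build side. It only illustrates the idea that an optimizer that knows both cardinalities can build the hash table on the smaller input regardless of the order in which the query names the tables.

```python
def choose_sides(left, right):
    """Return (build, probe), building the hash table on the smaller input."""
    return (left, right) if len(left) <= len(right) else (right, left)


def hash_join_sum(build, probe):
    """Sum the probe-side keys that also appear on the build side.

    Assumes build-side keys are unique, so each probe row matches at most once.
    """
    build_keys = set(build)  # the hash table is built from this side only
    return sum(key for key in probe if key in build_keys)


t = range(0, 100_000)  # stand-in for the large table
t_2 = range(0, 10)     # stand-in for the small table

# The same sides are chosen whichever order the tables are written in.
build, probe = choose_sides(t, t_2)
build_swapped, probe_swapped = choose_sides(t_2, t)

result = hash_join_sum(build, probe)
print(len(build), len(probe), result)  # prints "10 100000 45"
```

Without cardinalities, the only information available is the order of the tables in the query text, which is why the two spellings of the same join behave so differently.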
I'm considering some approaches for this use case. This is not complete yet, but I'll share my ideas so far. Feedback is appreciated. ADBC uses the following schema to return statistics: It's designed for returning statistics of a database. We can simplify this schema because we can just return statistics of a record batch. For example:
TODO: How to represent statistic key? Should we use ADBC style? (Assigning an ID for each statistic key and using it.) If we represent statistics as a record batch, we can pass statistics through Arrow C data interface. This may be a reasonable approach. If we use this approach, we need to do the following:
TODO: Consider statistics related API for Apache Arrow C++. |
I'd be curious what others think of this approach as opposed to actually making a format change to include statistics alongside the record batches in the API, particularly in the case of a stream of batches. I'm not against it; I just don't know whether others would be opposed to needing an entirely separate record batch to be sent containing the statistics |
I think the top priority should be to avoid breaking ABI compatibility. I suspect that most users of the C data interface will not want to pass statistics. We should avoid doing anything that would cause disruption for those users. |
Ah, I should have written an approach that changes the current Arrow C data interface. |
This proposal is great. Just an unrelated issue, Parquet |
Is |
At least for ADBC, the idea was that other types can be encoded in those choices. (Decimals can be represented as bytes, dates can be represented as int64, etc.) |
Some approaches that are based on the C Data Interface https://arrow.apache.org/docs/format/CDataInterface.html : (1) Add
|
Do consumers want per-batch metadata anyways? I would assume in the context of say DuckDB is that they'd like to get statistics for the whole stream up front, without reading any data, and use that to inform their query plan. |
Ah, then we should not mix statistics and |
This is just off the cuff, but maybe we could somehow signal to an ArrowArrayStream that a next call to get_next should instead return a batch of Arrow-encoded statistics data. That wouldn't break ABI, so long as we come up with a good way to differentiate the call. Then consumers like DuckDB could fetch the stats up front (if available) and the schema would be known to them (if we standardize on a single schema for this). |
Hmm. It may be better that we provide a separated API to get statistics like #38837 (comment) approach. |
We talked about this a little, but what about approach (2) from Kou's comment above, but for now only defining table-level statistics (primarily row count)? AIUI, row count is the important statistic to have for @pdet's use case, and it is simple to define. We can wait and see on more complicated or column-level statistics. Also for the ArrowDeviceArray, there is some room for extension: Lines 134 to 135 in 14c54bb
Could that be useful here? (Unfortunately there isn't any room for extension on ArrowDeviceArrayStream.) |
Hey guys, thank you very much for starting the design of Arrow statistics! That's exciting! We are currently interested in up-front full-column statistics. Specifically:
As a clarification, we also utilize row-group min-max for filtering optimizations in DuckDB tables, but these cannot benefit Arrow. In Arrow, we either push down filters to an Arrow Scanner or create a filter node on top of the scanner, and we do not utilize the min-max of chunks for filter optimization. For a more detailed picture of what we hold for statistics you can also look in our statistics folder. But I think that approx count distinct, cardinality, and min-max are enough for a first iteration. |
Table cardinality would be table level right? But of course the others are column level. Hmm. We didn't leave ourselves room in ArrowDeviceArrayStream... And just to be clear "up front" means at the start of the stream, not per-batch, right? |
Exactly!
Yes, at the start of the stream. |
@lidavidm technically the C Device structs are still marked as Experimental on the documentation and haven't been adopted by much of the ecosystem yet (as we're still adding more tooling in libarrow and the pycapsule definitions for using them) so adding room in ArrowDeviceArrayStream should still be viable without breaking people. Either by adding members or an entire separate callback function? |
I think it would be interesting to add an extra callback to get the statistics, yeah. |
It's hard though, because arguably it's kind of outside the core intent of the C Data Interface. But on the other hand, they kind of need to be here to have any hope of being supported more broadly. |
Hmmm I don't know should we represent this "Approximate" |
It seems that DuckDB uses HyperLogLog for computing distinct count: https://github.com/duckdb/duckdb/blob/d26007417b7770860ae78278c898d2ecf13f08fd/src/include/duckdb/storage/statistics/distinct_statistics.hpp#L25-L26 It may be the reason why "Approximate" is included here. |
Nice. I mean, should "distinct count" (or NDV) be "estimated" in our data interface? And if we add an "estimated" NDV, should we also add an "exact" one, or is "estimated" alone OK here? In (1) #38837 (comment) , would adding a key here be heavy? |
The ADBC encoding allows the producer to mark statistics as exact/approximate, fwiw
See #38837 (comment) |
Apologies if I am misunderstanding but, given David's point here, aren't we talking about two different API calls? For example, from the perspective of duckdb it is often something like (this is maybe over-simplifying things)...
In other words, first I get the statistics (and no data), then I do some optimization / calculation with the statistics, and then I might choose to actually load the data. If these are two separate API calls then the statistics are just another record batch. What is the advantage of making a single stream / message that contains both data and statistics? |
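A minimal sketch of the "statistics first, data maybe later" flow described above, in plain Python. Everything here is hypothetical: `StatsFirstSource`, `get_statistics`, `get_data`, and the `(value, is_exact)` encoding are illustration-only names, not part of Arrow, ADBC, or DuckDB. The point is only that when the two calls are separate, a consumer can answer some questions from exact statistics and never request the data.

```python
class StatsFirstSource:
    """Hypothetical source exposing statistics and data as separate calls."""

    def __init__(self, rows):
        self._rows = list(rows)
        self.data_calls = 0  # counts how often the data was actually requested

    def get_statistics(self):
        # Statistics only, no data. Each entry is (value, is_exact).
        return {"row_count": (len(self._rows), True)}

    def get_data(self):
        self.data_calls += 1
        return list(self._rows)


def count_rows(source):
    """Answer COUNT(*) from statistics when they are exact, else scan."""
    value, is_exact = source.get_statistics().get("row_count", (None, False))
    if is_exact:
        return value
    return len(source.get_data())


source = StatsFirstSource(range(1000))
print(count_rows(source), source.data_calls)  # prints "1000 0"
```

If the producer could only report an approximate row count, the same consumer would fall back to loading the data, which is one reason the exact/approximate distinction matters to the consumer.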
Thanks for joining this discussion. My understanding of the "up front" is the same as you explained. #38837 (comment) uses
Anyway, it seems that |
Coming from the mailing list, I caught up on some of the comments above. I agree that a separate API call is nicer than packing statistics into the schema. Packing into the schema doesn't seem bad to me, but it certainly seems more limited. An additional case to consider is that a separate API call provides additional flexibility of where to get the statistics in case they may come from a different source than the schema (in the case of a table view or a distributed table). I also interpret Dewey's comment as: the schema should describe the "structure" of the data, whereas the statistics describes the "content" of the data. This aligns with Weston's point that the statistics are essentially "just another record batch" (or at least more similar to the data itself than to the schema). I agree with both of these. Below are some relevant points I tried considering, and I see no downside to using an additional API call:
It's likely relevant that the code referenced above is how duckdb interacts with Arrow that is already in memory (as far as I know). In the future, I can imagine that duckdb will still keep a similar sequence when requesting Arrow data from a data source (file or remote connection) by binding to the schema, then accessing statistics, then accessing the data. |
Thanks for your comment. Could you explain more about a separate API call? Do you imagine a language specific API such as DataFusion's Or do you imagine an API that transmits Arrow data (arrays or a record batch) as statistics like ADBC does? |
In DuckDB, these statistics are created as a callback function that exists in the scanner. For example, in our Arrow integration, the statistics of the scanner are basically empty, as you can see here. In practice, we need to be able to get the statistics at bind time. The code at this link is not really related to cardinality estimation. This code sets the size of the current data chunk we scanned, which will later be pushed to upper nodes. |
Ah, I misunderstood then.
this is why I figured either approach to getting statistics (schema or API call) is viable, since the callback should be able to accommodate either.
I just meant a function call of the scanner API (or I guess something like ADBC, but I don't know that API at all). I interpreted weston's comment to mean that a function call to get statistics is comparable to a function call to get a record batch. But when I say it provides extra flexibility, I mean that it provides a standard way for an independent producer to specify logic to a consumer for how to get statistics. This could allow for things like storing statistics independently from the data stream itself. I also feel like this doesn't preclude statistics metadata being packed into the schema (maybe in some application-specific way). |
I see now, I had misunderstood and thought we were talking about I think both approaches make sense but this is the tricky part:
There are ways to solve this but they all seem like a lot of work for alignment and maintenance. I don't think the benefit (combining |
Right. #38837 (comment) uses |
I don't want to derail the discussion, but given that formats might provide statistics also at record-batch level (think of parquet row groups), what would be the solution in that case? Binding the statistics to the Schema seems to make hard to provide per-batch statistics in |
Are per-batch statistics even useful? You've presumably already made the query plan and done the I/O, so there's little opportunity to make use of the statistics at that point. |
If per-batch statistics are needed, the separated API approach (schema for statistics is only defined) is better than using |
When exchanging data via C-Data I don't think you can take for granted how that data was fetched, so while in the majority of cases you are correct, there might be cases where you are receiving the data from a source that doesn't know anything about the filtering that the receiver will be going to apply. In that case there is a value in passing the statistics-per-batch. |
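As a hedged illustration of the per-batch case, here is a small plain-Python sketch of a receiver that applies its own range filter and uses per-batch min/max to skip batches. The function `batches_to_scan` and the dict layout are hypothetical, and the sketch assumes the min/max values are exact; with approximate bounds, skipping would not be safe.

```python
def batches_to_scan(batch_stats, lower, upper):
    """Return indices of batches that may contain values in [lower, upper].

    batch_stats holds one {"min": ..., "max": ...} dict per batch, or None
    when the producer sent no statistics for that batch.
    """
    selected = []
    for index, stats in enumerate(batch_stats):
        if stats is None:
            selected.append(index)  # no statistics: the batch must be scanned
        elif stats["max"] < lower or stats["min"] > upper:
            continue  # value range cannot overlap the filter: skip the batch
        else:
            selected.append(index)
    return selected


batch_stats = [
    {"min": 0, "max": 99},
    {"min": 100, "max": 199},
    None,
    {"min": 300, "max": 399},
]
print(batches_to_scan(batch_stats, 150, 250))  # prints "[1, 2]"
```

This is the same kind of pruning that Parquet row-group statistics allow, applied on the receiving side of the exchange instead of at read time.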
Summary for the discussion on the mailing list so far: https://lists.apache.org/thread/6m9xrhfktnt0nnyss7qo333n0cl76ypc |
Thank you for the summary @kou. I replied to that thread [1] with a statistics encoding scheme that avoids free-form strings. Strings lead to metadata bloat and to parsing bugs when consumers match only on prefixes (or other "clever" key-matching ideas) of string identifiers, which hinders format evolution. The value of a standard for statistics comes from producers and consumers agreeing on the statistics available, so I think centralizing the definitions in the Arrow C Data Interface specification is better than letting the ecosystem work out the naming of statistics from different producers. I'm trying to avoid query processors (consumers of statistics) having to deal with quirks based on the system producing the statistics. In the scheme I proposed, a single array would contain statistics for the table/batch and all the columns. That's more friendly to the memory allocator and allows for all the statistics to be processed in a single loop. If the array grows too big, it can be chunked at any position, allowing for the streaming of statistics if big values start being produced. @kou proposed this schema for an
I will post what I tried to explain in the mailing list [1] in this format so the comparison is easier:
Like @kou's proposal it uses

[1] https://lists.apache.org/thread/gnjq46wn7dbkj2145dskr9tkgfg1ncdw

```c
#include <stdint.h>

// Statistics values are identified by specified int32-valued keys
// so that producers and consumers can agree on physical
// encoding and semantics. Statistics can be about a column,
// a record batch, or both.
typedef int32_t ArrowStatKind;

// Used for non-standard statistic kinds.
// Value must be a struct<name: utf8, value: dense_union<...>>
#define ARROW_STAT_ANY 0
// Exact number of nulls in a column. Value must be int32 or int64.
#define ARROW_STAT_NULL_COUNT_EXACT 1
// Approximate number of nulls in a column. Value must be float32 or float64.
#define ARROW_STAT_NULL_COUNT_APPROX 2
// The minimum and maximum values of a column.
// Value must be the same type of the column or a wider type.
// Supported types are: ...
#define ARROW_STAT_MIN_APPROX 3
#define ARROW_STAT_MIN_EXACT 4
#define ARROW_STAT_MAX_APPROX 5
#define ARROW_STAT_MAX_EXACT 6
#define ARROW_STAT_CARDINALITY_APPROX 7
#define ARROW_STAT_CARDINALITY_EXACT 8
#define ARROW_STAT_COUNT_DISTINCT_APPROX 9
#define ARROW_STAT_COUNT_DISTINCT_EXACT 10
// ... Represented as a
// list<
//   struct<quantile: float32 | float64,
//          sum: "same as column type or a type with wider precision">>
#define ARROW_STAT_CUMULATIVE_QUANTILES 11
```
|
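To illustrate the "single array, single loop" idea from the comment above, here is a rough plain-Python sketch. The integer keys are copied from the listing above; the row layout `(target, kind, value)`, the `TABLE_LEVEL` marker, and the function `index_statistics` are assumptions made for this example only and are not part of any Arrow specification. It shows a consumer indexing table-level and column-level statistics in one pass and ignoring kinds it does not recognize.

```python
from collections import defaultdict

# Keys copied from the proposal above.
ARROW_STAT_NULL_COUNT_EXACT = 1
ARROW_STAT_MIN_EXACT = 4
ARROW_STAT_MAX_EXACT = 6
ARROW_STAT_CARDINALITY_EXACT = 8

KNOWN_KINDS = {
    ARROW_STAT_NULL_COUNT_EXACT,
    ARROW_STAT_MIN_EXACT,
    ARROW_STAT_MAX_EXACT,
    ARROW_STAT_CARDINALITY_EXACT,
}

TABLE_LEVEL = None  # assumption: marks a statistic about the whole table/batch

# One flat sequence holding statistics for the table and for every column.
stats_rows = [
    (TABLE_LEVEL, ARROW_STAT_CARDINALITY_EXACT, 100),
    (0, ARROW_STAT_NULL_COUNT_EXACT, 0),
    (0, ARROW_STAT_MIN_EXACT, 1),
    (0, ARROW_STAT_MAX_EXACT, 99),
    (1, ARROW_STAT_NULL_COUNT_EXACT, 7),
    (1, 9999, "from a newer producer"),  # unknown kind: skipped below
]


def index_statistics(rows):
    """Group statistics by target (column index or TABLE_LEVEL) in one loop."""
    by_target = defaultdict(dict)
    for target, kind, value in rows:
        if kind in KNOWN_KINDS:  # integer comparison, no string parsing
            by_target[target][kind] = value
    return dict(by_target)


indexed = index_statistics(stats_rows)
print(indexed[TABLE_LEVEL][ARROW_STAT_CARDINALITY_EXACT])  # prints "100"
print(indexed[0][ARROW_STAT_MAX_EXACT])                    # prints "99"
```

Because kinds are compared as integers, a consumer cannot accidentally match on a prefix of a key, which is the failure mode the comment attributes to free-form strings.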
We'll use the following schema based on the mailing list discussion:
|
…ough the Arrow C data interface
### Rationale for this change

Statistics are useful for fast query processing. Many query engines use statistics to optimize their query plans. The Apache Arrow format doesn't have statistics, but other formats that can be read as Apache Arrow data may have statistics. For example, Apache Parquet C++ can read an Apache Parquet file as Apache Arrow data, and an Apache Parquet file may have statistics.

One of the Apache Arrow C streaming interface use cases is the following:

1. Module A reads an Apache Parquet file as Apache Arrow data
2. Module A passes the read Apache Arrow data to module B through the Arrow C data interface
3. Module B processes the passed Apache Arrow data

If module A can pass the statistics associated with the Apache Parquet file to module B, module B can use the statistics to optimize its query plan.

### What changes are included in this PR?

We standardize how to represent statistics as an Apache Arrow array for easy exchange. We don't standardize how to pass the statistics array. You can use any interface for it. For example, you can use the Apache Arrow C data interface.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.

* GitHub Issue: #38837

Lead-authored-by: Sutou Kouhei <[email protected]>
Co-authored-by: Sutou Kouhei <[email protected]>
Co-authored-by: Felipe Oliveira Carvalho <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
Issue resolved by pull request 45058 |
@pdet please feel free to leave comments here about your experiences implementing the experimental statistics schema specification in DuckDB. |
Describe the enhancement requested
Is there any standard or convention for passing column statistics through the C data interface?
For example, say there is a module that reads a Parquet file into memory in Arrow format then passes the data arrays to another module through the C data interface. If the Parquet file metadata includes Parquet column statistics such as distinct_count, max, min, and null_count, can the sending module pass those statistics through the C data interface, to allow the receiving module to use the statistics to perform computations more efficiently?
Component(s)
Format