Execute LogicalPlan on DBMS directly #970
I think implementing a custom TableProvider is the way to go. Specifically, you could push filters down into the underlying DBMS and avoid materializing the entire table.
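For illustration, here is a self-contained sketch of what that pushdown amounts to (`RemoteTable`, `scan_sql`, and the pre-rendered filter strings are hypothetical stand-ins, not DataFusion APIs): the provider renders the pushed-down projection, filters, and limit into the SQL it sends to the DBMS, so only matching rows ever leave the database.

```rust
// Hypothetical sketch: turn the pushed-down parts of a scan into a SQL
// query that the remote DBMS executes, so only matching rows are
// materialized on our side.
struct RemoteTable {
    table_name: String,
    columns: Vec<String>,
}

impl RemoteTable {
    /// Build `SELECT <proj> FROM <table> WHERE <filters> LIMIT <n>`
    /// from whatever the planner pushed down.
    fn scan_sql(
        &self,
        projection: Option<&[usize]>,
        filters: &[String], // already-rendered predicates, e.g. "a > 5"
        limit: Option<usize>,
    ) -> String {
        let cols = match projection {
            Some(idx) => idx
                .iter()
                .map(|i| self.columns[*i].clone())
                .collect::<Vec<_>>()
                .join(", "),
            None => "*".to_string(),
        };
        let mut sql = format!("SELECT {cols} FROM {}", self.table_name);
        if !filters.is_empty() {
            sql.push_str(&format!(" WHERE {}", filters.join(" AND ")));
        }
        if let Some(n) = limit {
            sql.push_str(&format!(" LIMIT {n}"));
        }
        sql
    }
}

fn main() {
    let t = RemoteTable {
        table_name: "events".into(),
        columns: vec!["id".into(), "ts".into(), "level".into()],
    };
    // Prints: SELECT id, level FROM events WHERE level = 'ERROR' LIMIT 10
    println!(
        "{}",
        t.scan_sql(Some(&[0, 2]), &["level = 'ERROR'".to_string()], Some(10))
    );
}
```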
Hi @alamb, as in your proposal, projection/filters/limit can be converted back to SQL and executed directly on the DBMS from within the scan. My expected workflow:
I also lean towards this approach @hu6360567. With SQL, we still need a way for the source to declare what pushdown it supports (and for datafusion to support pushing aggregates and joins to the source).
I believe we already have this, as each table provider can already declare the filter pushdown it supports.
I haven't thought about it much lately (I attempted an arrow-datafusion-sql thingy early last year), but the biggest change would be allowing datasources to declare whether they support certain plans. For example, if there's a window function and an aggregation, and say MySQL supports them, there should be a way for the source to declare that it can execute them.
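As a sketch of what such a declaration could look like (all names here are hypothetical, not an existing DataFusion API): the source answers, per plan-node kind, whether it can execute that node remotely, and a subtree is pushed down only if every node in it is supported.

```rust
// Hypothetical capability declaration: a source reports which plan
// node kinds it can execute remotely, and the planner only pushes a
// subtree down when every node in it is supported.
#[derive(Debug, Clone, Copy)]
enum PlanNodeKind {
    Projection,
    Filter,
    Limit,
    Aggregate,
    WindowFunction,
    Join,
}

trait SourceCapabilities {
    fn supports(&self, kind: PlanNodeKind) -> bool;
}

struct MySqlLikeSource;

impl SourceCapabilities for MySqlLikeSource {
    fn supports(&self, kind: PlanNodeKind) -> bool {
        // Illustrative choice only: claim everything except joins.
        !matches!(kind, PlanNodeKind::Join)
    }
}

/// A subtree is pushable only if the source supports every node in it.
fn subtree_pushable(src: &dyn SourceCapabilities, nodes: &[PlanNodeKind]) -> bool {
    nodes.iter().all(|k| src.supports(*k))
}

fn main() {
    let src = MySqlLikeSource;
    let subtree = [PlanNodeKind::Filter, PlanNodeKind::Aggregate];
    assert!(subtree_pushable(&src, &subtree));
}
```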
It could be a good exercise to see how Spark implements this. Many years ago when I was working with Spark + Impala data source, I saw that certain joins could be pushed to source via JDBC. It was a multi-source virtualised setup (SAP HANA via Spark; don't ask why), but if 2 or more joins came from the same catalog/schema in the same data source, they could be pushed down as a joined SQL query instead of only filtering the input tables. So perhaps a good process could be:
While converting a whole logical plan to SQL might be the easier approach, I don't know if it would be generally suitable, as some queries are executed differently by datafusion than by other SQL engines, and it's better for datasources to have more control over what gets pushed down.
Hi @nevi-me. In a word: the plan is optimized as much as possible while capabilities are unknown, and the limits of those capabilities are applied once they are known. For example, with a recursive architecture, an execution node can itself act as a planner for an execution group.
This is a good point @hu6360567 -- I think what you are describing is called "query federation" and is the main use case for systems like https://trino.io/ (formerly Presto). It might help to look at that project for inspiration. DataFusion could certainly be used for this use case, but as you point out a non-trivial amount of additional work is required (like group and join pushdown). It would be interesting to see what you have in mind.
One idea is to add a cache between the SQL engine and the disk store, but this approach depends on a high cache hit ratio to pay off. In other words, it depends on your business scenario.
I think with some extension to our existing table provider abstraction, this kind of cross table compute push down could be achieved within our logical or physical plan optimizer? Following @hu6360567's logic of splitting logical plans into sub plans, we could perform a rewrite in the plan optimizer to group the query plan tree into sub trees by database instance referenced in table scans. If a sub tree only reads tables from the same database, then we can safely convert that sub tree into a SQL query and push the query down to that database directly. This can be done by rewriting the sub tree into a single plan node that represents a remote SQL query execution. To achieve this, we need to extend the table providers to supply the following info:
The database name/identifier and type help the planner decide how to group the plan into sub trees by database instance. The database compute capability helps the planner further filter down which subset of each sub tree can be pushed down. A trimmed-down sub plan is then passed from the planner to the table provider's native query compiler to produce the final query. It doesn't need to be SQL and can be any native query supported by the corresponding database type. Lastly, the planner replaces the trimmed-down sub plan with a single remote query execution node carrying the compiled native query. All of the above can be handled during the planning stage.
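A minimal sketch of the grouping step described above (the `Plan` type here is a hypothetical stand-in for DataFusion's LogicalPlan): collect the database identifier of every table scan in a subtree, and only treat the subtree as pushable when exactly one identifier appears.

```rust
use std::collections::HashSet;

// Hypothetical plan shape for illustrating the grouping rule: a subtree
// may be converted to one remote SQL query only if every scan in it
// reads from the same database instance.
enum Plan {
    Scan { table: String, database_id: String },
    Node { children: Vec<Plan> },
}

fn database_ids(plan: &Plan, out: &mut HashSet<String>) {
    match plan {
        Plan::Scan { database_id, .. } => {
            out.insert(database_id.clone());
        }
        Plan::Node { children } => {
            for c in children {
                database_ids(c, out);
            }
        }
    }
}

/// Returns the single database instance this subtree reads from, or
/// None if it mixes instances (or contains no scans) and therefore
/// cannot be pushed down as one query.
fn single_database(plan: &Plan) -> Option<String> {
    let mut ids = HashSet::new();
    database_ids(plan, &mut ids);
    if ids.len() == 1 { ids.into_iter().next() } else { None }
}

fn main() {
    let join = Plan::Node {
        children: vec![
            Plan::Scan { table: "a".into(), database_id: "pg-1".into() },
            Plan::Scan { table: "b".into(), database_id: "pg-1".into() },
        ],
    };
    assert_eq!(single_database(&join), Some("pg-1".to_string()));
}
```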
Do we really need to detect data source capability at runtime? We could declare it statically, similar to how we define filter pushdown capability today.
@houqp, the idea of using an optimizer pass is a great one. We might even be able to do it without any changes to the TableProvider as of now. For example, if the input plan contained a subtree that a remote database could evaluate, I could imagine a custom optimizer pass that recognized that subtree and rewrote it into a single pushed-down node.
The specifics of which types of subplans can be converted are probably specific to the database being pushed to, so it isn't clear that logic belongs in the main datafusion crate.
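To make the rewrite concrete, here is a toy sketch (hypothetical types; a real implementation would be an OptimizerRule producing a TableScan of a custom provider): a Filter directly above a Scan collapses into a single leaf carrying the SQL to run remotely.

```rust
// Hypothetical rewrite step: a matched subtree collapses into one leaf
// that carries the SQL to run remotely. Real code would implement
// DataFusion's OptimizerRule and produce a TableScan of a custom
// provider instead of this toy enum.
#[derive(Debug)]
enum Plan {
    Filter { pred: String, input: Box<Plan> },
    Scan { table: String },
    RemoteQuery { sql: String },
}

fn rewrite(plan: Plan) -> Plan {
    match plan {
        // Recognize Filter -> Scan and collapse it into a remote query.
        Plan::Filter { pred, input } => match *input {
            Plan::Scan { table } => Plan::RemoteQuery {
                sql: format!("SELECT * FROM {table} WHERE {pred}"),
            },
            other => Plan::Filter { pred, input: Box::new(rewrite(other)) },
        },
        other => other,
    }
}

fn main() {
    let plan = Plan::Filter {
        pred: "x > 5".into(),
        input: Box::new(Plan::Scan { table: "t".into() }),
    };
    // Prints: RemoteQuery { sql: "SELECT * FROM t WHERE x > 5" }
    println!("{:?}", rewrite(plan));
}
```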
Thanks @alamb for adding the diagrams, really helps to visualize the idea :)
The question is: without the table providers telling the planner which tables live in the same database instance, how can it group the sub trees correctly? On top of that, we also need a way to let the planner know what compute plan nodes are supported by a particular database type. I think the table provider could be a good abstraction for providing this info.
I feel like this should be defined inside the table provider implementation, which should be maintained as plugins outside of datafusion core. Datafusion core should just maintain the current memory and listing table providers. Or maybe those two can be moved out of the core one day :)
That is a good point. I guess I was saying that trying to encode the wonderful complexity of many different external databases into the core TableProvider API may be difficult. I was wondering if mapping information like which database instance a table belongs to could live outside the core. My concern with making additional changes to TableProvider is the extra complexity it adds for everyone else.
Off-topic, what was the tool used to draw this picture? @alamb
I agree. Perhaps something along those lines.
I took a look at how Presto handles this. Presto uses the concept of a connector per data source. Aside from allowing table scans, the connector can also take over other parts of the plan. I expect it will likely be hard to find a good abstraction to express remote compute capabilities, due to their high variety. Presto's approach seems reasonable: it allows granular federation without having to explicitly express federation capabilities. It also limits the number of new concepts that need to be added. DF can choose to ship basic remote providers (e.g. for ADBC) and others can be provided by 3rd parties. Would this be a sensible approach to take?
@backkem The basic idea seems reasonable to me. Given your description, it seems like the same thing could be accomplished today in DataFusion by implementing and registering an appropriate OptimizerRule (for LogicalPlans) or PhysicalOptimizerRule (for ExecutionPlans) that would walk the plans, identify subtrees they know how to push down, and rewrite them to use specialized nodes. DataFusion is already set up to run arbitrary ExecutionPlans / sources / passes. I wonder if there is anything specific we need to add 🤔 Perhaps an example and improved docs would suffice.
Ok, I can see how that would work. Thank you for the input. I wonder if parts of this are worth upstreaming as a canonical way to handle this 'query federation' case.
I am not sure how much benefit this would have over just a normal optimizer pass (I probably don't fully understand the proposal -- maybe some code would help explain it in more detail if you would like more specific feedback)
An example that shows how to replace some part of the plan with a FlightSQL or ADBC implementation would be awesome.
Based on the discussion above and in #7871 and #8699, I wanted to further explore the idea of federation-aware table providers. A sketch:

```rust
// Optimizer rule that facilitates federation.
struct FederationOptimizerRule {}

impl OptimizerRule for FederationOptimizerRule {
    fn try_optimize(
        &self,
        plan: &LogicalPlan,
        config: &dyn OptimizerConfig,
    ) -> Result<Option<LogicalPlan>> {
        // Walk over the plan, look for the largest subtrees with only
        // FederatedTableSource's all using the same FederationProvider.
        // Pass each subtree to its respective FederationProvider.optimizer.
        // The job of this optimizer is to 'absorb' part of the plan and replace it with
        // one or more virtual TableScan's that wrap the piece of the plan it will federate.
        todo!()
    }
}

trait FederationProvider {
    fn optimizer(&self) -> Option<Arc<Optimizer>>;
    // Add Analyzer & PhysicalOptimizer if needed.
}

// FederatedTableSource that helps the FederationOptimizerRule find the 'largest subtrees'
// in a plan and run the corresponding FederationProvider.optimizer
trait FederatedTableSource: TableSource {
    fn federation_provider(&self) -> Arc<dyn FederationProvider>;
    fn table_type(&self) -> TableType {
        TableType::Temporary
    }
}

// TableProvider (or a new trait) is simplified since the virtual TableScan
// injected into the plan by the FederationProvider Optimizer already knows what to do.
trait TableProvider {
    async fn scan(&self, state: &SessionState) -> Result<Arc<dyn ExecutionPlan>>;
}
```

It should be feasible to create a wrapper between the old and the new TableProvider trait.
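For completeness, a toy sketch of how registration might feel from the user's side (the `Session` stub stands in for DataFusion's session state, whose rule-registration API varies across versions): federation is enabled by adding the single rule, and the providers discovered via `FederatedTableSource` do the rest.

```rust
// Toy wiring sketch: the stub `Session` stands in for DataFusion's
// session state. The point is that federation is opt-in via a single
// optimizer rule registered alongside the providers.
use std::sync::Arc;

struct FederationOptimizerRule {}

#[derive(Default)]
struct Session {
    // In real DataFusion this would hold `Arc<dyn OptimizerRule + Send + Sync>`.
    optimizer_rules: Vec<Arc<FederationOptimizerRule>>,
}

impl Session {
    fn add_optimizer_rule(&mut self, rule: Arc<FederationOptimizerRule>) {
        self.optimizer_rules.push(rule);
    }
}

fn main() {
    let mut session = Session::default();
    // One registration enables federation; the FederationProviders
    // reached through FederatedTableSource handle their own subtrees.
    session.add_optimizer_rule(Arc::new(FederationOptimizerRule {}));
    println!("registered {} rule(s)", session.optimizer_rules.len());
}
```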
This code makes sense to me. It also seems like the same effect can be achieved by implementing an optimizer rule directly.
Technically, I agree it can already be done. However, having a canonical approach creates a focal point for creators of such connectors.
Maybe one more thought in line with your comment: the API suggestion allows one to register the providers in one go, as is done today, without having to separately wire up the additional optimizers. I see it as a quality of life improvement.
I agree it would be better for people creating federated engines, but I think for people using the most common limit/filter/projection pushdown it might be more complicated. What if we made an example like that?
Yea, I agree there is a trade-off between ease of use and flexibility/capability. It's not obvious to me where to draw the line, as mentioned in the sort pushdown discussion. If anyone is interested in DBMS-level federation, let me know. I'd be open to spinning up a project/crate to be the library equivalent of Trino-style connectors for DataFusion.
@backkem Thanks for the thoughts/example code above. I would be interested in contributing to / using such a sub-project to support Trino-style connectors.
I put together a POC based on the concept above (example). It's by no means complete but helps determine viability.
An update here: I think there is ongoing work in https://github.com/datafusion-contrib/datafusion-federation/ to support this use case.
I think https://github.com/datafusion-contrib/datafusion-federation/ is proceeding nicely -- let's continue the conversation there.
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
I'm implementing a distributed query system, similar to Ballista, but the data sources change over time and live in MySQL/PostgreSQL servers.
The query is parsed and optimized into a LogicalPlan at the scheduler, which distributes sub-plans to the executors that have access to the specific data sources.
The key problem is that it is not acceptable to load an entire table from the DBMS into memory and then execute the plan when the table is huge.
Pushing the LogicalPlan down directly to the DBMS can effectively reduce the amount of in-memory data from the bottom of the whole computation process.
Describe the solution you'd like
One possible solution is to define a new DBExecuteContext associated with a SQL connection, which can convert the LogicalPlan back to plain SQL inside the ExecutionPlan and execute it directly on the associated SQL connection.
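A self-contained sketch of that idea (`DbExecuteContext`, `SqlConnection`, and `Row` are hypothetical stand-ins mirroring the description above, not real datafusion types): the context owns a connection and executes the SQL produced from the plan directly on it.

```rust
// Hypothetical sketch of the proposed DBExecuteContext: it owns a SQL
// connection and, when executed, runs the SQL produced by unparsing
// the captured LogicalPlan fragment on that connection.
trait SqlConnection {
    fn execute(&self, sql: &str) -> Result<Vec<Row>, String>;
}

struct Row; // stand-in for a record batch / row set

struct DbExecuteContext<C: SqlConnection> {
    connection: C,
}

impl<C: SqlConnection> DbExecuteContext<C> {
    /// `plan_sql` stands in for the result of unparsing a LogicalPlan;
    /// sqlparser-rs did not offer that at the time this was filed.
    fn execute_plan(&self, plan_sql: &str) -> Result<Vec<Row>, String> {
        self.connection.execute(plan_sql)
    }
}

struct MockConn;

impl SqlConnection for MockConn {
    fn execute(&self, sql: &str) -> Result<Vec<Row>, String> {
        println!("would run on DBMS: {sql}");
        Ok(vec![])
    }
}

fn main() {
    let ctx = DbExecuteContext { connection: MockConn };
    ctx.execute_plan("SELECT * FROM t WHERE x > 5").unwrap();
}
```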
libpg_parser provides the ability to convert an AST back to a plain SQL query, but sqlparser-rs doesn't. A first step would be for datafusion to convert the LogicalPlan back to Statements, and the second would be for sqlparser to convert the Statements back to SQL.
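A stub sketch of those two steps (both helpers are hypothetical; the real work would live in datafusion and sqlparser respectively): step 1 unparses the LogicalPlan into a statement AST, step 2 renders the statement as SQL text.

```rust
// Sketch of the proposed two-step flow, with hypothetical helpers and
// stub types standing in for the real datafusion / sqlparser types.
struct LogicalPlan;
struct Statement(String);

/// Step 1 (hypothetical): unparse a LogicalPlan into a SQL statement AST.
fn plan_to_statement(_plan: &LogicalPlan) -> Statement {
    Statement("SELECT * FROM t WHERE x > 5".to_string())
}

/// Step 2 (hypothetical): render the statement AST as SQL text.
fn statement_to_sql(stmt: &Statement) -> String {
    stmt.0.clone()
}

fn main() {
    let sql = statement_to_sql(&plan_to_statement(&LogicalPlan));
    assert_eq!(sql, "SELECT * FROM t WHERE x > 5");
}
```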
Describe alternatives you've considered
Not yet.