Async User Defined Functions (UDF) #6518

Open
Tracked by #7013
marshauf opened this issue Jun 1, 2023 · 8 comments
Labels
enhancement New feature or request

Comments

@marshauf

marshauf commented Jun 1, 2023

Is your feature request related to a problem or challenge?

I would like to use async code in a UDF. I couldn't find an example or API documentation on how to do that. It would be nice if this were possible and documented.

Describe the solution you'd like

datafusion::physical_plan::functions::make_scalar_function() should accept functions that return a Future.

Describe alternatives you've considered

Creating another tokio runtime and offloading the async function onto it.
The main runtime then waits inside the UDF until the async function has finished executing.
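
To illustrate the workaround described above, here is a minimal sketch of the pattern using only the Rust standard library. It is not DataFusion or tokio code: `block_on` is a tiny stand-in for a runtime's blocking entry point (with tokio one would presumably build a second `Runtime` and call its `block_on` instead), and `remote_double` and `blocking_udf` are hypothetical names invented for this example.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the thread that is parked inside `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal stand-in for a runtime's `block_on` (assumption: std only, no tokio).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Park until the waker fires; spurious wake-ups just re-poll.
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical async operation, standing in for e.g. a remote call.
async fn remote_double(x: i64) -> i64 {
    x * 2
}

/// Synchronous, UDF-shaped function: the async work runs on a second
/// thread, and the calling thread blocks until the result is ready.
fn blocking_udf(input: Vec<i64>) -> Vec<i64> {
    thread::spawn(move || {
        block_on(async move {
            let mut out = Vec::with_capacity(input.len());
            for x in input {
                out.push(remote_double(x).await);
            }
            out
        })
    })
    .join()
    .expect("worker thread panicked")
}
```

Calling `blocking_udf(vec![1, 2, 3])` returns the doubled values, but the calling thread is stalled for the whole duration of the async work, which is the main drawback of this alternative.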

Additional context

No response

marshauf added the enhancement label Jun 1, 2023
@alamb
Contributor

alamb commented Jun 1, 2023

I agree there is currently no good way to make a scalar function async.

You could potentially use a table provider and write to your function like INSERT INTO your_table SELECT ... 🤔

That might not work for your use case, however.

alamb changed the title from "Async UDF" to "Async User Defined Functions (UDF)" Jun 1, 2023
@marshauf
Author

marshauf commented Jun 1, 2023

My idea was to do something like this:

SELECT call('localhost:3000', num, letter) FROM (SELECT * FROM (VALUES (1, 'one'), (2, 'two'), (3, 'three')) AS t (num,letter))

It would probably work with your example.

CREATE EXTERNAL TABLE your_table STORED AS CSV WITH HEADER ROW LOCATION 'localhost:3000';
INSERT INTO your_table SELECT * FROM (VALUES (1, 'one'), (2, 'two'), (3, 'three')) AS t (num,letter);
SELECT * FROM your_table;

On insert, the table would call the endpoint and store the returned value. That seems cumbersome to use and implement.

@marshauf
Author

@alamb wouldn't it be possible to turn the SQL statement

SELECT call('localhost:3000', num, letter) FROM (SELECT * FROM (VALUES (1, 'one'), (2, 'two'), (3, 'three')) AS t (num,letter))

into a LogicalPlan, and then rewrite the Expr::ScalarUDF to an Expr::SubQuery with a LogicalPlan::Extension?
The Extension would point to a custom ExecutionPlan, in which I could run async code.
Similar to what is done in datafusion/core/tests/user_defined_plan.rs.
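
The rewrite step proposed above can be sketched on a toy expression tree. This is only an illustration of the idea of swapping a UDF call for an extension-backed node: `ToyExpr`, `rewrite_udf`, and the `async_` node name are hypothetical and are not DataFusion's `Expr`, `LogicalPlan`, or optimizer APIs.

```rust
/// Toy expression tree for illustration only; NOT DataFusion's `Expr`.
#[derive(Debug, Clone, PartialEq)]
enum ToyExpr {
    Column(String),
    Literal(String),
    /// A call to a scalar UDF by name.
    ScalarUdf { name: String, args: Vec<ToyExpr> },
    /// Placeholder for a subquery over a user-defined extension node,
    /// whose execution plan would be free to run async code.
    ExtensionSubquery { node: String, args: Vec<ToyExpr> },
}

/// Recursively replaces every call to the UDF named `target`
/// with an extension-backed subquery placeholder.
fn rewrite_udf(expr: ToyExpr, target: &str) -> ToyExpr {
    match expr {
        ToyExpr::ScalarUdf { name, args } => {
            // Rewrite the arguments first so nested calls are handled too.
            let args: Vec<ToyExpr> = args
                .into_iter()
                .map(|arg| rewrite_udf(arg, target))
                .collect();
            if name == target {
                ToyExpr::ExtensionSubquery {
                    node: format!("async_{name}"),
                    args,
                }
            } else {
                ToyExpr::ScalarUdf { name, args }
            }
        }
        ToyExpr::ExtensionSubquery { node, args } => ToyExpr::ExtensionSubquery {
            node,
            args: args
                .into_iter()
                .map(|arg| rewrite_udf(arg, target))
                .collect(),
        },
        // Columns and literals contain no UDF calls.
        other => other,
    }
}
```

In this sketch, rewriting the toy equivalent of `call('localhost:3000', num, letter)` with target `call` yields an `ExtensionSubquery` node that keeps the same three arguments, while calls to other functions are left untouched.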

@alamb
Contributor

alamb commented Jun 15, 2023

@alamb wouldn't it be possible to turn the SQL statement

Yes, that sounds like it would work (I am sorry I didn't suggest that)

If you get it to work, I think it would be a great example to include in DataFusion to show the power of both the existing extension APIs and custom table functions.

@marshauf
Author

I got an example working which replaces a UDF with a user-defined Extension.
The extension processes RecordBatches in an async function.

It works great with VALUES as input but not with a CSV file.

I will clean it up and create a pull request. I hope you can help me figure out why input from VALUES is processed differently from input from a CSV file.

@alamb
Contributor

alamb commented Oct 25, 2023

I filed #7926 to track user defined table functions

@edmondop
Contributor

edmondop commented Aug 1, 2024

@alamb this is very useful for me now, but it is blocking. Any chance I can resume the existing work?

@alamb
Contributor

alamb commented Aug 1, 2024

Sure -- sounds good to me @edmondop

What I would personally suggest doing is making an example showing what you are trying to do -- and then, with that example, modifying the DataFusion APIs accordingly. That way you'll have both the API changes needed and an example of what you were trying to do that serves as documentation.


3 participants