feat(udf): support multiprocess pool for CPU-bound Python UDFs #13838

Closed
wants to merge 5 commits into from

wangrunji0408
Contributor

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

partially-resolve #13744

This PR adds a multiprocess pool for running CPU-bound Python functions. Users can set the workers option in @udf or @udtf to enable parallel execution:

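from risingwave.udf import udf

# A CPU-bound function that takes a long time to execute.
@udf(input_types=["BIGINT"], result_type="BIGINT", workers=10)
def square(x: int) -> int:
    # Compute x * x by repeated addition to simulate heavy CPU work.
    total = 0  # avoid shadowing the built-in sum()
    for _ in range(x):
        total += x
    return total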

Timing results:

dev=> select square(x::int8) from generate_series(10000, 11000) t(x);
Time: 245.066 ms (single)
Time: 294.942 ms (workers=10)

dev=> select square(x::int8) from generate_series(100000, 101000) t(x);
Time: 2377.646 ms (single)
Time: 377.491 ms (workers=10)
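
To make the comparison explicit, the speedup implied by each pair of timings can be computed directly. This is only a small arithmetic sketch over the four numbers reported above; the helper name speedup is made up for illustration.

```python
def speedup(single_ms: float, parallel_ms: float) -> float:
    """Return how many times faster the parallel run is than the single run.

    Values below 1.0 mean the parallel run was slower.
    """
    return single_ms / parallel_ms


# Timings copied from the results above (milliseconds).
small = speedup(245.066, 294.942)    # inputs 10000..11000
large = speedup(2377.646, 377.491)   # inputs 100000..101000

if __name__ == "__main__":
    print(f"small inputs: {small:.2f}x")
    print(f"large inputs: {large:.2f}x")
```

With workers=10 the lighter workload runs at roughly 0.83x of the single-process speed (a slowdown), while the heavier one is roughly 6.3x faster.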

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

Python UDFs now support multiple workers to run CPU-bound functions.

Comment on lines 55 to 58
if io_threads is not None:
    self._executor = ThreadPoolExecutor(max_workers=io_threads)
elif workers is not None:
    self._executor = get_reusable_executor(max_workers=workers)
Member

I'm pondering whether io_threads can also be replaced by this...

Contributor Author

Yes, but threads are more efficient than processes, because multiprocessing requires pickling (serializing the code) and transferring data between processes. That's why it is even slower than single-threaded mode for functions that are not very computationally heavy. 🤪
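
The choice between the two executor kinds can be sketched with the standard library alone. This is an illustrative stand-in, not the PR's code: the PR uses get_reusable_executor, while this sketch swaps in concurrent.futures.ProcessPoolExecutor, and the helper name make_executor is hypothetical.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from typing import Optional


def make_executor(
    io_threads: Optional[int] = None,
    workers: Optional[int] = None,
) -> Optional[Executor]:
    """Pick an executor in the same order as the reviewed snippet.

    io_threads wins if given (threads share memory, so nothing is pickled);
    otherwise workers selects a process pool, where each call's function,
    arguments, and result are pickled and sent between processes.
    """
    if io_threads is not None:
        return ThreadPoolExecutor(max_workers=io_threads)
    if workers is not None:
        # Stand-in for get_reusable_executor(max_workers=workers).
        return ProcessPoolExecutor(max_workers=workers)
    return None  # run the function inline in the calling thread


if __name__ == "__main__":
    pool = make_executor(io_threads=4)
    print(type(pool).__name__)
    pool.shutdown()
```

The per-call pickling and inter-process transfer is a fixed cost, so it only pays off when each call does enough computation to outweigh it.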

Member

@xxchan xxchan left a comment

Can you explain the "Timing results" a bit? Why is the first one slower with workers=10? 🤪

BTW, you might also try comparing throughput, as #11508 (comment) does with the Nexmark generator.

@wangrunji0408
Contributor Author

wangrunji0408 commented Dec 20, 2023

The overhead of data exchange between processes looks too large. I'm looking for other approaches, such as spawning a separate Arrow Flight server in each process and enabling the SO_REUSEADDR option to allow them to listen on the same port. apache/arrow-cookbook#236 (comment)
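
As a rough illustration of the port-sharing idea, the sketch below opens two plain TCP listeners on one port. It is not Arrow Flight code, and the helper name make_listener is hypothetical. One caveat: on Linux, the socket option that lets several listening sockets share a port is SO_REUSEPORT rather than SO_REUSEADDR, so the sketch sets both where available.

```python
import socket


def make_listener(port: int, host: str = "127.0.0.1") -> socket.socket:
    """Create a listening TCP socket that may share its port with others."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # SO_REUSEPORT is not defined on every platform, so guard the lookup.
    if hasattr(socket, "SO_REUSEPORT"):
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    sock.bind((host, port))
    sock.listen()
    return sock


if __name__ == "__main__":
    first = make_listener(0)            # port 0: let the OS pick a free port
    port = first.getsockname()[1]
    second = make_listener(port)        # a second listener on the same port
    print("both listening on", port)
    first.close()
    second.close()
```

In a real deployment each listener would live in its own process, and the kernel would distribute incoming connections among them.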

@wangrunji0408
Contributor Author

wangrunji0408 commented Dec 22, 2023

Closing this PR because the overhead of multiprocessing is too large. We recommend another approach to scaling the Python UDF server: start one server per CPU core and use a proxy such as Nginx to load-balance across them.

See: https://github.com/risingwavelabs/risingwave-docs/pull/1680
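
A minimal sketch of the recommended layout, assuming one UDF server per core on consecutive local ports with Nginx forwarding gRPC traffic to them. The port numbers, the upstream name udf_servers, and both helper functions are assumptions made for illustration; the linked docs PR is the authoritative description.

```python
import os
from typing import List


def worker_ports(base_port: int, count: int) -> List[int]:
    """Assign one consecutive port to each UDF server process."""
    return [base_port + i for i in range(count)]


def nginx_config(listen_port: int, ports: List[int]) -> str:
    """Render an Nginx config that balances gRPC traffic across the ports."""
    upstream = "\n".join(f"        server 127.0.0.1:{p};" for p in ports)
    return (
        "events {}\n"
        "http {\n"
        "    upstream udf_servers {\n"
        f"{upstream}\n"
        "    }\n"
        "    server {\n"
        f"        listen {listen_port} http2;\n"
        "        location / {\n"
        "            grpc_pass grpc://udf_servers;\n"
        "        }\n"
        "    }\n"
        "}\n"
    )


if __name__ == "__main__":
    # Hypothetical ports: the proxy on 8815, the servers on 8816 and up.
    ports = worker_ports(8816, os.cpu_count() or 1)
    print(nginx_config(8815, ports))
```

Each port in the generated upstream block would be served by a separately started Python UDF server process, so no data crosses process boundaries inside Python.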

Development

Successfully merging this pull request may close these issues.

udf: improve parallelism of Python UDFs
2 participants