Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract ReceiverStreamBuilder #7817

Merged
merged 6 commits into from
Oct 16, 2023
Merged

Conversation

tustvold
Copy link
Contributor

@tustvold tustvold commented Oct 13, 2023

Which issue does this PR close?

Closes #7800

Rationale for this change

An alternative to #7800 that instead of trying to make RecordBatchReceiverStreamBuilder handle non-RecordBatch inputs, instead extracts the lower-level stream management logic that I believe is what we're actually wanting to DRY

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@tustvold tustvold requested a review from alamb October 13, 2023 14:45
@@ -38,6 +38,113 @@ use tokio::task::JoinSet;
use super::metrics::BaselineMetrics;
use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};

/// Creates a stream from a collection of producing tasks, routing panics to the stream
pub(crate) struct ReceiverStreamBuilder<O> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made this pub(crate) but it could easily be made public should we wish to do so

/// underlying tasks correctly.
///
/// Use [`Self::builder`] to construct one.
pub struct RecordBatchReceiverStream {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This type doesn't really make a lot of sense, given it isn't actually what the builder returns

schema,
join_set: JoinSet::new(),
inner: ReceiverStreamBuilder::new(capacity),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seemed unnecessary / undesirable to burden ReceiverStreamBuilder with a notion of Schema

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree

// don't need tx
drop(tx);

// future that checks the result of the join set, and propagates panic if seen
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the interesting logic that needs DRYing

@@ -110,7 +211,7 @@ impl RecordBatchReceiverStreamBuilder {
) {
let output = self.tx();

self.spawn(async move {
self.inner.spawn(async move {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic by comparison is rather ExecutionPlan specifi, and I don't think valuable to DRY

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense to me -- thank you @tustvold

What do you think @wiedld ?

schema,
join_set: JoinSet::new(),
inner: ReceiverStreamBuilder::new(capacity),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree

inner: BoxStream<'static, Result<RecordBatch>>,
}
#[doc(hidden)]
pub struct RecordBatchReceiverStream {}

impl RecordBatchReceiverStream {
/// Create a builder with an internal buffer of capacity batches.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this method documentation also seems incorrect at this time.

Copy link
Member

@waynexia waynexia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation looks good to me.

Compared to ReceiverStream from tokio-stream, the only difference is this ReceiverStreamBuilder provides methods to the user to bound and "detach" their tasks (spawn() and spawn_blocking()).

But considering this is also not tough to achieve without ReceiverStreamBuilder, is it possible to use the one tokio provides instead?

@tustvold
Copy link
Contributor Author

AFAICT the tokio version doesn't provide panic propagation nor cancellation?

@waynexia
Copy link
Member

Tokio version doesn't propagate panics. A panicked task will only panic the env it runs. It provides cancellation from the mpsc, dropped consumer (or the stream itself) will result in errors in senders.

@tustvold
Copy link
Contributor Author

Given panic propagation and task cancellation are the entire purpose of this construction, I think we're therefore not duplicating something upstream?

@waynexia
Copy link
Member

Thanks for explaining, LGTM 👍

@alamb
Copy link
Contributor

alamb commented Oct 14, 2023

I tried to encode this conversation into comments

@wiedld
Copy link
Contributor

wiedld commented Oct 14, 2023

What do you think @wiedld ?

Looks great. TY.

@alamb
Copy link
Contributor

alamb commented Oct 15, 2023

I plan to merge this PR when the CI passes

@tustvold tustvold merged commit fa2bb6c into apache:main Oct 16, 2023
22 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants