-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Simplify sort streams #2296
Simplify sort streams #2296
Conversation
receiver, | ||
join_handle, | ||
), | ||
0, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think specifying 0 here is ok, but perhaps @yjshen could confirm?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. This field is mem_used
for SortedStream
. This 0 is consistent with what we report for Receivers. We currently do not count in memory usage for a single batch for all streams' next()
.
@@ -269,9 +284,6 @@ pub(crate) struct SortPreservingMergeStream { | |||
/// The sorted input streams to merge together | |||
streams: MergingStreams, | |||
|
|||
/// Drop helper for tasks feeding the input [`streams`](Self::streams) | |||
_drop_helper: AbortOnDropMany<()>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We no longer need this as it is handled by the individual RecordBatchReceiverStream
use futures::Stream; | ||
use tokio::sync::mpsc; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is updated because of the change to spawn_execution
@@ -180,26 +179,23 @@ impl ExecutionPlan for CoalescePartitionsExec { | |||
} | |||
} | |||
|
|||
pin_project! { | |||
struct MergeStream { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Drive by cleanup, this isn't necessary as the types are unpin
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like a nice cleanup to me
cc @yjshen
.into_iter() | ||
.map(|s| StreamWrapper::Stream(Some(s))) | ||
.collect(); | ||
let wrappers = streams.into_iter().map(|s| s.stream.fuse()).collect(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is fuse
the magic that avoids the need for the stream wrapper?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's the combination of this and using RecordBatchReceiverStream to convert the mpsc to a SendableRecordBatchStream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RecordBatchReceiverStream
is the thing I was missing
receiver, | ||
join_handle, | ||
), | ||
0, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. This field is mem_used
for SortedStream
. This 0 is consistent with what we report for Receivers. We currently do not count in memory usage for a single batch for all streams' next()
.
Which issue does this PR close?
Part of #2201
Rationale for this change
In preparation for making tokio an optional dependency of SortPreservingMerge, this PR shuffles around some of the stream plumbing.
What changes are included in this PR?
Are there any user-facing changes?
No