Skip to content

Commit

Permalink
Use tokio only if running from a multi-thread tokio context (#7205)
Browse files Browse the repository at this point in the history
* Use tokio only if running from a multi-thread tokio context

* Fix clippy
  • Loading branch information
viirya authored Aug 6, 2023
1 parent fa1b21c commit 3bda91a
Showing 1 changed file with 19 additions and 16 deletions.
35 changes: 19 additions & 16 deletions datafusion/core/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,24 +97,27 @@ pub(crate) fn spawn_buffered(
mut input: SendableRecordBatchStream,
buffer: usize,
) -> SendableRecordBatchStream {
// Use tokio only if running from a tokio context (#2201)
if tokio::runtime::Handle::try_current().is_err() {
return input;
};

let mut builder = RecordBatchReceiverStream::builder(input.schema(), buffer);

let sender = builder.tx();
// Use tokio only if running from a multi-thread tokio context
match tokio::runtime::Handle::try_current() {
Ok(handle)
if handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread =>
{
let mut builder = RecordBatchReceiverStream::builder(input.schema(), buffer);

let sender = builder.tx();

builder.spawn(async move {
while let Some(item) = input.next().await {
if sender.send(item).await.is_err() {
return;
}
}
});

builder.spawn(async move {
while let Some(item) = input.next().await {
if sender.send(item).await.is_err() {
return;
}
builder.build()
}
});

builder.build()
_ => input,
}
}

/// Computes the statistics for an in-memory RecordBatch
Expand Down

0 comments on commit 3bda91a

Please sign in to comment.