From 3bda91afb65167c15b60ceeedebcebfc852f6fed Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 6 Aug 2023 09:11:17 -0700 Subject: [PATCH] Use tokio only if running from a multi-thread tokio context (#7205) * Use tokio only if running from a multi-thread tokio context * Fix clippy --- datafusion/core/src/physical_plan/common.rs | 35 +++++++++++---------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs index f6b8fb33c91d..46dbc9ef6204 100644 --- a/datafusion/core/src/physical_plan/common.rs +++ b/datafusion/core/src/physical_plan/common.rs @@ -97,24 +97,27 @@ pub(crate) fn spawn_buffered( mut input: SendableRecordBatchStream, buffer: usize, ) -> SendableRecordBatchStream { - // Use tokio only if running from a tokio context (#2201) - if tokio::runtime::Handle::try_current().is_err() { - return input; - }; - - let mut builder = RecordBatchReceiverStream::builder(input.schema(), buffer); - - let sender = builder.tx(); + // Use tokio only if running from a multi-thread tokio context + match tokio::runtime::Handle::try_current() { + Ok(handle) + if handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread => + { + let mut builder = RecordBatchReceiverStream::builder(input.schema(), buffer); + + let sender = builder.tx(); + + builder.spawn(async move { + while let Some(item) = input.next().await { + if sender.send(item).await.is_err() { + return; + } + } + }); - builder.spawn(async move { - while let Some(item) = input.next().await { - if sender.send(item).await.is_err() { - return; - } + builder.build() } - }); - - builder.build() + _ => input, + } } /// Computes the statistics for an in-memory RecordBatch