feat: multi-threaded async runtime (#11411)
ritchie46 authored Sep 29, 2023
1 parent eb43a08 commit e1f75d0
Showing 7 changed files with 33 additions and 37 deletions.
2 changes: 1 addition & 1 deletion crates/polars-io/Cargo.toml
@@ -43,7 +43,7 @@ serde_json = { version = "1", default-features = false, features = ["alloc", "ra
 simd-json = { workspace = true, optional = true }
 simdutf8 = { workspace = true, optional = true }
 smartstring = { workspace = true }
-tokio = { workspace = true, features = ["net"], optional = true }
+tokio = { workspace = true, features = ["net", "rt-multi-thread"], optional = true }
 tokio-util = { workspace = true, features = ["io", "io-util"], optional = true }
 url = { workspace = true, optional = true }

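Note: the `rt-multi-thread` feature is what gates `tokio::runtime::Builder::new_multi_thread()`; it has to be enabled here so the shared multi-threaded runtime introduced in `pl_async.rs` below can be built.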
4 changes: 3 additions & 1 deletion crates/polars-io/src/cloud/adaptors.rs
@@ -240,6 +240,7 @@ mod tests {
         .unwrap()
     }
 
+    #[test]
     fn csv_to_local_objectstore_cloudwriter() {
         use crate::csv::CsvWriter;
         use crate::prelude::SerWriter;
@@ -263,7 +264,8 @@
 
     // Skip this tests on Windows since it does not have a convenient /tmp/ location.
     #[cfg_attr(target_os = "windows", ignore)]
-    async fn cloudwriter_from_cloudlocation_test() {
+    #[test]
+    fn cloudwriter_from_cloudlocation_test() {
         use crate::csv::CsvWriter;
         use crate::prelude::SerWriter;
 
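Both cloud-writer tests become ordinary synchronous `#[test]` functions rather than async tests; with a process-wide runtime available, the blocking is presumably handled inside `CloudWriter` itself via `get_runtime()`, so the tests no longer need their own async test harness.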
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/read.rs
@@ -336,7 +336,7 @@ impl ParquetAsyncReader {
 
         // batched reader deals with slice pushdown
         let reader = self.batched(usize::MAX).await?;
-        let mut iter = reader.iter_async(16);
+        let mut iter = reader.iter(16);
 
         let mut chunks = Vec::with_capacity(16);
         while let Some(result) = iter.next_().await {
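With the separate `iter_async` constructor removed (see `read_impl.rs` below), the async read path now builds the plain `iter(...)` wrapper and keeps driving it through its async `next_()` method.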
30 changes: 1 addition & 29 deletions crates/polars-io/src/parquet/read_impl.rs
@@ -562,18 +562,6 @@ impl BatchedParquetReader {
             batches_per_iter,
             inner: self,
             current_batch: vec![].into_iter(),
-            rt: Some(get_runtime()),
         }
     }
-
-    /// Turn the batched reader into an iterator.
-    #[cfg(feature = "async")]
-    pub fn iter_async(self, batches_per_iter: usize) -> BatchedParquetIter {
-        BatchedParquetIter {
-            batches_per_iter,
-            inner: self,
-            current_batch: vec![].into_iter(),
-            rt: None,
-        }
-    }
 }
@@ -583,7 +571,6 @@ pub struct BatchedParquetIter {
     batches_per_iter: usize,
     inner: BatchedParquetReader,
     current_batch: std::vec::IntoIter<DataFrame>,
-    rt: Option<tokio::runtime::Runtime>,
 }
 
 #[cfg(feature = "async")]
@@ -609,21 +596,6 @@ impl Iterator for BatchedParquetIter {
     type Item = PolarsResult<DataFrame>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        match self.current_batch.next() {
-            Some(df) => Some(Ok(df)),
-            None => match self
-                .rt
-                .as_ref()
-                .unwrap()
-                .block_on(self.inner.next_batches(self.batches_per_iter))
-            {
-                Err(e) => Some(Err(e)),
-                Ok(opt_batch) => {
-                    let batch = opt_batch?;
-                    self.current_batch = batch.into_iter();
-                    self.current_batch.next().map(Ok)
-                },
-            },
-        }
+        get_runtime().block_on(self.next_())
     }
 }
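The iterator no longer carries its own optional runtime; the blocking `Iterator::next` simply drives the async `next_()` future on the shared runtime. Below is a minimal, self-contained sketch of that pattern, not the actual polars types: the `Countdown` struct and its `next_()` method are made up for illustration, and it assumes `tokio` (with the `rt-multi-thread` feature) and `once_cell` as dependencies.

use once_cell::sync::Lazy;
use tokio::runtime::{Builder, Runtime};

static RUNTIME: Lazy<Runtime> =
    Lazy::new(|| Builder::new_multi_thread().enable_all().build().unwrap());

struct Countdown {
    remaining: u32,
}

impl Countdown {
    // Stand-in for an async `next_batches` call that would normally await I/O.
    async fn next_(&mut self) -> Option<u32> {
        if self.remaining == 0 {
            return None;
        }
        self.remaining -= 1;
        Some(self.remaining)
    }
}

impl Iterator for Countdown {
    type Item = u32;

    fn next(&mut self) -> Option<Self::Item> {
        // Same shape as the commit: block the calling thread on the shared runtime.
        RUNTIME.block_on(self.next_())
    }
}

fn main() {
    // Prints 2, 1, 0.
    for n in (Countdown { remaining: 3 }) {
        println!("{n}");
    }
}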
11 changes: 9 additions & 2 deletions crates/polars-io/src/pl_async.rs
@@ -1,9 +1,16 @@
+use std::ops::Deref;
+
+use once_cell::sync::Lazy;
 use tokio::runtime::{Builder, Runtime};
 
-pub fn get_runtime() -> Runtime {
-    Builder::new_current_thread()
+static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
+    Builder::new_multi_thread()
+        .enable_io()
         .enable_time()
         .build()
         .unwrap()
+});
+
+pub fn get_runtime() -> &'static Runtime {
+    RUNTIME.deref()
 }
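The old helper constructed a fresh current-thread runtime on every call; the new version builds one multi-threaded runtime lazily and hands every caller a `&'static` reference to it. Here is a small sketch of how call sites can now share worker threads; it is not from this commit, the `fetch_many` function and its sleep-based work are invented for illustration, and it assumes `tokio` (with `rt-multi-thread` and `time` features) and `once_cell` as dependencies.

use std::time::Duration;

use once_cell::sync::Lazy;
use tokio::runtime::{Builder, Runtime};

// Built once, on first use, and shared by every caller for the life of the process.
static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    Builder::new_multi_thread()
        .enable_io()
        .enable_time()
        .build()
        .unwrap()
});

pub fn get_runtime() -> &'static Runtime {
    Lazy::force(&RUNTIME)
}

// Synchronous entry point that fans work out over the shared runtime's worker threads.
fn fetch_many(n: u32) -> Vec<String> {
    get_runtime().block_on(async {
        let handles: Vec<_> = (0..n)
            .map(|i| {
                get_runtime().spawn(async move {
                    // Stand-in for real async I/O such as an object-store request.
                    tokio::time::sleep(Duration::from_millis(10)).await;
                    format!("result {i}")
                })
            })
            .collect();
        let mut out = Vec::with_capacity(handles.len());
        for h in handles {
            out.push(h.await.unwrap());
        }
        out
    })
}

fn main() {
    // Every call reuses the same runtime instead of building a new one.
    println!("{:?}", fetch_many(3));
}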
4 changes: 1 addition & 3 deletions crates/polars-pipe/src/executors/sources/parquet.rs
@@ -27,7 +27,6 @@ pub struct ParquetSource {
     cloud_options: Option<CloudOptions>,
     file_info: FileInfo,
     verbose: bool,
-    rt: tokio::runtime::Runtime,
 }
 
 impl ParquetSource {
@@ -121,7 +120,6 @@ impl ParquetSource {
             cloud_options,
             file_info,
             verbose,
-            rt: get_runtime(),
         })
     }
 }
@@ -131,7 +129,7 @@ impl Source for ParquetSource {
         if self.batched_reader.is_none() {
             self.init_reader()?;
         }
-        let batches = self.rt.block_on(
+        let batches = get_runtime().block_on(
             self.batched_reader
                 .as_mut()
                 .unwrap()
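The streaming `ParquetSource` likewise drops its privately owned `tokio::runtime::Runtime` field and blocks on the shared runtime from `get_runtime()` instead, so each pipeline source no longer constructs and tears down its own runtime.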
17 changes: 17 additions & 0 deletions py-polars/Cargo.lock

Some generated files are not rendered by default.
