Skip to content

Commit

Permalink
Add ParquetObjectReader::with_runtime (apache#6248)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Aug 14, 2024
1 parent ee55721 commit cad06ea
Showing 1 changed file with 47 additions and 14 deletions.
61 changes: 47 additions & 14 deletions parquet/src/arrow/async_reader/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{FutureExt, TryFutureExt};

use object_store::{ObjectMeta, ObjectStore};

use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader};
use crate::errors::Result;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::ParquetMetaData;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use tokio::runtime::Handle;

/// Reads Parquet files in object storage using [`ObjectStore`].
///
Expand Down Expand Up @@ -59,6 +60,7 @@ pub struct ParquetObjectReader {
metadata_size_hint: Option<usize>,
preload_column_index: bool,
preload_offset_index: bool,
runtime: Option<Handle>,
}

impl ParquetObjectReader {
Expand All @@ -72,6 +74,7 @@ impl ParquetObjectReader {
metadata_size_hint: None,
preload_column_index: false,
preload_offset_index: false,
runtime: None,
}
}

Expand Down Expand Up @@ -99,27 +102,57 @@ impl ParquetObjectReader {
..self
}
}

/// Route this reader's object store IO onto the given tokio runtime
///
/// Tokio schedules tasks cooperatively: IO is only serviced promptly when every
/// task yields in a timely manner. Mixing IO with CPU-bound work, such as parquet
/// decoding, on a single runtime can therefore cause degraded throughput, dropped
/// connections and other issues. For more information see [here].
///
/// [here]: https://www.influxdata.com/blog/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
pub fn with_runtime(mut self, handle: Handle) -> Self {
    // Builder-style setter: overwrite the runtime slot and hand the reader back.
    self.runtime = Some(handle);
    self
}

/// Runs the IO closure `f` against this reader's store and object location.
///
/// If a runtime handle has been configured, `f` is spawned as a task on that
/// runtime using owned clones of the store and path, and the returned future
/// resolves to the task's result. A panic inside the task is resumed on the
/// caller; any other join failure is reported as [`ParquetError::External`].
///
/// If no runtime handle is configured, the future produced by `f` is returned
/// as-is, borrowing directly from `self`.
fn spawn<F, O>(&self, f: F) -> BoxFuture<'_, Result<O>>
where
    F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O>>
        + Send
        + 'static,
    O: Send + 'static,
{
    // No dedicated runtime: hand back the closure's own future untouched.
    let handle = match self.runtime.as_ref() {
        Some(handle) => handle,
        None => return f(&self.store, &self.meta.location),
    };

    // The spawned task must be 'static, so it owns its copies of store and path.
    let store = Arc::clone(&self.store);
    let location = self.meta.location.clone();
    let task = handle.spawn(async move { f(&store, &location).await });

    async move {
        match task.await {
            Ok(result) => result,
            Err(join_err) => match join_err.try_into_panic() {
                // Propagate the task's panic to whoever is awaiting the result.
                Ok(payload) => std::panic::resume_unwind(payload),
                Err(join_err) => Err(ParquetError::External(Box::new(join_err))),
            },
        }
    }
    .boxed()
}
}

impl AsyncFileReader for ParquetObjectReader {
// Fetches the byte `range` of the object at `self.meta.location` from the store,
// converting the store's error with `e.into()`.
//
// NOTE(review): the +/- diff markers were lost in this view, so two bodies appear
// back to back. The direct `self.store.get_range(..)` chain looks like the
// implementation removed by this commit, and the `self.spawn(..)` call looks like
// its replacement -- confirm against the upstream diff.
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
self.store
.get_range(&self.meta.location, range)
.map_err(|e| e.into())
.boxed()
// Same `get_range` request, issued through `self.spawn`.
self.spawn(|store, path| store.get_range(path, range).map_err(|e| e.into()).boxed())
}

// Fetches several byte `ranges` of the object at `self.meta.location` with a
// single `get_ranges` call, converting the store's error with `e.into()`.
//
// NOTE(review): the +/- diff markers were lost in this view, so two bodies appear
// back to back. The first `async move { self.store.get_ranges(..) }` block looks
// like the implementation removed by this commit, and the `self.spawn(..)` call
// looks like its replacement -- confirm against the upstream diff.
fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
where
Self: Send,
{
async move {
self.store
.get_ranges(&self.meta.location, &ranges)
.await
.map_err(|e| e.into())
}
.boxed()
// `move` transfers ownership of `ranges` into the closure passed to `self.spawn`,
// which requires the closure to be `'static`.
self.spawn(move |store, path| {
async move { store.get_ranges(path, &ranges).await.map_err(|e| e.into()) }.boxed()
})
}

fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
Expand Down

0 comments on commit cad06ea

Please sign in to comment.