Skip to content

Commit

Permalink
Add ParquetObjectReader::with_runtime (#6612)
Browse files Browse the repository at this point in the history
* Add ParquetObjectReader::with_runtime (#6248)

* Add test for ParquetObjectReader::with_runtime and fix clippy complaints

* Switch ParquetObjectReader runtime tests to not depend on tokio_unstable anymore

* Add doc-comment for test_runtime_thread_id_different

Co-authored-by: Andrew Lamb <[email protected]>

* - Add comment about why we don't use spawn for metadata
- Remove outdated comment about target_has_atomic
- Add test to verify reader fails when spawned on a shutdown runtime

* Avoid use of Infallable and From conversion

---------

Co-authored-by: Raphael Taylor-Davies <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
3 people authored Nov 2, 2024
1 parent 6859c87 commit 22bc772
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 32 deletions.
2 changes: 1 addition & 1 deletion arrow-json/src/writer/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ impl Encoder for ArrayFormatter<'_> {
/// A newtype wrapper around [`ArrayFormatter`] that skips surrounding the value with `"`
struct RawArrayFormatter<'a>(ArrayFormatter<'a>);

impl<'a> Encoder for RawArrayFormatter<'a> {
impl Encoder for RawArrayFormatter<'_> {
fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
let _ = write!(out, "{}", self.0.value(idx));
}
Expand Down
2 changes: 1 addition & 1 deletion arrow-string/src/predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ fn equals_kernel((n, h): (&u8, &u8)) -> bool {
}

fn equals_ignore_ascii_case_kernel((n, h): (&u8, &u8)) -> bool {
n.to_ascii_lowercase() == h.to_ascii_lowercase()
n.eq_ignore_ascii_case(h)
}

/// Transforms a like `pattern` to a regex compatible pattern. To achieve that, it does:
Expand Down
2 changes: 1 addition & 1 deletion parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ lz4_flex = { version = "0.11", default-features = false, features = ["std", "fra
zstd = { version = "0.13", default-features = false }
serde_json = { version = "1.0", features = ["std"], default-features = false }
arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "json"] }
tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "io-util", "fs"] }
tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread", "io-util", "fs"] }
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] }
object_store = { version = "0.11.0", default-features = false, features = ["azure"] }

Expand Down
187 changes: 159 additions & 28 deletions parquet/src/arrow/async_reader/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use std::ops::Range;
use std::sync::Arc;
use std::{ops::Range, sync::Arc};

use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{FutureExt, TryFutureExt};

use object_store::{ObjectMeta, ObjectStore};
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
use object_store::{path::Path, ObjectMeta, ObjectStore};
use tokio::runtime::Handle;

use crate::arrow::async_reader::AsyncFileReader;
use crate::errors::Result;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};

/// Reads Parquet files in object storage using [`ObjectStore`].
Expand Down Expand Up @@ -59,6 +57,7 @@ pub struct ParquetObjectReader {
metadata_size_hint: Option<usize>,
preload_column_index: bool,
preload_offset_index: bool,
runtime: Option<Handle>,
}

impl ParquetObjectReader {
Expand All @@ -72,6 +71,7 @@ impl ParquetObjectReader {
metadata_size_hint: None,
preload_column_index: false,
preload_offset_index: false,
runtime: None,
}
}

Expand Down Expand Up @@ -99,29 +99,70 @@ impl ParquetObjectReader {
..self
}
}

/// Perform IO on the provided tokio runtime
///
/// Tokio is a cooperative scheduler, and relies on tasks yielding in a timely manner
/// to service IO. Therefore, running IO and CPU-bound tasks, such as parquet decoding,
/// on the same tokio runtime can lead to degraded throughput, dropped connections and
/// other issues. For more information see [here].
///
/// [here]: https://www.influxdata.com/blog/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
pub fn with_runtime(self, handle: Handle) -> Self {
Self {
runtime: Some(handle),
..self
}
}

fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O>>
where
F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O, E>>
+ Send
+ 'static,
O: Send + 'static,
E: Into<ParquetError> + Send + 'static,
{
match &self.runtime {
Some(handle) => {
let path = self.meta.location.clone();
let store = Arc::clone(&self.store);
handle
.spawn(async move { f(&store, &path).await })
.map_ok_or_else(
|e| match e.try_into_panic() {
Err(e) => Err(ParquetError::External(Box::new(e))),
Ok(p) => std::panic::resume_unwind(p),
},
|res| res.map_err(|e| e.into()),
)
.boxed()
}
None => f(&self.store, &self.meta.location)
.map_err(|e| e.into())
.boxed(),
}
}
}

impl AsyncFileReader for ParquetObjectReader {
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
self.store
.get_range(&self.meta.location, range)
.map_err(|e| e.into())
.boxed()
self.spawn(|store, path| store.get_range(path, range))
}

fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
where
Self: Send,
{
async move {
self.store
.get_ranges(&self.meta.location, &ranges)
.await
.map_err(|e| e.into())
}
.boxed()
self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
}

// This method doesn't directly call `self.spawn` because all of the IO that is done down the
// line due to this method call is done through `self.get_bytes` and/or `self.get_byte_ranges`.
// When `self` is passed into `ParquetMetaDataReader::load_and_finish`, it treats it as
// an `impl MetadataFetch` and calls those methods to get data from it. Due to `Self`'s impl of
// `AsyncFileReader`, the calls to `MetadataFetch::fetch` are just delegated to
// `Self::get_bytes`.
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let file_size = self.meta.size;
Expand All @@ -138,38 +179,52 @@ impl AsyncFileReader for ParquetObjectReader {

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};

use futures::TryStreamExt;

use arrow::util::test_util::parquet_test_data;
use futures::FutureExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::ObjectStore;
use object_store::{ObjectMeta, ObjectStore};

use crate::arrow::async_reader::ParquetObjectReader;
use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use crate::arrow::ParquetRecordBatchStreamBuilder;
use crate::errors::ParquetError;

#[tokio::test]
async fn test_simple() {
async fn get_meta_store() -> (ObjectMeta, Arc<dyn ObjectStore>) {
let res = parquet_test_data();
let store = LocalFileSystem::new_with_prefix(res).unwrap();

let mut meta = store
let meta = store
.head(&Path::from("alltypes_plain.parquet"))
.await
.unwrap();

let store = Arc::new(store) as Arc<dyn ObjectStore>;
let object_reader = ParquetObjectReader::new(Arc::clone(&store), meta.clone());
(meta, Arc::new(store) as Arc<dyn ObjectStore>)
}

#[tokio::test]
async fn test_simple() {
let (meta, store) = get_meta_store().await;
let object_reader = ParquetObjectReader::new(store, meta);

let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
.await
.unwrap();
let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

assert_eq!(batches.len(), 1);
assert_eq!(batches[0].num_rows(), 8);
}

#[tokio::test]
async fn test_not_found() {
let (mut meta, store) = get_meta_store().await;
meta.location = Path::from("I don't exist.parquet");

let object_reader = ParquetObjectReader::new(store, meta);
Expand All @@ -180,10 +235,86 @@ mod tests {
let err = e.to_string();
assert!(
err.contains("not found: No such file or directory (os error 2)"),
"{}",
err
"{err}",
);
}
}
}

#[tokio::test]
async fn test_runtime_is_used() {
let num_actions = Arc::new(AtomicUsize::new(0));

let (a1, a2) = (num_actions.clone(), num_actions.clone());
let rt = tokio::runtime::Builder::new_multi_thread()
.on_thread_park(move || {
a1.fetch_add(1, Ordering::Relaxed);
})
.on_thread_unpark(move || {
a2.fetch_add(1, Ordering::Relaxed);
})
.build()
.unwrap();

let (meta, store) = get_meta_store().await;

let initial_actions = num_actions.load(Ordering::Relaxed);

let reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());

let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

// Just copied these assert_eqs from the `test_simple` above
assert_eq!(batches.len(), 1);
assert_eq!(batches[0].num_rows(), 8);

assert!(num_actions.load(Ordering::Relaxed) - initial_actions > 0);

// Runtimes have to be dropped in blocking contexts, so we need to move this one to a new
// blocking thread to drop it.
tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
}

/// Unit test that `ParquetObjectReader::spawn`spawns on the provided runtime
#[tokio::test]
async fn test_runtime_thread_id_different() {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.build()
.unwrap();

let (meta, store) = get_meta_store().await;

let reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());

let current_id = std::thread::current().id();

let other_id = reader
.spawn(|_, _| async move { Ok::<_, ParquetError>(std::thread::current().id()) }.boxed())
.await
.unwrap();

assert_ne!(current_id, other_id);

tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
}

#[tokio::test]
async fn io_fails_on_shutdown_runtime() {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.build()
.unwrap();

let (meta, store) = get_meta_store().await;

let mut reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());

rt.shutdown_background();

let err = reader.get_bytes(0..1).await.unwrap_err().to_string();

assert!(err.to_string().contains("was cancelled"));
}
}
1 change: 0 additions & 1 deletion parquet/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ impl From<str::Utf8Error> for ParquetError {
ParquetError::External(Box::new(e))
}
}

#[cfg(feature = "arrow")]
impl From<ArrowError> for ParquetError {
fn from(e: ArrowError) -> ParquetError {
Expand Down

0 comments on commit 22bc772

Please sign in to comment.