This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Make parquet_read_record support async #331

Closed
jackwener opened this issue Aug 24, 2021 · 3 comments
Labels
question Further information is requested

Comments

@jackwener

jackwener commented Aug 24, 2021

Hi, I ran into a problem when making the parquet_read_record example async.

I tried to make the function async, but the following error occurs:

14 |     let reader = read::RecordReader::try_new(reader, None, None, Arc::new(|_, _| true), None)?;
   |                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `std::io::Read` is not implemented for `tokio::fs::File`

I'd like to fix it and contribute the result to the examples.
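The error arises because the sync RecordReader only accepts a reader that implements std::io::Read (and std::io::Seek), and tokio::fs::File implements tokio's own AsyncRead/AsyncSeek instead. One possible workaround, sketched here under the assumption that the file fits in memory, is to load the bytes asynchronously (for example with `tokio::fs::read(path).await?`) and wrap them in a std::io::Cursor, which implements both std traits. To keep the sketch std-only, `read_footer_magic` is a hypothetical stand-in for RecordReader::try_new: it only checks the 4-byte "PAR1" magic that ends every Parquet file. Note that the decoding itself would still run on whichever thread calls it.

```rust
use std::io::{Cursor, Read, Seek, SeekFrom};

// Hypothetical stand-in for a sync reader such as `RecordReader::try_new`:
// like it, it only requires `Read + Seek`.
fn read_footer_magic<R: Read + Seek>(reader: &mut R) -> std::io::Result<[u8; 4]> {
    // A Parquet file ends with the 4-byte magic "PAR1".
    reader.seek(SeekFrom::End(-4))?;
    let mut magic = [0u8; 4];
    reader.read_exact(&mut magic)?;
    Ok(magic)
}

fn main() -> std::io::Result<()> {
    // Assumption: in async code these bytes would come from
    // `tokio::fs::read(path).await?`; a fake buffer keeps this sketch std-only.
    let bytes: Vec<u8> = b"PAR1...data...PAR1".to_vec();
    // `Cursor<Vec<u8>>` implements std's `Read` and `Seek`.
    let mut cursor = Cursor::new(bytes);
    let magic = read_footer_magic(&mut cursor)?;
    assert_eq!(&magic, b"PAR1");
    Ok(())
}
```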

@jackwener
Author

/label question

@jorgecarleitao
Owner

Hey! So, the RecordReader is the sync version of the API, and we do not have an async version. The reason is that it is not yet clear what the best solution is: do we want to push CPU-bound work to async tasks? Do we want to use one-shot channels with Rayon?
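To make the "one-shot channels with Rayon" option concrete, here is a minimal std-only sketch of the pattern: CPU-bound work is handed to another thread, and the result comes back through a channel used exactly once. With Rayon and tokio this would roughly be `rayon::spawn` plus a `tokio::sync::oneshot` receiver that the async task awaits; here `std::thread` and `std::sync::mpsc` stand in for both, and `decode_page` is a hypothetical placeholder for real page decoding.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical CPU-bound step standing in for decoding a page.
fn decode_page(page: Vec<u8>) -> u64 {
    page.iter().map(|&b| u64::from(b)).sum()
}

// Run `work` on another thread and return a receiver that yields its result once.
// (A thread pool such as Rayon's would replace `thread::spawn` in practice.)
fn spawn_cpu<F, T>(work: F) -> mpsc::Receiver<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let (tx, rx) = mpsc::sync_channel(1);
    thread::spawn(move || {
        // Ignore the error if the receiver was dropped before the work finished.
        let _ = tx.send(work());
    });
    rx
}

fn main() {
    // Hypothetical raw pages, as an IO-bound stage might produce them.
    let pages: Vec<Vec<u8>> = vec![vec![1, 2, 3], vec![10, 20]];
    let receivers: Vec<_> = pages
        .into_iter()
        .map(|page| spawn_cpu(move || decode_page(page)))
        .collect();
    // The caller only waits on the channels; the decoding ran elsewhere.
    let decoded: Vec<u64> = receivers.into_iter().map(|rx| rx.recv().unwrap()).collect();
    assert_eq!(decoded, vec![6, 30]);
}
```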

Generally, this is handled in different ways depending on how the query engine schedules IO-bound and CPU-bound tasks (i.e. which thread pool picks them up). In this case, the stream from get_page_stream is purely IO-bound, while the stream from page_stream_to_array is purely CPU-bound.
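The split between an IO-only stage and a CPU-only stage can be sketched as a two-stage pipeline in which each stage runs where its scheduler wants it. This std-only sketch is an illustration under assumptions, not the arrow2 API: `io_stage` plays the role of get_page_stream by reading fixed-size "pages" on its own thread, and `cpu_stage` plays the role of page_stream_to_array by "decoding" them (here, summing bytes). A bounded channel connects the two so the reader pauses when decoding falls behind.

```rust
use std::io::{Cursor, Read};
use std::sync::mpsc;
use std::thread;

// Hypothetical IO-bound stage: read pages of at most `page_size` bytes on a
// dedicated thread and send them downstream.
fn io_stage<R: Read + Send + 'static>(mut reader: R, page_size: usize) -> mpsc::Receiver<Vec<u8>> {
    // Bounded channel: at most 2 pages are buffered between the stages.
    let (tx, rx) = mpsc::sync_channel(2);
    thread::spawn(move || loop {
        let mut page = vec![0u8; page_size];
        match reader.read(&mut page) {
            // EOF, or an error (a real reader would propagate it):
            // dropping `tx` ends the stream.
            Ok(0) | Err(_) => break,
            Ok(n) => {
                page.truncate(n);
                if tx.send(page).is_err() {
                    break; // the CPU stage hung up
                }
            }
        }
    });
    rx
}

// Hypothetical CPU-bound stage: consume pages as they arrive and "decode" each one.
fn cpu_stage(pages: mpsc::Receiver<Vec<u8>>) -> Vec<u64> {
    pages
        .into_iter()
        .map(|page| page.iter().map(|&b| u64::from(b)).sum())
        .collect()
}

fn main() {
    let data = Cursor::new(vec![1u8, 2, 3, 4, 5]);
    let sums = cpu_stage(io_stage(data, 2));
    // Pages are [1, 2], [3, 4] and [5].
    assert_eq!(sums, vec![3, 7, 5]);
}
```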

There is a natural way of extending the API for async by declaring something like:

pub struct RecordReaderAsync<R: AsyncRead + Unpin + Send + AsyncSeek> {
    reader: R,
    schema: Arc<Schema>,
    indices: Rc<Vec<usize>>,
    buffer: Vec<u8>,
    decompress_buffer: Vec<u8>,
    groups_filter: GroupFilter,
    pages_filter: Option<PageFilter>,
    metadata: Rc<FileMetaData>,
    current_group: usize,
    remaining_rows: usize,
}

and use get_page_stream and page_stream_to_array to produce a stream of RecordBatch. This would push the CPU work onto the async tasks, which tokio's developers advise against, because it blocks the progress of other tokio-based tasks running on the same thread pool/runtime.
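Whichever runtime ends up driving it, the RecordReaderAsync struct above mostly carries bookkeeping: which row group comes next (current_group), which groups to skip (groups_filter), and how many rows may still be read (remaining_rows, i.e. the row limit). The following std-only sketch shows how those fields could drive iteration over row groups. It is a simplification with hypothetical types (`RowGroupInfo`, `GroupCursor`, and this `GroupFilter` alias are not arrow2's), written as a sync Iterator; an async version would keep the same state inside a Stream. It uses Arc rather than Rc because Rc is not Send, and state held by a task on a multi-threaded runtime generally needs to be.

```rust
use std::sync::Arc;

// Hypothetical, simplified per-row-group metadata.
struct RowGroupInfo {
    num_rows: usize,
}

// Hypothetical filter mirroring `groups_filter`: (group index, metadata) -> keep?
type GroupFilter = Arc<dyn Fn(usize, &RowGroupInfo) -> bool + Send + Sync>;

struct GroupCursor {
    groups: Arc<Vec<RowGroupInfo>>,
    groups_filter: GroupFilter,
    current_group: usize,
    remaining_rows: usize,
}

impl Iterator for GroupCursor {
    // (row group index, number of rows to read from it)
    type Item = (usize, usize);

    fn next(&mut self) -> Option<Self::Item> {
        while self.current_group < self.groups.len() && self.remaining_rows > 0 {
            let index = self.current_group;
            self.current_group += 1;
            let group = &self.groups[index];
            if !(self.groups_filter)(index, group) {
                continue; // skipped groups do not count against the row limit
            }
            let rows = group.num_rows.min(self.remaining_rows);
            self.remaining_rows -= rows;
            return Some((index, rows));
        }
        None
    }
}

fn main() {
    let groups = Arc::new(vec![
        RowGroupInfo { num_rows: 100 },
        RowGroupInfo { num_rows: 50 },
        RowGroupInfo { num_rows: 80 },
    ]);
    let cursor = GroupCursor {
        groups,
        // Skip row group 1 and read at most 150 rows overall.
        groups_filter: Arc::new(|index: usize, _: &RowGroupInfo| index != 1),
        current_group: 0,
        remaining_rows: 150,
    };
    let plan: Vec<(usize, usize)> = cursor.collect();
    assert_eq!(plan, vec![(0, 100), (2, 50)]);
}
```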

So, as you can see, I'm also labeling this as a question ^_^

@jackwener
Author

Thanks for your reply.

jorgecarleitao added the question (Further information is requested) label on Jun 5, 2022