This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

page_stream_to_array replacement? #1048

Open
wooden-worm opened this issue Jun 5, 2022 · 4 comments
Labels
question Further information is requested

Comments

@wooden-worm

arrow2 0.9 had a page_stream_to_array function to deserialize a DataPage into an Array, but it was removed in the latest version of arrow2. How would I achieve the same thing as page_stream_to_array now?

I'm using it to implement an async RecordReader as suggested in #331 (comment)

@jorgecarleitao
Owner

Just to make sure I understand the goal: you have a stream of pages from parquet2 and want to convert each page to a single Arrow array?

@jorgecarleitao added the question label on Jun 5, 2022
@wooden-worm
Author

Yes, I got a stream of pages from get_page_stream and decompress, as described in #331 (comment)
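
For reference, that setup looks roughly like this. It is only a sketch mirroring the calls used in the answer below; the decompressed_pages name is made up, and it assumes ColumnChunkMetaData is re-exported under read the same way RowGroupMetaData is:

use arrow2::error::Error;
use arrow2::io::parquet::read;
use futures::{pin_mut, StreamExt};

// Sketch: stream and decompress the pages of one column chunk.
async fn decompressed_pages<R>(
    reader: &mut R,
    column_meta: &read::ColumnChunkMetaData,
) -> Result<(), Error>
where
    R: futures::AsyncRead + futures::AsyncSeek + Send + Unpin,
{
    // Stream of compressed pages for this column chunk; the closure is a
    // "keep every page" filter.
    let pages =
        read::get_page_stream(column_meta, reader, vec![], std::sync::Arc::new(|_, _| true))
            .await?;
    pin_mut!(pages);
    while let Some(maybe_page) = pages.next().await {
        // A decompressed parquet page; the open question is how to turn it
        // into an Arrow array now that page_stream_to_array is gone.
        let _page = read::decompress(maybe_page?, &mut vec![])?;
    }
    Ok(())
}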

@jorgecarleitao
Owner

jorgecarleitao commented Jun 7, 2022

Thanks. So, unfortunately we need a backward-incompatible change (#1055), but the gist is this:

use std::sync::Arc;

use arrow2::array::Array;
use arrow2::datatypes::Schema;
use async_stream::stream;
use futures::stream::BoxStream;
use futures::{io, pin_mut, StreamExt};

use arrow2::error::Error;
use arrow2::io::parquet::read;
use arrow2::io::parquet::read::fallible_streaming_iterator;

async fn get_stream<'a, R: futures::AsyncRead + Unpin + Send + futures::AsyncSeek>(
    reader: &'a mut R,
    row_group: &'a read::RowGroupMetaData,
    schema: &'a Schema,
    column: usize,
) -> Result<BoxStream<'a, Result<Arc<dyn Array>, Error>>, Error> {
    // Parquet physical type of the column, needed by `column_iter_to_arrays`.
    let type_ = row_group.columns()[column]
        .descriptor()
        .descriptor
        .primitive_type
        .clone();
    // Arrow field for the column; assumes a flat schema, where the field index
    // matches the parquet column index.
    let field = schema.fields[column].clone();
    let column = &row_group.columns()[column];

    let pages =
        read::get_page_stream(column, reader, vec![], std::sync::Arc::new(|_, _| true)).await?;

    Ok(stream! {
        pin_mut!(pages);
        for await maybe_page in pages {
            let compressed_page = maybe_page.unwrap();

            // Decompress the page and wrap it in a one-element fallible
            // streaming iterator, which is what the deserializer expects.
            let encoded_page = read::decompress(compressed_page, &mut vec![]).unwrap();
            let iter = fallible_streaming_iterator::convert(std::iter::once(Ok(&encoded_page)));

            // Deserialize this single page into a single Arrow array.
            let mut iter =
                read::column_iter_to_arrays(vec![iter], vec![&type_], field.clone(), None).unwrap();
            yield iter.next().unwrap();
        }
    }.boxed())
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Error> {
    // Placeholder reader: replace with a real async reader (e.g. an async file).
    let mut reader = io::Cursor::new(vec![]);
    let metadata = read::read_metadata_async(&mut reader).await?;

    let schema = read::infer_schema(&metadata)?;

    let row_group = &metadata.row_groups[0];

    // One `Arc<dyn Array>` per data page of column 0.
    let stream = get_stream(&mut reader, row_group, &schema, 0).await?;

    Ok(())
}
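
For completeness, a minimal sketch of how the returned stream could be consumed; the consume helper below is hypothetical (not part of arrow2) and only uses futures::StreamExt to drain the stream:

use std::sync::Arc;

use arrow2::array::Array;
use arrow2::error::Error;
use futures::stream::BoxStream;
use futures::StreamExt;

// Hypothetical helper: drain the stream produced by `get_stream`,
// one Arrow array per data page.
async fn consume(mut stream: BoxStream<'_, Result<Arc<dyn Array>, Error>>) -> Result<(), Error> {
    while let Some(maybe_array) = stream.next().await {
        let array = maybe_array?;
        println!("deserialized an array with {} rows", array.len());
    }
    Ok(())
}

In main above, this would amount to consume(stream).await? right before Ok(()).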

@jorgecarleitao
Owner

#1055 has been merged. I think that you should be able to use the example above in main.
