
Asynchronous geoparquet reader #493
Merged: 13 commits into geoarrow:main, Feb 5, 2024

Conversation

@weiji14 (Contributor) commented Feb 2, 2024

Implementing an asynchronous GeoParquet file reader using ParquetRecordBatchStream.

TODO:

  • Initial implementation in src/io/parquet/reader.rs
  • Fix trait bounds
  • Refactor to have both read_geoparquet and read_geoparquet_async functions parse the GeoParquet metadata using the same function
  • Bring in object-store crate to read from URL (if it gets complicated, maybe split it into a separate PR)
  • Document new function
  • Add unit test

Addresses #492

P.S. This is my first ever Rust PR, so take it easy 🙈

Comment on lines 157 to 158
let stream: ParquetRecordBatchStream<T> = builder.build().unwrap();
let batches: Vec<RecordBatch> = stream.try_collect::<Vec<_>>().await.unwrap();
@weiji14 (Contributor, author):

Getting this error about unsatisfied trait bounds:

error[E0599]: the method `try_collect` exists for struct `ParquetRecordBatchStream<T>`, but its trait bounds were not satisfied
   --> src/io/parquet/reader.rs:158:44
    |
158 |     let batches: Vec<RecordBatch> = stream.try_collect::<Vec<_>>().await.unwrap();
    |                                            ^^^^^^^^^^^ method cannot be called on `ParquetRecordBatchStream<T>` due to unsatisfied trait bounds
    |
   ::: ~/.cargo/registry/src/index.crates.io-6f17d22bba15001f/parquet-50.0.0/src/arrow/async_reader/mod.rs:554:1

I'm a little unsure how to handle this: do I set the bounds on L157's ParquetRecordBatchStream<T>, or somewhere else?

@kylebarron (Member):

I agree it can be a confusing error message that is hard to debug. In this case, I went to the docs of ParquetRecordBatchStream and looked at its implementation of Stream. In that docstring, the bound on T is T: AsyncFileReader + Unpin + Send + 'static, so the inference is that the bound on T in this function needs to additionally have Unpin, which indeed fixed the compilation issue.

(I don't actually know what Unpin does, since I don't do a lot of async. If you'd like to learn more, I'd suggest this deep dive: https://fasterthanli.me/articles/pin-and-suffering)
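For illustration, a minimal sketch of the fix (a hypothetical standalone function, not the exact code from this PR): adding Unpin to the generic bound is what makes ParquetRecordBatchStream<T> implement Stream, so try_collect becomes callable.

use arrow_array::RecordBatch;
use futures::TryStreamExt;
use parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStreamBuilder};
use parquet::errors::Result;

// Without `Unpin` in the bound, `try_collect` fails with E0599 because
// `ParquetRecordBatchStream<T>` only implements `Stream` when
// `T: AsyncFileReader + Unpin + Send + 'static`.
async fn collect_batches<R: AsyncFileReader + Unpin + Send + 'static>(
    input: R,
) -> Result<Vec<RecordBatch>> {
    let builder = ParquetRecordBatchStreamBuilder::new(input).await?;
    let stream = builder.build()?;
    stream.try_collect::<Vec<_>>().await
}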

@weiji14 (Contributor, author):

Ah cool, thanks for the explanation! I was banging my head trying to figure out what was missing. Now I know more about trait bounds (and how to read the docs a bit better to find the traits) 🧑‍🎓

@kylebarron (Member):
Thanks! I pushed a commit with the following changes:

  • Fixed the trait bounds, as mentioned above.
  • Removed the calls to unwrap in favor of the ? (try) operator. unwrap means essentially "assert there was no error", while ? means "continue if there was no error; otherwise propagate the error upwards, immediately returning an error from this function". The ? takes a bit of time to get used to but ends up having really clean code with excellent error handling.
  • Removed the explicit types. If you use rust-analyzer, which I recommend, the inferred types are inlaid, so you don't need to explicitly define them everywhere:
    [screenshot of rust-analyzer inlay hints]
  • Added a new feature flag for parquet_async. The async Parquet reader brings in more dependencies than the sync Parquet reader, so it's good to have an option for users to use only the sync version. To complete this process, we need to conditionally compile parquet and async parquet functions only when those feature flags are on. We currently compile the parquet mod only when the parquet compilation flag is on:
    #[cfg(feature = "parquet")]
    pub mod parquet;

    I think the best way forward here is to create a new file src/io/parquet/reader_async.rs and in src/io/parquet/mod.rs conditionally compile reader_async.rs only when cfg(feature = "parquet_async"). Note that the parquet/async syntax in Cargo.toml means "activate the async feature from within the parquet dependency".
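For concreteness, a minimal sketch of that wiring (the exact dependency lists here are assumptions, not the crate's actual Cargo.toml):

# Cargo.toml
[features]
parquet = ["dep:parquet"]
# "parquet/async" activates the async feature of the parquet dependency.
parquet_async = ["parquet", "parquet/async", "dep:futures", "dep:tokio"]

// src/io/parquet/mod.rs
pub mod reader;
#[cfg(feature = "parquet_async")]
pub mod reader_async;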

Create a dedicated reader_async.rs file to put read_geoparquet_async, and move the parse_geoparquet_metadata function from reader.rs to geoparquet_metadata.rs.
@weiji14 (Contributor, author) commented Feb 3, 2024

  • Removed the calls to unwrap in favor of the ? (try) operator. unwrap means essentially "assert there was no error", while ? means "continue if there was no error; otherwise propagate the error upwards, immediately returning an error from this function". The ? takes a bit of time to get used to but ends up having really clean code with excellent error handling.

Oh good, my IDE was giving me squiggly red lines under the ? and suggesting to use .unwrap, but I think that was because I had activated the parquet_async feature flag by commenting out the optional statements in Cargo.toml locally to get rust-analyzer to work on the file. I think I've set things up a bit better now, so I'll stick with the ? operator!

  • Removed the explicit types. If you use rust-analyzer, which I recommend, the inferred types are inlaid, so you don't need to explicitly define them everywhere:
    [screenshot of rust-analyzer inlay hints]

Yep, I've got rust-analyzer set up, but my IDE (Pulsar) only shows those inferred types when I hover over them. OK to remove them if you prefer it that way.

  • Added a new feature flag for parquet_async. The async Parquet reader brings in more dependencies than the sync Parquet reader, so it's good to have an option for users to use only the sync version. To complete this process, we need to conditionally compile parquet and async parquet functions only when those feature flags are on. We currently compile the parquet mod only when the parquet compilation flag is on:
    #[cfg(feature = "parquet")]
    pub mod parquet;

I think the best way forward here is to create a new file src/io/parquet/reader_async.rs and in src/io/parquet/mod.rs conditionally compile reader_async.rs only when cfg(feature = "parquet_async"). Note that the parquet/async syntax in Cargo.toml means "activate the async feature from within the parquet dependency".

Thanks for setting up that new feature flag! I was wondering yesterday too if async should be enabled by default with the parquet feature or hidden behind a sub-feature flag. I've moved the geoparquet_reader_async function to reader_async.rs at commit 37b0f6c, and also the parse_geoparquet_metadata to geoparquet_metadata.rs. Gonna refactor things a bit more later to keep things DRY and add some unit tests if possible.

Comment on lines 9 to 14
 pub struct GeoParquetReaderOptions {
-    batch_size: usize,
-    coord_type: CoordType,
+    pub batch_size: usize,
+    pub coord_type: CoordType,
 }

 impl GeoParquetReaderOptions {
@weiji14 (Contributor, author):

Should this GeoParquetReaderOptions struct go in a common place like src/io/parquet/structs.rs, since it will be used by both the sync and async GeoParquet readers?

(Resolved review comment on src/io/parquet/geoparquet_metadata.rs, now outdated.)
A common function to get the arrow_schema, geometry_column_index and target_geo_data_type out of the GeoParquet file for both the async/sync readers. Also turned parse_geoparquet_metadata back into a private func.
Needed to use tokio::fs::File, which implements the AsyncRead trait; otherwise the unit test is similar to the synchronous version in reader.rs.
@weiji14 (Contributor, author) left a comment:

Bring in object-store crate to read from URL (if it gets complicated, maybe split it into a separate PR)

Given how 'big' this PR already is, I'm tempted to bring in object-store in a separate PR. I see that you're already doing some stuff with FlatGeobuf x object-store in #494.

Comment on lines 128 to 132
let Ok((geometry_column_index, target_geo_data_type)) =
parse_geoparquet_metadata(parquet_meta.file_metadata(), &arrow_schema, *coord_type)
else {
panic!("Cannot parse geoparquet metadata");
};
@weiji14 (Contributor, author):

Probably should return a GeoArrowError here instead of panicking? I'm a little unsure whether this let-else syntax is the idiomatic way of doing things...

@kylebarron (Member):

You should change the return type of this function to

-> Result<(Arc<Schema>, usize, Option<GeoDataType>)> {

Note that this Result points to crate::error::Result, which shadows the standard library's Result:

pub type Result<T> = std::result::Result<T, GeoArrowError>;

Now that this function returns a Result, you can use the ? operator to handle errors. So this can change to:

Suggested change:
-    let Ok((geometry_column_index, target_geo_data_type)) =
-        parse_geoparquet_metadata(parquet_meta.file_metadata(), &arrow_schema, *coord_type)
-    else {
-        panic!("Cannot parse geoparquet metadata");
-    };
+    let (geometry_column_index, target_geo_data_type) =
+        parse_geoparquet_metadata(parquet_meta.file_metadata(), &arrow_schema, *coord_type)?;
+    Ok((arrow_schema, geometry_column_index, target_geo_data_type))

That is, we call the first line, unwrapping the result if it's an Ok or propagating the error upwards if it's an Err, and then we return an Ok with the data from this function.
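As a self-contained illustration of the same pattern with a toy parse function (not the PR's actual code):

use std::num::ParseIntError;

// Before: let-else plus panic! aborts the whole program on failure.
fn double_or_panic(s: &str) -> i64 {
    let Ok(n) = s.parse::<i64>() else {
        panic!("Cannot parse integer");
    };
    n * 2
}

// After: return a Result and propagate the error with `?`;
// the caller decides how to handle it.
fn double(s: &str) -> Result<i64, ParseIntError> {
    let n = s.parse::<i64>()?;
    Ok(n * 2)
}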

@kylebarron (Member):

Then anywhere that calls build_arrow_schema can use ? to unwrap the error.

@weiji14 (Contributor, author):

Ah yes, returning Result makes more sense. Done in 97b5911.

Follow what's used by the synchronous read_geoparquet function.
@kylebarron (Member) commented Feb 3, 2024

Given how 'big' this PR already is, I'm tempted to bring in object-store in a separate PR. I see that you're already doing some stuff with FlatGeobuf x object-store in #494.

The nice thing is that object store integration can be as little as just documentation. This is because ParquetObjectReader implements AsyncFileReader. So following this example a user can just do

use std::sync::Arc;

use object_store::azure::MicrosoftAzureBuilder;
use object_store::path::Path;
use object_store::ObjectStore;
use parquet::arrow::async_reader::ParquetObjectReader;

let storage_container = Arc::new(MicrosoftAzureBuilder::from_env().build().unwrap());
let location = Path::from("path/to/blob.parquet");
let meta = storage_container.head(&location).await.unwrap();
println!("Found Blob with {}B at {}", meta.size, meta.location);

let reader = ParquetObjectReader::new(storage_container, meta);
let table = read_geoparquet_async(reader, options).await?;

No need to use the let-else statement with a panic! anymore; this allows using the `?` operator to handle errors when the function is called.
Add module level documentation showing how to use read_geoparquet and read_geoparquet_async to read GeoParquet files into a GeoTable struct.
@weiji14 (Contributor, author) left a comment:

Given how 'big' this PR already is, I'm tempted to bring in object-store in a separate PR. I see that you're already doing some stuff with FlatGeobuf x object-store in #494.

The nice thing is that object store integration can be as little as just documentation. This is because ParquetObjectReader implements AsyncFileReader.

OK, this PR should be ready for review! I've added some module-level example docs for read_geoparquet_async and read_geoparquet, but stopped short of mentioning object-store just yet. My idea for bringing in object-store was more about applying it on the PyO3/Python side, around here:

pub fn read_parquet(path: String, batch_size: usize) -> PyGeoArrowResult<GeoTable> {
let file = File::open(path).map_err(|err| PyFileNotFoundError::new_err(err.to_string()))?;
let options = GeoParquetReaderOptions::new(batch_size, Default::default());
let table = _read_geoparquet(file, options)?;
Ok(GeoTable(table))
}

Specifically, making it possible for a user to pass in either a local path or a remote path like s3://bucket/example.geoparquet directly, with a match statement to handle the different object storage services (s3/az/gs/etc.). There's an object_store::parse_url function that can be used to build the ObjectStore. But let's discuss the implementation in #492 first; I'm not entirely sure whether it's better to have this parsing logic in geoarrow-rs or to leave the object-store part to the user.
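A rough sketch of that idea (the helper name is hypothetical; read_geoparquet_async would consume the returned reader):

use std::sync::Arc;

use object_store::{parse_url, ObjectStore};
use parquet::arrow::async_reader::ParquetObjectReader;
use url::Url;

// Map a URL like s3://bucket/example.geoparquet to the matching
// ObjectStore implementation and build a Parquet reader for it.
async fn reader_from_url(url: &Url) -> object_store::Result<ParquetObjectReader> {
    let (store, path) = parse_url(url)?;
    let meta = store.head(&path).await?;
    Ok(ParquetObjectReader::new(Arc::from(store), meta))
}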

Comment on lines +35 to +43
use tokio::fs::File;

#[tokio::test]
async fn nybb() {
let file = File::open("fixtures/geoparquet/nybb.parquet")
.await
.unwrap();
let options = GeoParquetReaderOptions::new(65536, Default::default());
let _output_geotable = read_geoparquet_async(file, options).await.unwrap();
@weiji14 (Contributor, author):

... This is because ParquetObjectReader implements AsyncFileReader. So following this example a user can just do

let storage_container = Arc::new(MicrosoftAzureBuilder::from_env().build().unwrap());
let location = Path::from("path/to/blob.parquet");
let meta = storage_container.head(&location).await.unwrap();
println!("Found Blob with {}B at {}", meta.size, meta.location);

let reader = ParquetObjectReader::new(storage_container, meta);
let table = read_geoparquet_async(reader, options).await?;

Oh cool, that's real handy! One thing I'm a little confused about is how we can pass in a tokio::fs::File here, since it only implements Sync + Unpin, but read_geoparquet_async has the trait bound <R: AsyncFileReader + Unpin + Send + 'static>. Is the AsyncFileReader impl not absolutely necessary, or does ParquetRecordBatchStreamBuilder::new do some special handling?

@kylebarron (Member):

AsyncFileReader is defined by the parquet crate.

But also note this "blanket" implementation

impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T

That means that AsyncFileReader is automatically implemented for any type that already implements AsyncRead and AsyncSeek. A tokio File implements AsyncRead and implements AsyncSeek. Therefore the File (and anything else built for tokio's ecosystem) should automatically work with Parquet.
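A quick sketch that demonstrates this (the assertion helper is just for illustration):

use parquet::arrow::async_reader::AsyncFileReader;
use tokio::fs::File;

// Compiles only if the argument implements AsyncFileReader.
fn assert_async_file_reader<T: AsyncFileReader>(_reader: &T) {}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let file = File::open("fixtures/geoparquet/nybb.parquet").await?;
    // tokio's File is AsyncRead + AsyncSeek + Unpin + Send, so the
    // blanket impl makes it an AsyncFileReader automatically.
    assert_async_file_reader(&file);
    Ok(())
}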

@weiji14 (Contributor, author) commented Feb 6, 2024:

Thanks for the explanation, learned something new again! Also starting to 'get' Rust traits a lot more 😀

@weiji14 weiji14 marked this pull request as ready for review February 4, 2024 07:07
@weiji14 weiji14 changed the title WIP: Asynchronous geoparquet reader Asynchronous geoparquet reader Feb 4, 2024
@kylebarron (Member):
Yeah let's work out the Python bits in a different PR. In particular, I think it makes sense to have both an async def read_parquet_async for power users building a web server or something on top of geoarrow, but also a def read_parquet that handles stuff automatically under the hood.

I'm also not exactly sure what public Python API is ideal, so we should come back to that.

@kylebarron (Member):
Any piece of code that you put in a docstring is automatically a doctest, so it needs to compile and run (and pass) unless you annotate it (e.g. with ignore to skip it entirely, or no_run to compile it without executing it).
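For reference, a hypothetical sketch of how such an annotation looks (not a docstring from this PR):

/// Read a GeoParquet file into a table.
///
/// ```no_run
/// // `no_run` means this block must compile but is never executed, which
/// // suits examples that touch the filesystem or network; `ignore` would
/// // skip it entirely.
/// let path = "fixtures/geoparquet/nybb.parquet";
/// // ... open `path` and call the reader here ...
/// ```
pub fn read_geoparquet_placeholder() {}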

@kylebarron (Member) left a comment:

thanks!

@kylebarron kylebarron merged commit 8aca782 into geoarrow:main Feb 5, 2024
6 checks passed
@weiji14 weiji14 deleted the async-geoparquet branch February 5, 2024 18:42