Asynchronous geoparquet reader #493

Merged: 13 commits, Feb 5, 2024
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -16,6 +16,7 @@ geos = ["dep:geos"]
geozero = ["dep:geozero"]
gdal = ["dep:gdal"]
parquet = ["dep:parquet"]
parquet_async = ["parquet", "parquet/async", "dep:futures"]
parquet_compression = [
"parquet/snap",
"parquet/brotli",
@@ -82,7 +83,7 @@ gdal = { version = "0.16", features = ["bindgen"] }
geozero = { version = "0.11", features = ["with-wkb"] }
parquet = "50"
sqlx = { version = "0.7", default-features = false, features = ["postgres"] }
tokio = { version = "1.9", features = ["macros"] }
tokio = { version = "1.9", features = ["macros", "fs", "rt-multi-thread"] }

[lib]
doctest = true
92 changes: 89 additions & 3 deletions src/io/parquet/geoparquet_metadata.rs
@@ -1,11 +1,16 @@
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use crate::array::CoordType;
use crate::datatypes::GeoDataType;
use crate::error::{GeoArrowError, Result};

use arrow_schema::Schema;
use parquet::arrow::arrow_reader::ArrowReaderBuilder;
use parquet::file::metadata::FileMetaData;
use serde::{Deserialize, Serialize};
use serde_json::Value;

-use crate::error::{GeoArrowError, Result};

#[derive(Serialize, Deserialize)]
pub struct GeoParquetMetadata {
pub version: String,
@@ -43,3 +48,84 @@ impl GeoParquetMetadata {
))
}
}

// TODO: deduplicate with `resolve_types` in `downcast.rs`
fn infer_geo_data_type(
geometry_types: &HashSet<&str>,
coord_type: CoordType,
) -> Result<Option<GeoDataType>> {
if geometry_types.iter().any(|t| t.contains(" Z")) {
return Err(GeoArrowError::General(
"3D coordinates not currently supported".to_string(),
));
}

match geometry_types.len() {
0 => Ok(None),
1 => Ok(Some(match *geometry_types.iter().next().unwrap() {
"Point" => GeoDataType::Point(coord_type),
"LineString" => GeoDataType::LineString(coord_type),
"Polygon" => GeoDataType::Polygon(coord_type),
"MultiPoint" => GeoDataType::MultiPoint(coord_type),
"MultiLineString" => GeoDataType::MultiLineString(coord_type),
"MultiPolygon" => GeoDataType::MultiPolygon(coord_type),
"GeometryCollection" => GeoDataType::GeometryCollection(coord_type),
_ => unreachable!(),
})),
2 => {
if geometry_types.contains("Point") && geometry_types.contains("MultiPoint") {
Ok(Some(GeoDataType::MultiPoint(coord_type)))
} else if geometry_types.contains("LineString")
&& geometry_types.contains("MultiLineString")
{
Ok(Some(GeoDataType::MultiLineString(coord_type)))
} else if geometry_types.contains("Polygon") && geometry_types.contains("MultiPolygon")
{
Ok(Some(GeoDataType::MultiPolygon(coord_type)))
} else {
Ok(Some(GeoDataType::Mixed(coord_type)))
}
}
_ => Ok(Some(GeoDataType::Mixed(coord_type))),
}
}

fn parse_geoparquet_metadata(
metadata: &FileMetaData,
schema: &Schema,
coord_type: CoordType,
) -> Result<(usize, Option<GeoDataType>)> {
let meta = GeoParquetMetadata::from_parquet_meta(metadata)?;
let column_meta = meta
.columns
.get(&meta.primary_column)
.ok_or(GeoArrowError::General(format!(
"Expected {} in GeoParquet column metadata",
&meta.primary_column
)))?;

let geometry_column_index = schema
.fields()
.iter()
.position(|field| field.name() == &meta.primary_column)
.unwrap();
let mut geometry_types = HashSet::with_capacity(column_meta.geometry_types.len());
column_meta.geometry_types.iter().for_each(|t| {
geometry_types.insert(t.as_str());
});
Ok((
geometry_column_index,
infer_geo_data_type(&geometry_types, coord_type)?,
))
}

pub fn build_arrow_schema<T>(
builder: &ArrowReaderBuilder<T>,
coord_type: &CoordType,
) -> Result<(Arc<Schema>, usize, Option<GeoDataType>)> {
let parquet_meta = builder.metadata();
let arrow_schema = builder.schema().clone();
let (geometry_column_index, target_geo_data_type) =
parse_geoparquet_metadata(parquet_meta.file_metadata(), &arrow_schema, *coord_type)?;
Ok((arrow_schema, geometry_column_index, target_geo_data_type))
}
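As a hedged illustration of the promotion rules in infer_geo_data_type above: the two-type branch collapses a single-geometry type and its Multi counterpart into the Multi type. Since the function is crate-private, the sketch below is written as an in-module test; the infer_tests module and its assertion are illustrative only, not part of this PR.

#[cfg(test)]
mod infer_tests {
    use super::*;
    use std::collections::HashSet;

    #[test]
    fn point_plus_multipoint_promotes_to_multipoint() {
        let types: HashSet<&str> = ["Point", "MultiPoint"].into_iter().collect();
        let inferred = infer_geo_data_type(&types, CoordType::Interleaved).unwrap();
        assert!(matches!(
            inferred,
            Some(GeoDataType::MultiPoint(CoordType::Interleaved))
        ));
    }
}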
37 changes: 37 additions & 0 deletions src/io/parquet/mod.rs
@@ -1,6 +1,43 @@
//! Read the [GeoParquet](https://github.com/opengeospatial/geoparquet) format.
//!
//! # Examples of reading a GeoParquet file into a GeoTable
//!
//! ## Synchronous reader
//!
//! ```rust
//! use geoarrow::io::parquet::read_geoparquet;
//! use geoarrow::io::parquet::GeoParquetReaderOptions;
//! use std::fs::File;
//!
//! let file = File::open("fixtures/geoparquet/nybb.parquet").unwrap();
//! let options = GeoParquetReaderOptions::new(65536, Default::default());
//! let output_geotable = read_geoparquet(file, options).unwrap();
//! println!("GeoTable schema: {}", output_geotable.schema());
//! ```
//!
//! ## Asynchronous reader
//!
//! ```rust
//! use geoarrow::io::parquet::read_geoparquet_async;
//! use geoarrow::io::parquet::GeoParquetReaderOptions;
//! use tokio::fs::File;
//!
//! #[tokio::main]
//! async fn main() {
//! let file = File::open("fixtures/geoparquet/nybb.parquet")
//! .await
//! .unwrap();
//! let options = GeoParquetReaderOptions::new(65536, Default::default());
//! let output_geotable = read_geoparquet_async(file, options).await.unwrap();
//! println!("GeoTable schema: {}", output_geotable.schema());
//! }
//! ```

mod geoparquet_metadata;
mod reader;
#[cfg(feature = "parquet_async")]
mod reader_async;

pub use reader::{read_geoparquet, GeoParquetReaderOptions};
#[cfg(feature = "parquet_async")]
pub use reader_async::read_geoparquet_async;
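Note that the async reader only exists when the new parquet_async feature is enabled. A hedged sketch of the downstream opt-in (the git source and feature list here are assumptions, not taken from this PR):

[dependencies]
geoarrow = { git = "https://github.com/geoarrow/geoarrow-rs", features = ["parquet_async"] }
tokio = { version = "1.9", features = ["macros", "fs", "rt-multi-thread"] }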
96 changes: 7 additions & 89 deletions src/io/parquet/reader.rs
@@ -1,89 +1,14 @@
-use std::collections::HashSet;

-use crate::array::CoordType;
-use crate::datatypes::GeoDataType;
-use crate::error::{GeoArrowError, Result};
+use crate::error::Result;
+use crate::io::parquet::geoparquet_metadata::build_arrow_schema;
use crate::table::GeoTable;

-use crate::io::parquet::geoparquet_metadata::GeoParquetMetadata;
-use arrow_schema::Schema;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
-use parquet::file::metadata::FileMetaData;
use parquet::file::reader::ChunkReader;

-// TODO: deduplicate with `resolve_types` in `downcast.rs`
-fn infer_geo_data_type(
-    geometry_types: &HashSet<&str>,
-    coord_type: CoordType,
-) -> Result<Option<GeoDataType>> {
-    if geometry_types.iter().any(|t| t.contains(" Z")) {
-        return Err(GeoArrowError::General(
-            "3D coordinates not currently supported".to_string(),
-        ));
-    }
-
-    match geometry_types.len() {
-        0 => Ok(None),
-        1 => Ok(Some(match *geometry_types.iter().next().unwrap() {
-            "Point" => GeoDataType::Point(coord_type),
-            "LineString" => GeoDataType::LineString(coord_type),
-            "Polygon" => GeoDataType::Polygon(coord_type),
-            "MultiPoint" => GeoDataType::MultiPoint(coord_type),
-            "MultiLineString" => GeoDataType::MultiLineString(coord_type),
-            "MultiPolygon" => GeoDataType::MultiPolygon(coord_type),
-            "GeometryCollection" => GeoDataType::GeometryCollection(coord_type),
-            _ => unreachable!(),
-        })),
-        2 => {
-            if geometry_types.contains("Point") && geometry_types.contains("MultiPoint") {
-                Ok(Some(GeoDataType::MultiPoint(coord_type)))
-            } else if geometry_types.contains("LineString")
-                && geometry_types.contains("MultiLineString")
-            {
-                Ok(Some(GeoDataType::MultiLineString(coord_type)))
-            } else if geometry_types.contains("Polygon") && geometry_types.contains("MultiPolygon")
-            {
-                Ok(Some(GeoDataType::MultiPolygon(coord_type)))
-            } else {
-                Ok(Some(GeoDataType::Mixed(coord_type)))
-            }
-        }
-        _ => Ok(Some(GeoDataType::Mixed(coord_type))),
-    }
-}
-
-fn parse_geoparquet_metadata(
-    metadata: &FileMetaData,
-    schema: &Schema,
-    coord_type: CoordType,
-) -> Result<(usize, Option<GeoDataType>)> {
-    let meta = GeoParquetMetadata::from_parquet_meta(metadata)?;
-    let column_meta = meta
-        .columns
-        .get(&meta.primary_column)
-        .ok_or(GeoArrowError::General(format!(
-            "Expected {} in GeoParquet column metadata",
-            &meta.primary_column
-        )))?;
-
-    let geometry_column_index = schema
-        .fields()
-        .iter()
-        .position(|field| field.name() == &meta.primary_column)
-        .unwrap();
-    let mut geometry_types = HashSet::with_capacity(column_meta.geometry_types.len());
-    column_meta.geometry_types.iter().for_each(|t| {
-        geometry_types.insert(t.as_str());
-    });
-    Ok((
-        geometry_column_index,
-        infer_geo_data_type(&geometry_types, coord_type)?,
-    ))
-}

pub struct GeoParquetReaderOptions {
-    batch_size: usize,
-    coord_type: CoordType,
+    pub batch_size: usize,
+    pub coord_type: CoordType,
}

impl GeoParquetReaderOptions {
@@ -95,23 +20,16 @@ impl GeoParquetReaderOptions {
}
}

/// Read a GeoParquet file to a GeoTable.
pub fn read_geoparquet<R: ChunkReader + 'static>(
reader: R,
options: GeoParquetReaderOptions,
) -> Result<GeoTable> {
let builder =
ParquetRecordBatchReaderBuilder::try_new(reader)?.with_batch_size(options.batch_size);

-    let (arrow_schema, geometry_column_index, target_geo_data_type) = {
-        let parquet_meta = builder.metadata();
-        let arrow_schema = builder.schema().clone();
-        let (geometry_column_index, target_geo_data_type) = parse_geoparquet_metadata(
-            parquet_meta.file_metadata(),
-            &arrow_schema,
-            options.coord_type,
-        )?;
-        (arrow_schema, geometry_column_index, target_geo_data_type)
-    };
+    let (arrow_schema, geometry_column_index, target_geo_data_type) =
+        build_arrow_schema(&builder, &options.coord_type)?;

let reader = builder.build()?;

45 changes: 45 additions & 0 deletions src/io/parquet/reader_async.rs
@@ -0,0 +1,45 @@
use crate::error::Result;
use crate::io::parquet::geoparquet_metadata::build_arrow_schema;
use crate::io::parquet::reader::GeoParquetReaderOptions;
use crate::table::GeoTable;

use futures::stream::TryStreamExt;
use parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStreamBuilder};

/// Asynchronously read a GeoParquet file to a GeoTable.
pub async fn read_geoparquet_async<R: AsyncFileReader + Unpin + Send + 'static>(
reader: R,
options: GeoParquetReaderOptions,
) -> Result<GeoTable> {
let builder = ParquetRecordBatchStreamBuilder::new(reader)
.await?
.with_batch_size(options.batch_size);

let (arrow_schema, geometry_column_index, target_geo_data_type) =
build_arrow_schema(&builder, &options.coord_type)?;

let stream = builder.build()?;
let batches = stream.try_collect::<_>().await?;

GeoTable::from_arrow(
batches,
arrow_schema,
Some(geometry_column_index),
target_geo_data_type,
)
}

#[cfg(test)]
mod test {
use super::*;
use tokio::fs::File;

#[tokio::test]
async fn nybb() {
let file = File::open("fixtures/geoparquet/nybb.parquet")
.await
.unwrap();
let options = GeoParquetReaderOptions::new(65536, Default::default());
let _output_geotable = read_geoparquet_async(file, options).await.unwrap();
Comment on lines +35 to +43
Contributor Author

... This is because ParquetObjectReader implements AsyncFileReader. So following this example a user can just do

// Imports assumed for this snippet:
use std::sync::Arc;
use object_store::azure::MicrosoftAzureBuilder;
use object_store::{path::Path, ObjectStore};
use parquet::arrow::async_reader::ParquetObjectReader;

let storage_container = Arc::new(MicrosoftAzureBuilder::from_env().build().unwrap());
let location = Path::from("path/to/blob.parquet");
let meta = storage_container.head(&location).await.unwrap();
println!("Found Blob with {}B at {}", meta.size, meta.location);

let reader = ParquetObjectReader::new(storage_container, meta);
let table = read_geoparquet_async(reader, options).await?;

Oh cool, that's real handy! One thing I'm a little confused about is how we can pass in a tokio::fs::File here, since it only implements Sync + Unpin, but read_geoparquet_async has the trait bound <R: AsyncFileReader + Unpin + Send + 'static>. Is the AsyncFileReader impl not absolutely necessary, or does ParquetRecordBatchStreamBuilder::new do some special handling?

Member

AsyncFileReader is defined by the parquet crate.

But also note this "blanket" implementation

impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T

That means that AsyncFileReader is automatically implemented for any type that already implements AsyncRead and AsyncSeek. A tokio File implements both AsyncRead and AsyncSeek, so the File (and anything else built for tokio's ecosystem) should automatically work with Parquet.
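A minimal compile-time sketch of this (assuming the parquet_async feature and tokio's fs feature are enabled; assert_async_file_reader is a hypothetical helper, not part of this PR):

use parquet::arrow::async_reader::AsyncFileReader;

// Compile-time check only: the helper's body never runs.
fn assert_async_file_reader<T: AsyncFileReader>() {}

fn main() {
    // Compiles because tokio::fs::File: AsyncRead + AsyncSeek + Unpin + Send,
    // so the blanket impl above applies.
    assert_async_file_reader::<tokio::fs::File>();
}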

Contributor Author (@weiji14, Feb 6, 2024)

Thanks for the explanation, learned something new again! Also starting to 'get' Rust traits a lot more 😀

}
}