Skip to content

Commit

Permalink
Add support for rust-s3 backend. (#30)
Browse files Browse the repository at this point in the history
* Add support for `rust-s3` backend.

* Significantly simplify error code.

* Re-export s3 and reqwest crates.

* Update README.
  • Loading branch information
lseelenbinder authored Feb 2, 2024
1 parent e95a0a3 commit 29be5e0
Show file tree
Hide file tree
Showing 14 changed files with 190 additions and 52 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ jobs:
- run: cargo test --features http-async
- run: cargo test --features mmap-async-tokio
- run: cargo test --features tilejson
- run: cargo test --features s3-async-native
- run: cargo test --features s3-async-rustls
- run: cargo test
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pmtiles"
version = "0.5.2"
version = "0.6.0"
edition = "2021"
authors = ["Luke Seelenbinder <[email protected]>"]
license = "MIT OR Apache-2.0"
Expand All @@ -13,6 +13,8 @@ categories = ["science::geo"]
[features]
default = []
http-async = ["dep:tokio", "dep:reqwest"]
s3-async-native = ["dep:tokio", "dep:rust-s3", "rust-s3/tokio-native-tls"]
s3-async-rustls = ["dep:tokio", "dep:rust-s3", "rust-s3/tokio-rustls-tls"]
mmap-async-tokio = ["dep:tokio", "dep:fmmap", "fmmap?/tokio-async"]
tilejson = ["dep:tilejson", "dep:serde", "dep:serde_json"]

Expand All @@ -34,6 +36,7 @@ thiserror = "1"
tilejson = { version = "0.4", optional = true }
tokio = { version = "1", default-features = false, features = ["io-util"], optional = true }
varint-rs = "2"
rust-s3 = { version = "0.33.0", optional = true, default-features = false, features = ["fail-on-err"] }

[dev-dependencies]
flate2 = "1"
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ originally created by Brandon Liu for Protomaps.
- Backends supported:
- Async `mmap` (Tokio) for local files
- Async `http` and `https` (Reqwest + Tokio) for URLs
- Async `s3` (Rust-S3 + Tokio) for S3-compatible buckets

## Plans & TODOs

Expand Down
2 changes: 2 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ test:
cargo test --features http-async
cargo test --features mmap-async-tokio
cargo test --features tilejson
cargo test --features s3-async-native
cargo test --features s3-async-rustls
cargo test
RUSTDOCFLAGS="-D warnings" cargo doc --no-deps

Expand Down
52 changes: 45 additions & 7 deletions src/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,31 @@ use async_trait::async_trait;
use bytes::Bytes;
#[cfg(feature = "http-async")]
use reqwest::{Client, IntoUrl};
#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
#[cfg(any(
feature = "http-async",
feature = "mmap-async-tokio",
feature = "s3-async-rustls",
feature = "s3-async-native"
))]
use tokio::io::AsyncReadExt;

#[cfg(feature = "http-async")]
use crate::backend::HttpBackend;
#[cfg(feature = "mmap-async-tokio")]
use crate::backend::MmapBackend;
#[cfg(any(feature = "s3-async-rustls", feature = "s3-async-native"))]
use crate::backend::S3Backend;
use crate::cache::DirCacheResult;
#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
#[cfg(any(
feature = "http-async",
feature = "mmap-async-tokio",
feature = "s3-async-native",
feature = "s3-async-rustls"
))]
use crate::cache::{DirectoryCache, NoCache};
use crate::directory::{DirEntry, Directory};
use crate::error::{PmtError, PmtResult};
use crate::header::{HEADER_SIZE, MAX_INITIAL_BYTES};
#[cfg(feature = "http-async")]
use crate::http::HttpBackend;
#[cfg(feature = "mmap-async-tokio")]
use crate::mmap::MmapBackend;
use crate::tile::tile_id;
use crate::{Compression, Header};

Expand Down Expand Up @@ -263,6 +275,32 @@ impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<MmapBackend, C> {
}
}

#[cfg(any(feature = "s3-async-native", feature = "s3-async-rustls"))]
impl AsyncPmTilesReader<S3Backend, NoCache> {
/// Creates a new `PMTiles` reader for the object at `path` in `bucket`, using the S3 backend
/// and no directory cache.
///
/// This is a convenience wrapper that delegates to `new_with_cached_bucket_path` with
/// `NoCache`; any error from that call is returned unchanged.
///
/// NOTE(review): presumably fails if the object does not exist or is not a valid archive,
/// because the archive is read while the reader is constructed — confirm against
/// `try_from_cached_source`.
pub async fn new_with_bucket_path(bucket: s3::Bucket, path: String) -> PmtResult<Self> {
Self::new_with_cached_bucket_path(NoCache, bucket, path).await
}
}

#[cfg(any(feature = "s3-async-native", feature = "s3-async-rustls"))]
impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<S3Backend, C> {
/// Creates a new `PMTiles` reader with the given directory `cache` for the object at `path`
/// in `bucket`, using the S3 backend.
///
/// Wraps `bucket` and `path` in an `S3Backend` and hands it, together with `cache`, to
/// `try_from_cached_source`; any error from that call is returned unchanged.
///
/// NOTE(review): presumably fails if the object does not exist or is not a valid archive,
/// because the archive is read while the reader is constructed — confirm against
/// `try_from_cached_source`.
pub async fn new_with_cached_bucket_path(
cache: C,
bucket: s3::Bucket,
path: String,
) -> PmtResult<Self> {
// The backend only stores the bucket handle and object path.
let backend = S3Backend::from(bucket, path);

Self::try_from_cached_source(backend, cache).await
}
}

#[async_trait]
pub trait AsyncBackend {
/// Reads exactly `length` bytes starting at `offset`
Expand All @@ -276,7 +314,7 @@ pub trait AsyncBackend {
#[cfg(feature = "mmap-async-tokio")]
mod tests {
use super::AsyncPmTilesReader;
use crate::mmap::MmapBackend;
use crate::backend::MmapBackend;
use crate::tests::{RASTER_FILE, VECTOR_FILE};

#[tokio::test]
Expand Down
22 changes: 13 additions & 9 deletions src/http.rs → src/backend/http.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use async_trait::async_trait;
use bytes::Bytes;
use reqwest::header::{HeaderValue, RANGE};
use reqwest::{Client, IntoUrl, Method, Request, StatusCode, Url};
use reqwest::{
header::{HeaderValue, RANGE},
Client, IntoUrl, Method, Request, StatusCode, Url,
};

use crate::async_reader::AsyncBackend;
use crate::error::{PmtHttpError, PmtResult};
use crate::{async_reader::AsyncBackend, error::PmtResult, PmtError};

pub struct HttpBackend {
client: Client,
Expand All @@ -28,26 +29,29 @@ impl AsyncBackend for HttpBackend {
if data.len() == length {
Ok(data)
} else {
Err(PmtHttpError::UnexpectedNumberOfBytesReturned(length, data.len()).into())
Err(PmtError::UnexpectedNumberOfBytesReturned(
length,
data.len(),
))
}
}

async fn read(&self, offset: usize, length: usize) -> PmtResult<Bytes> {
let end = offset + length - 1;
let range = format!("bytes={offset}-{end}");
let range = HeaderValue::try_from(range).map_err(PmtHttpError::from)?;
let range = HeaderValue::try_from(range)?;

let mut req = Request::new(Method::GET, self.pmtiles_url.clone());
req.headers_mut().insert(RANGE, range);

let response = self.client.execute(req).await?.error_for_status()?;
if response.status() != StatusCode::PARTIAL_CONTENT {
return Err(PmtHttpError::RangeRequestsUnsupported.into());
return Err(PmtError::RangeRequestsUnsupported);
}

let response_bytes = response.bytes().await?;
if response_bytes.len() > length {
Err(PmtHttpError::ResponseBodyTooLong(response_bytes.len(), length).into())
Err(PmtError::ResponseBodyTooLong(response_bytes.len(), length))
} else {
Ok(response_bytes)
}
Expand All @@ -56,8 +60,8 @@ impl AsyncBackend for HttpBackend {

#[cfg(test)]
mod tests {
use super::*;
use crate::async_reader::AsyncPmTilesReader;
use crate::http::HttpBackend;

static TEST_URL: &str =
"https://protomaps.github.io/PMTiles/protomaps(vector)ODbL_firenze.pmtiles";
Expand Down
File renamed without changes.
17 changes: 17 additions & 0 deletions src/backend/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#[cfg(any(feature = "s3-async-native", feature = "s3-async-rustls"))]
mod s3;

#[cfg(any(feature = "s3-async-native", feature = "s3-async-rustls"))]
pub use s3::S3Backend;

#[cfg(feature = "http-async")]
mod http;

#[cfg(feature = "http-async")]
pub use http::HttpBackend;

#[cfg(feature = "mmap-async-tokio")]
mod mmap;

#[cfg(feature = "mmap-async-tokio")]
pub use mmap::MmapBackend;
55 changes: 55 additions & 0 deletions src/backend/s3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use async_trait::async_trait;
use bytes::Bytes;
use s3::Bucket;

use crate::{
async_reader::AsyncBackend,
error::PmtError::{ResponseBodyTooLong, UnexpectedNumberOfBytesReturned},
};

/// Backend that reads a `PMTiles` archive stored as an object in an S3 bucket.
pub struct S3Backend {
/// Bucket handle on which the ranged object reads are issued.
bucket: Bucket,
/// Path of the archive object within `bucket`, passed to `get_object_range`.
pmtiles_path: String,
}

impl S3Backend {
#[must_use]
pub fn from(bucket: Bucket, pmtiles_path: String) -> S3Backend {
Self {
bucket,
pmtiles_path,
}
}
}

#[async_trait]
impl AsyncBackend for S3Backend {
async fn read_exact(&self, offset: usize, length: usize) -> crate::error::PmtResult<Bytes> {
let data = self.read(offset, length).await?;

if data.len() == length {
Ok(data)
} else {
Err(UnexpectedNumberOfBytesReturned(length, data.len()))
}
}

async fn read(&self, offset: usize, length: usize) -> crate::error::PmtResult<Bytes> {
let response = self
.bucket
.get_object_range(
self.pmtiles_path.as_str(),
offset as _,
Some((offset + length - 1) as _),
)
.await?;

let response_bytes = response.bytes();

if response_bytes.len() > length {
Err(ResponseBodyTooLong(response_bytes.len(), length))
} else {
Ok(response_bytes.clone())
}
}
}
2 changes: 0 additions & 2 deletions src/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ impl Debug for Directory {

impl Directory {
/// Find the directory entry for a given tile ID.
#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
#[must_use]
pub fn find_tile_id(&self, tile_id: u64) -> Option<&DirEntry> {
match self.entries.binary_search_by(|e| e.tile_id.cmp(&tile_id)) {
Expand Down Expand Up @@ -97,7 +96,6 @@ pub struct DirEntry {
pub(crate) run_length: u32,
}

#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
impl DirEntry {
pub(crate) fn is_leaf(&self) -> bool {
self.run_length == 0
Expand Down
36 changes: 18 additions & 18 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,30 @@ pub enum PmtError {
#[cfg(feature = "mmap-async-tokio")]
#[error("Unable to open mmap file")]
UnableToOpenMmapFile,
#[cfg(feature = "http-async")]
#[error("{0}")]
Http(#[from] PmtHttpError),
}

#[cfg(feature = "http-async")]
#[derive(Debug, Error)]
pub enum PmtHttpError {
#[cfg(any(
feature = "http-async",
feature = "s3-async-native",
feature = "s3-async-rustls"
))]
#[error("Unexpected number of bytes returned [expected: {0}, received: {1}].")]
UnexpectedNumberOfBytesReturned(usize, usize),
#[cfg(feature = "http-async")]
#[error("Range requests unsupported")]
RangeRequestsUnsupported,
#[cfg(any(
feature = "http-async",
feature = "s3-async-native",
feature = "s3-async-rustls"
))]
#[error("HTTP response body is too long, Response {0}B > requested {1}B")]
ResponseBodyTooLong(usize, usize),
#[error("HTTP error {0}")]
#[cfg(feature = "http-async")]
#[error(transparent)]
Http(#[from] reqwest::Error),
#[error("{0}")]
#[cfg(feature = "http-async")]
#[error(transparent)]
InvalidHeaderValue(#[from] reqwest::header::InvalidHeaderValue),
}

// This is required because thiserror #[from] does not support two-level conversion.
#[cfg(feature = "http-async")]
impl From<reqwest::Error> for PmtError {
fn from(e: reqwest::Error) -> Self {
Self::Http(PmtHttpError::Http(e))
}
#[cfg(any(feature = "s3-async-rustls", feature = "s3-async-native"))]
#[error(transparent)]
S3(#[from] s3::error::S3Error),
}
2 changes: 0 additions & 2 deletions src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use bytes::{Buf, Bytes};

use crate::error::{PmtError, PmtResult};

#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
pub(crate) const MAX_INITIAL_BYTES: usize = 16_384;
#[cfg(any(feature = "http-async", feature = "mmap-async-tokio", test))]
pub(crate) const HEADER_SIZE: usize = 127;

#[allow(dead_code)]
Expand Down
45 changes: 33 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,49 @@
#![forbid(unsafe_code)]

mod tile;
pub use directory::{DirEntry, Directory};
pub use error::{PmtError, PmtResult};

mod header;
pub use crate::header::{Compression, Header, TileType};
pub use header::{Compression, Header, TileType};

mod directory;
pub use directory::{DirEntry, Directory};
#[cfg(any(feature = "s3-async-rustls", feature = "s3-async-native"))]
pub use backend::S3Backend;

#[cfg(any(feature = "s3-async-rustls", feature = "s3-async-native"))]
pub use s3;

mod error;
#[cfg(feature = "http-async")]
pub use error::PmtHttpError;
pub use error::{PmtError, PmtResult};
pub use backend::HttpBackend;

#[cfg(feature = "http-async")]
pub mod http;
pub use reqwest;

#[cfg(feature = "mmap-async-tokio")]
pub mod mmap;
pub use backend::MmapBackend;

mod tile;

mod header;

mod directory;

mod error;

mod backend;

#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
#[cfg(any(
feature = "http-async",
feature = "mmap-async-tokio",
feature = "s3-async-rustls",
feature = "s3-async-native"
))]
pub mod async_reader;

#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
#[cfg(any(
feature = "http-async",
feature = "mmap-async-tokio",
feature = "s3-async-native",
feature = "s3-async-rustls"
))]
pub mod cache;

#[cfg(test)]
Expand Down
Loading

0 comments on commit 29be5e0

Please sign in to comment.