Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AWS S3 #44

Merged
merged 19 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 8 additions & 18 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,28 @@ jobs:
steps:
- name: Checkout sources
uses: actions/checkout@v4
- uses: taiki-e/install-action@v2
with: { tool: just }
- uses: Swatinem/rust-cache@v2
if: github.event_name != 'release' && github.event_name != 'workflow_dispatch'

- run: |
rustc --version
cargo --version
rustup --version
- run: cargo check
- run: rustup component add clippy rustfmt
- run: cargo fmt --all -- --check
- run: cargo clippy --all-targets --all-features -- -D warnings
- run: cargo test --all-targets --all-features
- run: cargo test --features http-async
- run: cargo test --features mmap-async-tokio
- run: cargo test --features tilejson
- run: cargo test --features s3-async-native
- run: cargo test --features s3-async-rustls
- run: cargo test
- run: just test-all
- name: Check semver
uses: obi1kenobi/cargo-semver-checks-action@v2
with:
feature-group: only-explicit-features
features: "http-async,mmap-async-tokio,tilejson,s3-async-rustls,aws-s3-async"

msrv:
name: Test MSRV
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: extractions/setup-just@v2
- uses: Swatinem/rust-cache@v2
if: github.event_name != 'release' && github.event_name != 'workflow_dispatch'
- name: Read crate metadata
Expand All @@ -51,10 +47,4 @@ jobs:
uses: dtolnay/rust-toolchain@stable
with:
toolchain: ${{ steps.metadata.outputs.rust-version }}
- run: cargo test --all-targets --all-features
- run: cargo test --features http-async
- run: cargo test --features mmap-async-tokio
- run: cargo test --features tilejson
- run: cargo test --features s3-async-native
- run: cargo test --features s3-async-rustls
- run: cargo test
- run: just test
11 changes: 7 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
[package]
name = "pmtiles"
version = "0.10.0"
version = "0.11.0"
edition = "2021"
authors = ["Luke Seelenbinder <[email protected]>"]
license = "MIT OR Apache-2.0"
description = "Implementation of the PMTiles v3 spec with multiple sync and async backends."
repository = "https://github.com/stadiamaps/pmtiles-rs"
keywords = ["pmtiles", "gis", "geo"]
rust-version = "1.77.0"
rust-version = "1.81.0"
categories = ["science::geo"]

[features]
Expand All @@ -16,6 +16,7 @@ http-async = ["__async", "dep:reqwest"]
mmap-async-tokio = ["__async", "dep:fmmap", "fmmap?/tokio-async"]
s3-async-native = ["__async-s3", "__async-s3-nativetls"]
s3-async-rustls = ["__async-s3", "__async-s3-rustls"]
aws-s3-async = ["__async-aws-s3"]
tilejson = ["dep:tilejson", "dep:serde", "dep:serde_json"]

# Forward some of the common features to reqwest dependency
Expand All @@ -28,17 +29,19 @@ reqwest-rustls-tls-webpki-roots = ["reqwest?/rustls-tls-webpki-roots"]
# Internal features, do not use
__async = ["dep:tokio", "async-compression/tokio"]
__async-s3 = ["__async", "dep:rust-s3"]
__async-s3-nativetls = ["rust-s3?/tokio-native-tls"]
__async-s3-nativetls = ["rust-s3?/use-tokio-native-tls"]
__async-s3-rustls = ["rust-s3?/tokio-rustls-tls"]
__async-aws-s3 = ["__async", "dep:aws-sdk-s3"]

[dependencies]
# TODO: determine how we want to handle compression in async & sync environments
aws-sdk-s3 = { version = "1.49.0", optional = true }
async-compression = { version = "0.4", features = ["gzip"] }
bytes = "1"
fmmap = { version = "0.3", default-features = false, optional = true }
hilbert_2d = "1"
reqwest = { version = "0.12.4", default-features = false, optional = true }
rust-s3 = { version = "0.33.0", optional = true, default-features = false, features = ["fail-on-err"] }
rust-s3 = { version = "0.35.1", optional = true, default-features = false, features = ["fail-on-err"] }
serde = { version = "1", optional = true }
serde_json = { version = "1", optional = true }
thiserror = "1"
Expand Down
32 changes: 23 additions & 9 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,46 @@
@_default:
just --list --unsorted

# Run all tests
test:
# These are the same tests that are run on CI. Eventually CI should just call into justfile
# Run cargo check
check:
cargo check

_add_tools:
lseelenbinder marked this conversation as resolved.
Show resolved Hide resolved
rustup component add clippy rustfmt
cargo fmt --all -- --check
cargo clippy --all-targets --all-features -- -D warnings
cargo test --all-targets --all-features

# Run all tests
test:
cargo test --features http-async
cargo test --features mmap-async-tokio
cargo test --features tilejson
cargo test --features s3-async-native
cargo test --features s3-async-rustls
cargo test --features aws-s3-async
cargo test
RUSTDOCFLAGS="-D warnings" cargo doc --no-deps

# Run all tests and checks
test-all: check fmt clippy

# Run cargo fmt and cargo clippy
lint: fmt clippy

# Run cargo fmt
fmt:
fmt: _add_tools
cargo fmt --all -- --check

# Run cargo fmt using Rust nightly
fmt-nightly:
cargo +nightly fmt -- --config imports_granularity=Module,group_imports=StdExternalCrate

# Run cargo clippy
clippy:
cargo clippy --workspace --all-targets --all-features --bins --tests --lib --benches -- -D warnings
clippy: _add_tools
cargo clippy --workspace --all-targets --features http-async
cargo clippy --workspace --all-targets --features mmap-async-tokio
cargo clippy --workspace --all-targets --features tilejson
cargo clippy --workspace --all-targets --features s3-async-native
cargo clippy --workspace --all-targets --features s3-async-rustls
cargo clippy --workspace --all-targets --features aws-s3-async

# Build and open code documentation
docs:
Expand Down
18 changes: 17 additions & 1 deletion src/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,23 @@ pub trait AsyncBackend {
&self,
offset: usize,
length: usize,
) -> impl Future<Output = PmtResult<Bytes>> + Send;
) -> impl Future<Output = PmtResult<Bytes>> + Send
where
Self: Sync,
{
async move {
let data = self.read(offset, length).await?;

if data.len() == length {
Ok(data)
} else {
Err(PmtError::UnexpectedNumberOfBytesReturned(
length,
data.len(),
))
}
}
}

/// Reads up to `length` bytes starting at `offset`.
fn read(&self, offset: usize, length: usize) -> impl Future<Output = PmtResult<Bytes>> + Send;
Expand Down
89 changes: 89 additions & 0 deletions src/backend_aws_s3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use crate::{
async_reader::{AsyncBackend, AsyncPmTilesReader},
cache::{DirectoryCache, NoCache},
PmtError, PmtResult,
};
use aws_sdk_s3::Client;
use bytes::Bytes;

impl AsyncPmTilesReader<AwsS3Backend, NoCache> {
/// Creates a new `PMTiles` reader from a client, bucket and key to the
/// archive using the `aws-sdk-s3` backend.
///
/// Fails if the [bucket] or [key] does not exist or is an invalid
/// archive.
/// (Note: S3 requests are made to validate it.)
pub async fn new_with_client_bucket_and_path(
client: Client,
bucket: String,
key: String,
) -> PmtResult<Self> {
Self::new_with_cached_client_bucket_and_path(NoCache, client, bucket, key).await
}
}

impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<AwsS3Backend, C> {
/// Creates a new `PMTiles` reader from a client, bucket and key to the
/// archive using the `aws-sdk-s3` backend. Caches using the designated
/// [cache].
///
/// Fails if the [bucket] or [key] does not exist or is an invalid
/// archive.
/// (Note: S3 requests are made to validate it.)
pub async fn new_with_cached_client_bucket_and_path(
cache: C,
client: Client,
bucket: String,
key: String,
) -> PmtResult<Self> {
let backend = AwsS3Backend::from(client, bucket, key);

Self::try_from_cached_source(backend, cache).await
}
}

pub struct AwsS3Backend {
client: Client,
bucket: String,
key: String,
}

impl AwsS3Backend {
#[must_use]
pub fn from(client: Client, bucket: String, key: String) -> Self {
Self {
client,
bucket,
key,
}
}
}

impl AsyncBackend for AwsS3Backend {
async fn read(&self, offset: usize, length: usize) -> PmtResult<Bytes> {
let range_end = offset + length - 1;
let range = format!("bytes={offset}-{range_end}");

let obj = self
.client
.get_object()
.bucket(self.bucket.clone())
.key(self.key.clone())
.range(range)
.send()
.await?;

let response_bytes = obj
.body
.collect()
.await
.map_err(|e| PmtError::Reading(e.into()))?
.into_bytes();

if response_bytes.len() > length {
Err(PmtError::ResponseBodyTooLong(response_bytes.len(), length))
} else {
Ok(response_bytes)
}
}
}
13 changes: 0 additions & 13 deletions src/backend_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,6 @@ impl HttpBackend {
}

impl AsyncBackend for HttpBackend {
async fn read_exact(&self, offset: usize, length: usize) -> PmtResult<Bytes> {
let data = self.read(offset, length).await?;

if data.len() == length {
Ok(data)
} else {
Err(PmtError::UnexpectedNumberOfBytesReturned(
length,
data.len(),
))
}
}

async fn read(&self, offset: usize, length: usize) -> PmtResult<Bytes> {
let end = offset + length - 1;
let range = format!("bytes={offset}-{end}");
Expand Down
31 changes: 13 additions & 18 deletions src/backend_s3.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
use bytes::Bytes;
use s3::Bucket;

use crate::async_reader::{AsyncBackend, AsyncPmTilesReader};
use crate::cache::{DirectoryCache, NoCache};
use crate::error::PmtError::{ResponseBodyTooLong, UnexpectedNumberOfBytesReturned};
use crate::PmtResult;
use crate::{
async_reader::{AsyncBackend, AsyncPmTilesReader},
cache::{DirectoryCache, NoCache},
error::PmtError::ResponseBodyTooLong,
PmtResult,
};

impl AsyncPmTilesReader<S3Backend, NoCache> {
/// Creates a new `PMTiles` reader from a URL using the Reqwest backend.
/// Creates a new `PMTiles` reader from a bucket and path to the
/// archive using the `rust-s3` backend.
///
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
/// Fails if [bucket] or [path] does not exist or is an invalid archive. (Note: S3 requests are made to validate it.)
pub async fn new_with_bucket_path(bucket: Bucket, path: String) -> PmtResult<Self> {
Self::new_with_cached_bucket_path(NoCache, bucket, path).await
}
}

impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<S3Backend, C> {
/// Creates a new `PMTiles` reader with cache from a URL using the Reqwest backend.
/// Creates a new `PMTiles` reader from a bucket and path to the
/// archive using the `rust-s3` backend with a given [cache] backend.
///
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
/// Fails if [bucket] or [path] does not exist or is an invalid archive. (Note: S3 requests are made to validate it.)
/// Creates a new `PMTiles` reader with cache from a URL using the Reqwest backend.
pub async fn new_with_cached_bucket_path(
cache: C,
bucket: Bucket,
Expand All @@ -43,16 +48,6 @@ impl S3Backend {
}

impl AsyncBackend for S3Backend {
async fn read_exact(&self, offset: usize, length: usize) -> PmtResult<Bytes> {
let data = self.read(offset, length).await?;

if data.len() == length {
Ok(data)
} else {
Err(UnexpectedNumberOfBytesReturned(length, data.len()))
}
}

async fn read(&self, offset: usize, length: usize) -> PmtResult<Bytes> {
let response = self
.bucket
Expand Down
2 changes: 1 addition & 1 deletion src/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl Directory {
/// Get an estimated byte size of the directory object. Use this for cache eviction.
#[must_use]
pub fn get_approx_byte_size(&self) -> usize {
self.entries.capacity() * std::mem::size_of::<DirEntry>()
self.entries.capacity() * size_of::<DirEntry>()
}
}

Expand Down
12 changes: 10 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,16 @@ pub enum PmtError {
#[cfg(feature = "mmap-async-tokio")]
#[error("Unable to open mmap file")]
UnableToOpenMmapFile,
#[cfg(any(feature = "http-async", feature = "__async-s3"))]
#[error("Unexpected number of bytes returned [expected: {0}, received: {1}].")]
UnexpectedNumberOfBytesReturned(usize, usize),
#[cfg(feature = "http-async")]
#[error("Range requests unsupported")]
RangeRequestsUnsupported,
#[cfg(any(feature = "http-async", feature = "__async-s3"))]
#[cfg(any(
feature = "http-async",
feature = "__async-s3",
feature = "__async-aws-s3"
))]
#[error("HTTP response body is too long, Response {0}B > requested {1}B")]
ResponseBodyTooLong(usize, usize),
#[cfg(feature = "http-async")]
Expand All @@ -51,4 +54,9 @@ pub enum PmtError {
#[cfg(feature = "__async-s3")]
#[error(transparent)]
S3(#[from] s3::error::S3Error),
#[cfg(feature = "__async-aws-s3")]
#[error(transparent)]
AwsS3Request(
#[from] aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::get_object::GetObjectError>,
),
}
Loading