Skip to content

Commit

Permalink
feat: Allow custom configuration of S3Client (#102)
Browse files Browse the repository at this point in the history
* feat: Allow custom configuration of `S3Client`

* feat: Make `S3Client` easier to work with from outside

* feat: Use custom `S3Client` if configured

* chore: Remove unused crates

* refactor: Merge duplicate `impl`s

* Update src/s3_fetchers.rs

Co-authored-by: Bohdan Khorolets <[email protected]>

* Update src/s3_fetchers.rs

Co-authored-by: Bohdan Khorolets <[email protected]>

* feat: derive `Clone` for `S3Client` errors

* chore: Update `CHANGELOG.md`

* Bump the version to 0.7.8

---------

Co-authored-by: Bohdan Khorolets <[email protected]>
  • Loading branch information
morgsmccauley and khorolets authored Apr 16, 2024
1 parent 464cf7d commit f440546
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 189 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased](https://github.com/near/near-lake-framework/compare/v0.7.7...HEAD)

* Expose `s3_client::S3Client` trait so that custom S3 client implementations can be configured via `LakeConfigBuilder::s3_client`

### Breaking Change

* Errors returned from public `s3_fetcher` methods have changed slightly to support the newly exposed `S3Client`. As these methods serve a rare use case, this is only a minor bump.

## [0.7.7](https://github.com/near/near-lake-framework/compare/v0.7.6...0.7.7)

* Refactor `s3_fetchers` module to allow use of `list_block_heights` outside of the framework
Expand Down
5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ rust-version = "1.75.0"

# cargo-workspaces
[workspace.metadata.workspaces]
version = "0.7.7"
version = "0.7.8"

[dependencies]
anyhow = "1.0.79"
Expand All @@ -34,8 +34,5 @@ tracing = "0.1.40"

near-indexer-primitives = "0.20.0"

[dev-dependencies]
aws-smithy-http = "0.60.3"

[lib]
doctest = false
32 changes: 18 additions & 14 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ pub use near_indexer_primitives;
pub use aws_credential_types::Credentials;
pub use types::{LakeConfig, LakeConfigBuilder};

pub mod s3_client;
pub mod s3_fetchers;
pub(crate) mod types;

Expand Down Expand Up @@ -288,7 +289,7 @@ pub fn streamer(
}

fn stream_block_heights<'a: 'b, 'b>(
lake_s3_client: &'a s3_fetchers::LakeS3Client,
lake_s3_client: &'a dyn s3_client::S3Client,
s3_bucket_name: &'a str,
mut start_from_block_height: crate::types::BlockHeight,
) -> impl futures::Stream<Item = u64> + 'b {
Expand Down Expand Up @@ -384,16 +385,19 @@ async fn start(
) -> anyhow::Result<()> {
let mut start_from_block_height = config.start_block_height;

let s3_client = if let Some(config) = config.s3_config {
aws_sdk_s3::Client::from_conf(config)
} else {
let aws_config = aws_config::from_env().load().await;
let s3_config = aws_sdk_s3::config::Builder::from(&aws_config)
.region(aws_types::region::Region::new(config.s3_region_name))
.build();
aws_sdk_s3::Client::from_conf(s3_config)
};
let lake_s3_client = s3_fetchers::LakeS3Client::new(s3_client.clone());
let lake_s3_client: Box<dyn crate::s3_client::S3Client> =
if let Some(s3_client) = config.s3_client {
s3_client
} else if let Some(config) = config.s3_config {
Box::new(s3_fetchers::LakeS3Client::from_conf(config))
} else {
let aws_config = aws_config::from_env().load().await;
let s3_config = aws_sdk_s3::config::Builder::from(&aws_config)
.region(aws_types::region::Region::new(config.s3_region_name))
.build();

Box::new(s3_fetchers::LakeS3Client::from_conf(s3_config))
};

let mut last_processed_block_hash: Option<near_indexer_primitives::CryptoHash> = None;

Expand All @@ -406,7 +410,7 @@ async fn start(
// We need to stream blocks consistently, so we must try to load the block again.

let pending_block_heights = stream_block_heights(
&lake_s3_client,
&*lake_s3_client,
&config.s3_bucket_name,
start_from_block_height,
);
Expand All @@ -429,7 +433,7 @@ async fn start(
.into_iter()
.map(|block_height| {
s3_fetchers::fetch_streamer_message(
&lake_s3_client,
&*lake_s3_client,
&config.s3_bucket_name,
block_height,
)
Expand Down Expand Up @@ -530,7 +534,7 @@ async fn start(
.into_iter()
.map(|block_height| {
s3_fetchers::fetch_streamer_message(
&lake_s3_client,
&*lake_s3_client,
&config.s3_bucket_name,
block_height,
)
Expand Down
82 changes: 82 additions & 0 deletions src/s3_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use std::error::Error;
use std::sync::Arc;

use async_trait::async_trait;

/// Shared, type-erased error carried by the error newtypes in this module.
///
/// The trait object is held behind an `Arc`, so the alias is `Clone` and can
/// be sent and shared across threads (`Send + Sync + 'static`).
pub type S3ClientError = Arc<dyn Error + Send + Sync + 'static>;

/// Error type returned by [`S3Client::get_object_bytes`].
///
/// A newtype over [`S3ClientError`]; the wrapped error is public and is also
/// reachable through `Deref`. `Display` is implemented by hand in this module
/// rather than through a `thiserror` `#[error(...)]` attribute.
#[derive(Debug, thiserror::Error, Clone)]
pub struct GetObjectBytesError(pub S3ClientError);

impl std::ops::Deref for GetObjectBytesError {
type Target = S3ClientError;

fn deref(&self) -> &Self::Target {
&self.0
}
}

/// Renders the error as `GetObjectBytesError: <inner error>`.
impl std::fmt::Display for GetObjectBytesError {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The wrapped error is rendered with its own `Display` implementation.
        formatter.write_fmt(format_args!("GetObjectBytesError: {}", self.0))
    }
}

impl From<aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::get_object::GetObjectError>>
for GetObjectBytesError
{
fn from(
error: aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::get_object::GetObjectError>,
) -> Self {
Self(Arc::new(error))
}
}

impl From<aws_smithy_types::byte_stream::error::Error> for GetObjectBytesError {
fn from(error: aws_smithy_types::byte_stream::error::Error) -> Self {
Self(Arc::new(error))
}
}

/// Error type returned by [`S3Client::list_common_prefixes`].
///
/// A newtype over [`S3ClientError`]; the wrapped error is public and is also
/// reachable through `Deref`. `Display` is implemented by hand in this module
/// rather than through a `thiserror` `#[error(...)]` attribute.
#[derive(Debug, thiserror::Error, Clone)]
pub struct ListCommonPrefixesError(pub S3ClientError);

/// Renders the error as `ListCommonPrefixesError: <inner error>`.
impl std::fmt::Display for ListCommonPrefixesError {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The wrapped error is rendered with its own `Display` implementation.
        formatter.write_fmt(format_args!("ListCommonPrefixesError: {}", self.0))
    }
}

impl std::ops::Deref for ListCommonPrefixesError {
type Target = S3ClientError;
fn deref(&self) -> &Self::Target {
&self.0
}
}

impl From<aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error>>
for ListCommonPrefixesError
{
fn from(
error: aws_sdk_s3::error::SdkError<
aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error,
>,
) -> Self {
Self(Arc::new(error))
}
}

/// Abstraction over the S3 operations the framework needs.
///
/// Implement this trait to plug in a custom S3 client; per the changelog it
/// is configured via `LakeConfigBuilder::s3_client`. The `Send + Sync` bound
/// allows implementations to be used as `dyn S3Client` trait objects shared
/// across threads.
#[async_trait]
pub trait S3Client: Send + Sync {
/// Fetches an object from `bucket` and returns its contents as raw bytes.
///
/// NOTE(review): `prefix` presumably is the full object key to fetch;
/// confirm against the default implementation in `s3_fetchers`.
///
/// # Errors
///
/// Returns [`GetObjectBytesError`] wrapping the underlying client error.
async fn get_object_bytes(
&self,
bucket: &str,
prefix: &str,
) -> Result<Vec<u8>, GetObjectBytesError>;

/// Lists common prefixes in `bucket`, returned as strings.
///
/// NOTE(review): `start_after_prefix` presumably maps to the `start_after`
/// parameter of S3 `ListObjectsV2`, so listing begins after that key;
/// confirm against the default implementation in `s3_fetchers`.
///
/// # Errors
///
/// Returns [`ListCommonPrefixesError`] wrapping the underlying client error.
async fn list_common_prefixes(
&self,
bucket: &str,
start_after_prefix: &str,
) -> Result<Vec<String>, ListCommonPrefixesError>;
}
Loading

0 comments on commit f440546

Please sign in to comment.