Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow custom configuration of S3Client #102

Merged
merged 10 commits into from
Apr 16, 2024
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased](https://github.com/near/near-lake-framework/compare/v0.7.7...HEAD)

* Expose `s3_client::S3Client` trait so that custom S3 client implementations can be configured via `LakeConfigBuilder::s3_client`

### Breaking Change

* Errors returned from public `s3_fetchers` methods have changed slightly to support the newly exposed `S3Client`. As these methods serve a rare use case, this is released as only a patch-level version bump.

## [0.7.7](https://github.com/near/near-lake-framework/compare/v0.7.6...0.7.7)

* Refactor `s3_fetchers` module to allow use of `list_block_heights` outside of the framework
Expand Down
5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ rust-version = "1.75.0"

# cargo-workspaces
[workspace.metadata.workspaces]
version = "0.7.7"
version = "0.7.8"

[dependencies]
anyhow = "1.0.79"
Expand All @@ -34,8 +34,5 @@ tracing = "0.1.40"

near-indexer-primitives = "0.20.0"

[dev-dependencies]
aws-smithy-http = "0.60.3"

[lib]
doctest = false
32 changes: 18 additions & 14 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ pub use near_indexer_primitives;
pub use aws_credential_types::Credentials;
pub use types::{LakeConfig, LakeConfigBuilder};

pub mod s3_client;
pub mod s3_fetchers;
pub(crate) mod types;

Expand Down Expand Up @@ -288,7 +289,7 @@ pub fn streamer(
}

fn stream_block_heights<'a: 'b, 'b>(
lake_s3_client: &'a s3_fetchers::LakeS3Client,
lake_s3_client: &'a dyn s3_client::S3Client,
s3_bucket_name: &'a str,
mut start_from_block_height: crate::types::BlockHeight,
) -> impl futures::Stream<Item = u64> + 'b {
Expand Down Expand Up @@ -384,16 +385,19 @@ async fn start(
) -> anyhow::Result<()> {
let mut start_from_block_height = config.start_block_height;

let s3_client = if let Some(config) = config.s3_config {
aws_sdk_s3::Client::from_conf(config)
} else {
let aws_config = aws_config::from_env().load().await;
let s3_config = aws_sdk_s3::config::Builder::from(&aws_config)
.region(aws_types::region::Region::new(config.s3_region_name))
.build();
aws_sdk_s3::Client::from_conf(s3_config)
};
let lake_s3_client = s3_fetchers::LakeS3Client::new(s3_client.clone());
let lake_s3_client: Box<dyn crate::s3_client::S3Client> =
if let Some(s3_client) = config.s3_client {
s3_client
} else if let Some(config) = config.s3_config {
Box::new(s3_fetchers::LakeS3Client::from_conf(config))
} else {
let aws_config = aws_config::from_env().load().await;
let s3_config = aws_sdk_s3::config::Builder::from(&aws_config)
.region(aws_types::region::Region::new(config.s3_region_name))
.build();

Box::new(s3_fetchers::LakeS3Client::from_conf(s3_config))
};

let mut last_processed_block_hash: Option<near_indexer_primitives::CryptoHash> = None;

Expand All @@ -406,7 +410,7 @@ async fn start(
// We require to stream blocks consistently, so we need to try to load the block again.

let pending_block_heights = stream_block_heights(
&lake_s3_client,
&*lake_s3_client,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dereference the Box, and pass a reference to the underlying trait object.

&config.s3_bucket_name,
start_from_block_height,
);
Expand All @@ -429,7 +433,7 @@ async fn start(
.into_iter()
.map(|block_height| {
s3_fetchers::fetch_streamer_message(
&lake_s3_client,
&*lake_s3_client,
&config.s3_bucket_name,
block_height,
)
Expand Down Expand Up @@ -530,7 +534,7 @@ async fn start(
.into_iter()
.map(|block_height| {
s3_fetchers::fetch_streamer_message(
&lake_s3_client,
&*lake_s3_client,
&config.s3_bucket_name,
block_height,
)
Expand Down
82 changes: 82 additions & 0 deletions src/s3_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use std::error::Error;
use std::sync::Arc;

use async_trait::async_trait;

/// Shared, type-erased error value carried by the error wrappers in this module.
///
/// Wrapping the trait object in an `Arc` keeps the alias cloneable, which is what
/// allows the wrapper error types below to derive `Clone`.
pub type S3ClientError = Arc<dyn Error + Send + Sync + 'static>;

/// Error type returned by [`S3Client::get_object_bytes`].
///
/// A newtype around [`S3ClientError`]. The inner value is public, so custom
/// [`S3Client`] implementations can build this error from any
/// `Arc<dyn Error + Send + Sync>`.
#[derive(Debug, thiserror::Error, Clone)]
pub struct GetObjectBytesError(pub S3ClientError);

impl std::ops::Deref for GetObjectBytesError {
type Target = S3ClientError;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl std::fmt::Display for GetObjectBytesError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "GetObjectBytesError: {}", self.0)
}
}

impl From<aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::get_object::GetObjectError>>
for GetObjectBytesError
{
fn from(
error: aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::get_object::GetObjectError>,
) -> Self {
Self(Arc::new(error))
}
}

impl From<aws_smithy_types::byte_stream::error::Error> for GetObjectBytesError {
fn from(error: aws_smithy_types::byte_stream::error::Error) -> Self {
Self(Arc::new(error))
}
}

/// Error type returned by [`S3Client::list_common_prefixes`].
///
/// A newtype around [`S3ClientError`]. The inner value is public, so custom
/// [`S3Client`] implementations can build this error from any
/// `Arc<dyn Error + Send + Sync>`.
#[derive(Debug, thiserror::Error, Clone)]
pub struct ListCommonPrefixesError(pub S3ClientError);

impl std::fmt::Display for ListCommonPrefixesError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ListCommonPrefixesError: {}", self.0)
}
}

impl std::ops::Deref for ListCommonPrefixesError {
type Target = S3ClientError;
fn deref(&self) -> &Self::Target {
&self.0
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Essentially some helpers to make these errors easier to work with


impl From<aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error>>
for ListCommonPrefixesError
{
fn from(
error: aws_sdk_s3::error::SdkError<
aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error,
>,
) -> Self {
Self(Arc::new(error))
}
}

/// Abstraction over the S3 operations the framework needs.
///
/// Implementations must be `Send + Sync`. Both methods are `async` and are
/// made usable on a trait (including as `dyn S3Client`) through `#[async_trait]`.
#[async_trait]
pub trait S3Client: Send + Sync {
/// Fetches data from `bucket` for the given `prefix` and returns it as raw bytes.
///
/// NOTE(review): `prefix` presumably identifies a single object (its key) —
/// confirm against the callers of this method.
///
/// # Errors
///
/// Returns a [`GetObjectBytesError`] wrapping the underlying failure.
async fn get_object_bytes(
&self,
bucket: &str,
prefix: &str,
) -> Result<Vec<u8>, GetObjectBytesError>;

/// Lists prefixes in `bucket`, taking `start_after_prefix` as the starting point,
/// and returns them as strings.
///
/// NOTE(review): presumably only prefixes that sort after `start_after_prefix`
/// are returned, and the result may be a single page — confirm with the
/// default implementation.
///
/// # Errors
///
/// Returns a [`ListCommonPrefixesError`] wrapping the underlying failure.
async fn list_common_prefixes(
&self,
bucket: &str,
start_after_prefix: &str,
) -> Result<Vec<String>, ListCommonPrefixesError>;
}
Loading
Loading