From dfa9e1e6a37e957b5d01a961b9a4121e42080793 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 15 Apr 2024 11:20:39 +1200 Subject: [PATCH 01/10] feat: Allow custom configuration of `S3Client` --- src/types.rs | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/src/types.rs b/src/types.rs index fe255d8..07f5da0 100644 --- a/src/types.rs +++ b/src/types.rs @@ -15,8 +15,8 @@ pub type BlockHeight = u64; /// .expect("Failed to build LakeConfig"); /// # } /// ``` -#[derive(Default, Builder, Debug)] -#[builder(pattern = "owned")] +#[derive(Default, Builder)] +#[builder(pattern = "owned", build_fn(validate = "Self::validate"))] pub struct LakeConfig { /// AWS S3 Bucket name #[builder(setter(into))] @@ -50,13 +50,36 @@ pub struct LakeConfig { /// .expect("Failed to build LakeConfig"); /// # } /// ``` + /// + /// This field is mutually exclusive with [LakeConfigBuilder::s3_client]. #[builder(setter(strip_option), default)] pub s3_config: Option, + /// Provide a custom S3 client which implements the s3_fetchers::S3Client trait. This is useful + /// if you need more control over the requests made to S3, e.g. you want to add cache. + /// + /// This field is mutually exclusive with [LakeConfigBuilder::s3_config]. + #[builder(setter(strip_option, custom), default)] + pub(crate) s3_client: Option>, #[builder(default = "100")] pub(crate) blocks_preload_pool_size: usize, } impl LakeConfigBuilder { + fn validate(&self) -> Result<(), String> { + if self.s3_config.is_some() && self.s3_client.is_some() { + return Err("Cannot provide both s3_config and s3_client".to_string()); + } + + Ok(()) + } + + pub fn s3_client(self, s3_client: T) -> Self { + Self { + s3_client: Some(Some(Box::new(s3_client))), + ..self + } + } + /// Shortcut to set up [LakeConfigBuilder::s3_bucket_name] for mainnet /// ``` /// use near_lake_framework::LakeConfigBuilder; From 15cde979a8cd7f3319678b951b979f09acded43f Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 15 Apr 2024 11:22:27 +1200 Subject: [PATCH 02/10] feat: Make `S3Client` easier to work with from outside --- src/s3_client.rs | 82 +++++++++++++ src/s3_fetchers.rs | 287 ++++++++++++++++++++------------------------- src/types.rs | 20 ++-- 3 files changed, 220 insertions(+), 169 deletions(-) create mode 100644 src/s3_client.rs diff --git a/src/s3_client.rs b/src/s3_client.rs new file mode 100644 index 0000000..fe28fbd --- /dev/null +++ b/src/s3_client.rs @@ -0,0 +1,82 @@ +use std::error::Error; +use std::sync::Arc; + +use async_trait::async_trait; + +pub type S3ClientError = Arc; + +#[derive(Debug, thiserror::Error)] +pub struct GetObjectBytesError(pub S3ClientError); + +impl std::ops::Deref for GetObjectBytesError { + type Target = S3ClientError; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::fmt::Display for GetObjectBytesError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "GetObjectBytesError: {}", self.0) + } +} + +impl From> + for GetObjectBytesError +{ + fn from( + error: aws_sdk_s3::error::SdkError, + ) -> Self { + Self(Arc::new(error)) + } +} + +impl From for GetObjectBytesError { + fn from(error: aws_smithy_types::byte_stream::error::Error) -> Self { + Self(Arc::new(error)) + } +} + +#[derive(Debug, thiserror::Error)] +pub struct ListCommonPrefixesError(pub S3ClientError); + +impl std::fmt::Display for ListCommonPrefixesError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "ListCommonPrefixesError: {}", self.0) + } +} + +impl std::ops::Deref for ListCommonPrefixesError { + type Target = S3ClientError; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl From> + for ListCommonPrefixesError +{ + fn from( + error: aws_sdk_s3::error::SdkError< + aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error, + >, + ) -> Self { + Self(Arc::new(error)) + } +} + +#[async_trait] +pub trait S3Client: Send + Sync { + async fn get_object_bytes( + &self, + bucket: &str, + prefix: &str, + ) -> Result, GetObjectBytesError>; + + async fn list_common_prefixes( + &self, + bucket: &str, + start_after_prefix: &str, + ) -> Result, ListCommonPrefixesError>; +} diff --git a/src/s3_fetchers.rs b/src/s3_fetchers.rs index d5bfdf5..4bc8382 100644 --- a/src/s3_fetchers.rs +++ b/src/s3_fetchers.rs @@ -1,26 +1,8 @@ -use async_trait::async_trait; use std::str::FromStr; -#[async_trait] -pub trait S3Client { - async fn get_object( - &self, - bucket: &str, - prefix: &str, - ) -> Result< - aws_sdk_s3::operation::get_object::GetObjectOutput, - aws_sdk_s3::error::SdkError, - >; +use async_trait::async_trait; - async fn list_objects( - &self, - bucket: &str, - start_after: &str, - ) -> Result< - aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output, - aws_sdk_s3::error::SdkError, - >; -} +use crate::s3_client::{GetObjectBytesError, ListCommonPrefixesError, S3Client}; #[derive(Clone, Debug)] pub struct LakeS3Client { @@ -33,82 +15,88 @@ impl LakeS3Client { } } +impl LakeS3Client { + pub fn from_conf(config: aws_sdk_s3::config::Config) -> Self { + let s3_client = aws_sdk_s3::Client::from_conf(config); + + Self { s3: s3_client } + } +} + #[async_trait] impl S3Client for LakeS3Client { - async fn get_object( + async fn get_object_bytes( &self, bucket: &str, prefix: &str, - ) -> Result< - aws_sdk_s3::operation::get_object::GetObjectOutput, - aws_sdk_s3::error::SdkError, - > { - Ok(self + ) -> Result, GetObjectBytesError> { + let object = self .s3 .get_object() .bucket(bucket) .key(prefix) .request_payer(aws_sdk_s3::types::RequestPayer::Requester) .send() - .await?) + .await?; + + let bytes = object.body.collect().await?.into_bytes().to_vec(); + + Ok(bytes) } - async fn list_objects( + async fn list_common_prefixes( &self, bucket: &str, - start_after: &str, - ) -> Result< - aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output, - aws_sdk_s3::error::SdkError, - > { - Ok(self + start_after_prefix: &str, + ) -> Result, ListCommonPrefixesError> { + let response = self .s3 .list_objects_v2() .max_keys(1000) // 1000 is the default and max value for this parameter .delimiter("/".to_string()) - .start_after(start_after) + .start_after(start_after_prefix) .request_payer(aws_sdk_s3::types::RequestPayer::Requester) .bucket(bucket) .send() - .await?) + .await?; + + let prefixes = match response.common_prefixes { + None => vec![], + Some(common_prefixes) => common_prefixes + .into_iter() + .filter_map(|common_prefix| common_prefix.prefix) + .collect::>() + .into_iter() + .filter_map(|prefix_string| prefix_string.split('/').next().map(String::from)) + .collect(), + }; + + Ok(prefixes) } } /// Queries the list of the objects in the bucket, grouped by "/" delimiter. /// Returns the list of block heights that can be fetched pub async fn list_block_heights( - lake_s3_client: &impl S3Client, + lake_s3_client: &dyn S3Client, s3_bucket_name: &str, start_from_block_height: crate::types::BlockHeight, -) -> Result< - Vec, - crate::types::LakeError, -> { +) -> Result, crate::types::LakeError> { tracing::debug!( target: crate::LAKE_FRAMEWORK, "Fetching block heights from S3, after #{}...", start_from_block_height ); - let response = lake_s3_client - .list_objects(s3_bucket_name, &format!("{:0>12}", start_from_block_height)) + + let prefixes = lake_s3_client + .list_common_prefixes(s3_bucket_name, &format!("{:0>12}", start_from_block_height)) .await?; - Ok(match response.common_prefixes { - None => vec![], - Some(common_prefixes) => common_prefixes - .into_iter() - .filter_map(|common_prefix| common_prefix.prefix) - .collect::>() - .into_iter() - .filter_map(|prefix_string| { - prefix_string - .split('/') - .next() - .map(u64::from_str) - .and_then(|num| num.ok()) - }) - .collect(), - }) + Ok(prefixes + .iter() + .map(|folder| u64::from_str(folder.as_str())) + .filter_map(|num| num.ok()) + .collect()) } /// By the given block height gets the objects: @@ -117,13 +105,10 @@ pub async fn list_block_heights( /// Reads the content of the objects and parses as a JSON. /// Returns the result in `near_indexer_primitives::StreamerMessage` pub(crate) async fn fetch_streamer_message( - lake_s3_client: &impl S3Client, + lake_s3_client: &dyn S3Client, s3_bucket_name: &str, block_height: crate::types::BlockHeight, -) -> Result< - near_indexer_primitives::StreamerMessage, - crate::types::LakeError, -> { +) -> Result { let block_view = fetch_block_or_retry(lake_s3_client, s3_bucket_name, block_height).await?; let fetch_shards_futures = (0..block_view.chunks.len() as u64) @@ -143,56 +128,52 @@ pub(crate) async fn fetch_streamer_message( /// Fetches the block data JSON from AWS S3 and returns the `BlockView` pub async fn fetch_block( - lake_s3_client: &impl S3Client, + lake_s3_client: &dyn S3Client, s3_bucket_name: &str, block_height: crate::types::BlockHeight, -) -> Result< - near_indexer_primitives::views::BlockView, - crate::types::LakeError, -> { - let body_bytes = lake_s3_client - .get_object(s3_bucket_name, &format!("{:0>12}/block.json", block_height)) - .await? - .body - .collect() - .await? - .into_bytes(); +) -> Result { + let bytes = lake_s3_client + .get_object_bytes(s3_bucket_name, &format!("{:0>12}/block.json", block_height)) + .await?; Ok(serde_json::from_slice::< near_indexer_primitives::views::BlockView, - >(body_bytes.as_ref())?) + >(&bytes)?) } /// Fetches the block data JSON from AWS S3 and returns the `BlockView` retrying until it succeeds (indefinitely) pub async fn fetch_block_or_retry( - lake_s3_client: &impl S3Client, + lake_s3_client: &dyn S3Client, s3_bucket_name: &str, block_height: crate::types::BlockHeight, -) -> Result< - near_indexer_primitives::views::BlockView, - crate::types::LakeError, -> { +) -> Result { loop { match fetch_block(lake_s3_client, s3_bucket_name, block_height).await { Ok(block_view) => break Ok(block_view), - Err(err) => match err { - crate::types::LakeError::AwsError { .. } => { - tracing::debug!( - target: crate::LAKE_FRAMEWORK, - "Block #{:0>12} not found. Retrying in immediately...\n{:#?}", - block_height, - err, - ); - } - crate::types::LakeError::AwsSmithyError { .. } => { - tracing::debug!( - target: crate::LAKE_FRAMEWORK, - "Failed to read bytes from the block #{:0>12} response. Retrying immediately.\n{:#?}", - block_height, - err, - ); - } - _ => { + Err(err) => { + if let crate::types::LakeError::S3GetError { ref error } = err { + if let Some(get_object_error) = + error.downcast_ref::() + { + tracing::debug!( + target: crate::LAKE_FRAMEWORK, + "Block #{:0>12} not found. Retrying in immediately...\n{:#?}", + block_height, + get_object_error, + ); + } + + if let Some(bytes_error) = + error.downcast_ref::() + { + tracing::debug!( + target: crate::LAKE_FRAMEWORK, + "Failed to read bytes from the block #{:0>12} response. Retrying immediately.\n{:#?}", + block_height, + bytes_error, + ); + } + tracing::debug!( target: crate::LAKE_FRAMEWORK, "Failed to fetch block #{}, retrying immediately\n{:#?}", @@ -200,70 +181,66 @@ pub async fn fetch_block_or_retry( err ); } - }, + } } } } /// Fetches the shard data JSON from AWS S3 and returns the `IndexerShard` pub async fn fetch_shard( - lake_s3_client: &impl S3Client, + lake_s3_client: &dyn S3Client, s3_bucket_name: &str, block_height: crate::types::BlockHeight, shard_id: u64, -) -> Result< - near_indexer_primitives::IndexerShard, - crate::types::LakeError, -> { - let body_bytes = lake_s3_client - .get_object( +) -> Result { + let bytes = lake_s3_client + .get_object_bytes( s3_bucket_name, &format!("{:0>12}/shard_{}.json", block_height, shard_id), ) - .await? - .body - .collect() - .await? - .into_bytes(); + .await?; Ok(serde_json::from_slice::< near_indexer_primitives::IndexerShard, - >(body_bytes.as_ref())?) + >(&bytes)?) } /// Fetches the shard data JSON from AWS S3 and returns the `IndexerShard` pub async fn fetch_shard_or_retry( - lake_s3_client: &impl S3Client, + lake_s3_client: &dyn S3Client, s3_bucket_name: &str, block_height: crate::types::BlockHeight, shard_id: u64, -) -> Result< - near_indexer_primitives::IndexerShard, - crate::types::LakeError, -> { +) -> Result { loop { match fetch_shard(lake_s3_client, s3_bucket_name, block_height, shard_id).await { Ok(shard) => break Ok(shard), - Err(err) => match err { - crate::types::LakeError::AwsError { .. } => { - tracing::debug!( - target: crate::LAKE_FRAMEWORK, - "Shard {} of block #{:0>12} not found. Retrying in immediately...\n{:#?}", - shard_id, - block_height, - err, - ); - } - crate::types::LakeError::AwsSmithyError { .. } => { - tracing::debug!( - target: crate::LAKE_FRAMEWORK, - "Failed to read bytes from the shard {} of block #{:0>12} response. Retrying immediately.\n{:#?}", - shard_id, - block_height, - err, - ); - } - _ => { + Err(err) => { + if let crate::types::LakeError::S3ListError { ref error } = err { + if let Some(list_objects_error) = + error.downcast_ref::() + { + tracing::debug!( + target: crate::LAKE_FRAMEWORK, + "Shard {} of block #{:0>12} not found. Retrying in immediately...\n{:#?}", + shard_id, + block_height, + list_objects_error, + ); + } + + if let Some(bytes_error) = + error.downcast_ref::() + { + tracing::debug!( + target: crate::LAKE_FRAMEWORK, + "Failed to read bytes from the shard {} of block #{:0>12} response. Retrying immediately.\n{:#?}", + shard_id, + block_height, + bytes_error, + ); + } + tracing::debug!( target: crate::LAKE_FRAMEWORK, "Failed to fetch shard {} of block #{}, retrying immediately\n{:#?}", @@ -272,7 +249,7 @@ pub async fn fetch_shard_or_retry( err ); } - }, + } } } } @@ -281,42 +258,32 @@ pub async fn fetch_shard_or_retry( mod test { use super::*; - use async_trait::async_trait; - - use aws_sdk_s3::operation::get_object::GetObjectOutput; - use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output; - use aws_sdk_s3::primitives::ByteStream; + use std::sync::Arc; - use aws_smithy_types::body::SdkBody; + use async_trait::async_trait; #[derive(Clone, Debug)] pub struct LakeS3Client {} #[async_trait] impl S3Client for LakeS3Client { - async fn get_object( + async fn get_object_bytes( &self, _bucket: &str, prefix: &str, - ) -> Result< - aws_sdk_s3::operation::get_object::GetObjectOutput, - aws_sdk_s3::error::SdkError, - > { + ) -> Result, GetObjectBytesError> { let path = format!("{}/blocks/{}", env!("CARGO_MANIFEST_DIR"), prefix); - let file_bytes = tokio::fs::read(path).await.unwrap(); - let stream = ByteStream::new(SdkBody::from(file_bytes)); - Ok(GetObjectOutput::builder().body(stream).build()) + tokio::fs::read(path) + .await + .map_err(|e| GetObjectBytesError(Arc::new(e))) } - async fn list_objects( + async fn list_common_prefixes( &self, _bucket: &str, _start_after: &str, - ) -> Result< - aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output, - aws_sdk_s3::error::SdkError, - > { - Ok(ListObjectsV2Output::builder().build()) + ) -> Result, ListCommonPrefixesError> { + Ok(Vec::new()) } } diff --git a/src/types.rs b/src/types.rs index 07f5da0..5a0e5b3 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,3 +1,5 @@ +use crate::s3_client::{GetObjectBytesError, ListCommonPrefixesError, S3Client}; + /// Type alias represents the block height pub type BlockHeight = u64; @@ -137,25 +139,25 @@ impl LakeConfigBuilder { #[allow(clippy::enum_variant_names)] #[derive(thiserror::Error, Debug)] -pub enum LakeError { +pub enum LakeError { #[error("Failed to parse structure from JSON: {error_message:?}")] ParseError { #[from] error_message: serde_json::Error, }, - #[error("AWS S3 error: {error:?}")] - AwsError { + #[error("Get object error: {error:?}")] + S3GetError { + #[from] + error: GetObjectBytesError, + }, + #[error("List objects error: {error:?}")] + S3ListError { #[from] - error: aws_sdk_s3::error::SdkError, + error: ListCommonPrefixesError, }, #[error("Failed to convert integer: {error:?}")] IntConversionError { #[from] error: std::num::TryFromIntError, }, - #[error("AWS Smithy byte_stream error: {error:?}")] - AwsSmithyError { - #[from] - error: aws_smithy_types::byte_stream::error::Error, - }, } From 5907da969fe778b3acd367451f862abf4edbe39e Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 15 Apr 2024 11:22:50 +1200 Subject: [PATCH 03/10] feat: Use custom `S3Client` if configured --- src/lib.rs | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 314456a..11f711d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -253,6 +253,7 @@ pub use near_indexer_primitives; pub use aws_credential_types::Credentials; pub use types::{LakeConfig, LakeConfigBuilder}; +pub mod s3_client; pub mod s3_fetchers; pub(crate) mod types; @@ -288,7 +289,7 @@ pub fn streamer( } fn stream_block_heights<'a: 'b, 'b>( - lake_s3_client: &'a s3_fetchers::LakeS3Client, + lake_s3_client: &'a dyn s3_client::S3Client, s3_bucket_name: &'a str, mut start_from_block_height: crate::types::BlockHeight, ) -> impl futures::Stream + 'b { @@ -384,16 +385,19 @@ async fn start( ) -> anyhow::Result<()> { let mut start_from_block_height = config.start_block_height; - let s3_client = if let Some(config) = config.s3_config { - aws_sdk_s3::Client::from_conf(config) - } else { - let aws_config = aws_config::from_env().load().await; - let s3_config = aws_sdk_s3::config::Builder::from(&aws_config) - .region(aws_types::region::Region::new(config.s3_region_name)) - .build(); - aws_sdk_s3::Client::from_conf(s3_config) - }; - let lake_s3_client = s3_fetchers::LakeS3Client::new(s3_client.clone()); + let lake_s3_client: Box = + if let Some(s3_client) = config.s3_client { + s3_client + } else if let Some(config) = config.s3_config { + Box::new(s3_fetchers::LakeS3Client::from_conf(config)) + } else { + let aws_config = aws_config::from_env().load().await; + let s3_config = aws_sdk_s3::config::Builder::from(&aws_config) + .region(aws_types::region::Region::new(config.s3_region_name)) + .build(); + + Box::new(s3_fetchers::LakeS3Client::from_conf(s3_config)) + }; let mut last_processed_block_hash: Option = None; @@ -406,7 +410,7 @@ async fn start( // We require to stream blocks consistently, so we need to try to load the block again. let pending_block_heights = stream_block_heights( - &lake_s3_client, + &*lake_s3_client, &config.s3_bucket_name, start_from_block_height, ); @@ -429,7 +433,7 @@ async fn start( .into_iter() .map(|block_height| { s3_fetchers::fetch_streamer_message( - &lake_s3_client, + &*lake_s3_client, &config.s3_bucket_name, block_height, ) @@ -530,7 +534,7 @@ async fn start( .into_iter() .map(|block_height| { s3_fetchers::fetch_streamer_message( - &lake_s3_client, + &*lake_s3_client, &config.s3_bucket_name, block_height, ) From a38284340e1db8c6228e12cdadb49b652eaebff6 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 15 Apr 2024 13:50:51 +1200 Subject: [PATCH 04/10] chore: Remove unused crates --- Cargo.toml | 3 --- 1 file changed, 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1ca0c02..21b0b01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,8 +34,5 @@ tracing = "0.1.40" near-indexer-primitives = "0.20.0" -[dev-dependencies] -aws-smithy-http = "0.60.3" - [lib] doctest = false From 88f23a41aab7b3c4bb01c7ca4fac3d7b81aee8dc Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 15 Apr 2024 14:21:27 +1200 Subject: [PATCH 05/10] refactor: Merge duplicate `impl`s --- src/s3_fetchers.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/s3_fetchers.rs b/src/s3_fetchers.rs index 4bc8382..595f51f 100644 --- a/src/s3_fetchers.rs +++ b/src/s3_fetchers.rs @@ -13,9 +13,7 @@ impl LakeS3Client { pub fn new(s3: aws_sdk_s3::Client) -> Self { Self { s3 } } -} -impl LakeS3Client { pub fn from_conf(config: aws_sdk_s3::config::Config) -> Self { let s3_client = aws_sdk_s3::Client::from_conf(config); From 120b1b06d38c7eadd9392adb47ee7ac6a52d54d6 Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Mon, 15 Apr 2024 20:23:45 +1200 Subject: [PATCH 06/10] Update src/s3_fetchers.rs Co-authored-by: Bohdan Khorolets --- src/s3_fetchers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/s3_fetchers.rs b/src/s3_fetchers.rs index 595f51f..c2b299f 100644 --- a/src/s3_fetchers.rs +++ b/src/s3_fetchers.rs @@ -220,7 +220,7 @@ pub async fn fetch_shard_or_retry( { tracing::debug!( target: crate::LAKE_FRAMEWORK, - "Shard {} of block #{:0>12} not found. Retrying in immediately...\n{:#?}", + "Shard {} of block #{:0>12} not found. Retrying immediately...\n{:#?}", shard_id, block_height, list_objects_error, From 039a6093f67026a434a019bc4a41eaaa7e84a522 Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Mon, 15 Apr 2024 20:23:51 +1200 Subject: [PATCH 07/10] Update src/s3_fetchers.rs Co-authored-by: Bohdan Khorolets --- src/s3_fetchers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/s3_fetchers.rs b/src/s3_fetchers.rs index c2b299f..a29bb1e 100644 --- a/src/s3_fetchers.rs +++ b/src/s3_fetchers.rs @@ -155,7 +155,7 @@ pub async fn fetch_block_or_retry( { tracing::debug!( target: crate::LAKE_FRAMEWORK, - "Block #{:0>12} not found. Retrying in immediately...\n{:#?}", + "Block #{:0>12} not found. Retrying immediately...\n{:#?}", block_height, get_object_error, ); From 58bcb7d33bc27342dc010487e840579bc174d22a Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 15 Apr 2024 20:55:51 +1200 Subject: [PATCH 08/10] feat: derive `Clone` for `S3Client` errors --- src/s3_client.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/s3_client.rs b/src/s3_client.rs index fe28fbd..6d718b4 100644 --- a/src/s3_client.rs +++ b/src/s3_client.rs @@ -5,7 +5,7 @@ use async_trait::async_trait; pub type S3ClientError = Arc; -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, Clone)] pub struct GetObjectBytesError(pub S3ClientError); impl std::ops::Deref for GetObjectBytesError { @@ -38,7 +38,7 @@ impl From for GetObjectBytesError { } } -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, Clone)] pub struct ListCommonPrefixesError(pub S3ClientError); impl std::fmt::Display for ListCommonPrefixesError { From 421b70ab229991ff08c9c369336a3469068b366b Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Tue, 16 Apr 2024 21:01:50 +1200 Subject: [PATCH 09/10] chore: Update `CHANGELOG.md` --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bf3a5c7..ead01b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased](https://github.com/near/near-lake-framework/compare/v0.7.7...HEAD) +* Expose `s3_client::S3Client` trait so that custom S3 client implementations can be configured via `LakeConfigBuilder::s3_client` + +### Breaking Change + +* Errors returned from public `s3_fetcher` methods have changed slightly to support the newly exposed `S3Client`. As these methods serve a rare use-case this is only a minor bump. + ## [0.7.7](https://github.com/near/near-lake-framework/compare/v0.7.6...0.7.7) * Refactor `s3_fetchers` module to allow use `list_block_heights` outside of the framework From e6d2e47944596bf765bf16753de08971e31e001e Mon Sep 17 00:00:00 2001 From: Bohdan Khorolets Date: Tue, 16 Apr 2024 14:56:15 +0300 Subject: [PATCH 10/10] Bump the version to 0.7.8 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 21b0b01..1a97e07 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ rust-version = "1.75.0" # cargo-workspaces [workspace.metadata.workspaces] -version = "0.7.7" +version = "0.7.8" [dependencies] anyhow = "1.0.79"