diff --git a/CHANGELOG.md b/CHANGELOG.md index 35b21a83e99..09cd4c1b624 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Changed - [2334](https://github.com/FuelLabs/fuel-core/pull/2334): Prepare the GraphQL service for the switching to `async` methods. +- [2337](https://github.com/FuelLabs/fuel-core/pull/2337): Updated all pagination queries to work with the async stream instead of the sync iterator. + +#### Breaking +- [2337](https://github.com/FuelLabs/fuel-core/pull/2337): The maximum number of processed coins from the `coins_to_spend` query is limited to `max_inputs`. ## [Version 0.39.0] diff --git a/crates/client/assets/schema.sdl b/crates/client/assets/schema.sdl index 67287341c38..b9048362caa 100644 --- a/crates/client/assets/schema.sdl +++ b/crates/client/assets/schema.sdl @@ -938,7 +938,7 @@ type Query { """ owner: Address!, """ - The list of requested assets` coins with asset ids, `target` amount the user wants to reach, and the `max` number of coins in the selection. Several entries with the same asset id are not allowed. + The list of requested assets` coins with asset ids, `target` amount the user wants to reach, and the `max` number of coins in the selection. Several entries with the same asset id are not allowed. The result can't contain more coins than `max_inputs`. """ queryPerAsset: [SpendQueryElementInput!]!, """ diff --git a/crates/fuel-core/src/coins_query.rs b/crates/fuel-core/src/coins_query.rs index 07ca65bd5d6..e07630333f8 100644 --- a/crates/fuel-core/src/coins_query.rs +++ b/crates/fuel-core/src/coins_query.rs @@ -19,7 +19,7 @@ use fuel_core_types::{ Word, }, }; -use itertools::Itertools; +use futures::TryStreamExt; use rand::prelude::*; use std::{ cmp::Reverse, @@ -119,8 +119,13 @@ impl SpendQuery { /// Returns the biggest inputs of the `owner` to satisfy the required `target` of the asset. The /// number of inputs for each asset can't exceed `max_inputs`, otherwise throw an error that query /// can't be satisfied. -pub fn largest_first(query: &AssetQuery) -> Result, CoinsQueryError> { - let mut inputs: Vec<_> = query.coins().try_collect()?; +pub async fn largest_first( + query: AssetQuery<'_>, +) -> Result, CoinsQueryError> { + let target = query.asset.target; + let max = query.asset.max; + let asset_id = query.asset.id; + let mut inputs: Vec = query.coins().try_collect().await?; inputs.sort_by_key(|coin| Reverse(coin.amount())); let mut collected_amount = 0u64; @@ -128,12 +133,12 @@ pub fn largest_first(query: &AssetQuery) -> Result, CoinsQueryErro for coin in inputs { // Break if we don't need any more coins - if collected_amount >= query.asset.target { + if collected_amount >= target { break } // Error if we can't fit more coins - if coins.len() >= query.asset.max { + if coins.len() >= max as usize { return Err(CoinsQueryError::MaxCoinsReached) } @@ -142,9 +147,9 @@ pub fn largest_first(query: &AssetQuery) -> Result, CoinsQueryErro coins.push(coin); } - if collected_amount < query.asset.target { + if collected_amount < target { return Err(CoinsQueryError::InsufficientCoins { - asset_id: query.asset.id, + asset_id, collected_amount, }) } @@ -153,23 +158,31 @@ pub fn largest_first(query: &AssetQuery) -> Result, CoinsQueryErro } // An implementation of the method described on: https://iohk.io/en/blog/posts/2018/07/03/self-organisation-in-coin-selection/ -pub fn random_improve( +// TODO: Reimplement this algorithm to be simpler and faster: +// Instead of selecting random coins first, we can sort them. +// After that, we can split the coins into the part that covers the +// target and the part that does not(by choosing the most expensive coins). +// When the target is satisfied, we can select random coins from the remaining +// coins not used in the target. +pub async fn random_improve( db: &ReadView, spend_query: &SpendQuery, ) -> Result>, CoinsQueryError> { let mut coins_per_asset = vec![]; for query in spend_query.asset_queries(db) { - let mut inputs: Vec<_> = query.coins().try_collect()?; + let target = query.asset.target; + let max = query.asset.max; + + let mut inputs: Vec<_> = query.clone().coins().try_collect().await?; inputs.shuffle(&mut thread_rng()); - inputs.truncate(query.asset.max); + inputs.truncate(max as usize); let mut collected_amount = 0; let mut coins = vec![]; // Set parameters according to spec - let target = query.asset.target; - let upper_target = query.asset.target.saturating_mul(2); + let upper_target = target.saturating_mul(2); for coin in inputs { // Try to improve the result by adding dust to the result. @@ -197,8 +210,8 @@ pub fn random_improve( } // Fallback to largest_first if we can't fit more coins - if collected_amount < query.asset.target { - swap(&mut coins, &mut largest_first(&query)?); + if collected_amount < target { + swap(&mut coins, &mut largest_first(query).await?); } coins_per_asset.push(coins); @@ -266,6 +279,10 @@ mod tests { fuel_asm::Word, fuel_tx::*, }; + use futures::{ + StreamExt, + TryStreamExt, + }; use itertools::Itertools; use rand::{ rngs::StdRng, @@ -324,34 +341,36 @@ mod tests { mod largest_first { use super::*; - fn query( + async fn query( spend_query: &[AssetSpendTarget], owner: &Address, base_asset_id: &AssetId, db: &ServiceDatabase, ) -> Result>, CoinsQueryError> { - let result: Vec<_> = spend_query - .iter() - .map(|asset| { - largest_first(&AssetQuery::new( - owner, - asset, - base_asset_id, - None, - &db.test_view(), - )) - .map(|coins| { - coins - .iter() - .map(|coin| (*coin.asset_id(base_asset_id), coin.amount())) - .collect() - }) - }) - .try_collect()?; - Ok(result) + let mut results = vec![]; + + for asset in spend_query { + let coins = largest_first(AssetQuery::new( + owner, + asset, + base_asset_id, + None, + &db.test_view(), + )) + .await + .map(|coins| { + coins + .iter() + .map(|coin| (*coin.asset_id(base_asset_id), coin.amount())) + .collect() + })?; + results.push(coins); + } + + Ok(results) } - fn single_asset_assert( + async fn single_asset_assert( owner: Address, asset_ids: &[AssetId], base_asset_id: &AssetId, @@ -362,11 +381,12 @@ mod tests { // Query some targets, including higher than the owner's balance for target in 0..20 { let coins = query( - &[AssetSpendTarget::new(asset_id, target, usize::MAX)], + &[AssetSpendTarget::new(asset_id, target, u16::MAX)], &owner, base_asset_id, &db.service_database(), - ); + ) + .await; // Transform result for convenience let coins = coins.map(|coins| { @@ -425,32 +445,33 @@ mod tests { &owner, base_asset_id, &db.service_database(), - ); + ) + .await; assert_matches!(coins, Err(CoinsQueryError::MaxCoinsReached)); } - #[test] - fn single_asset_coins() { + #[tokio::test] + async fn single_asset_coins() { // Setup for coins let (owner, asset_ids, base_asset_id, db) = setup_coins(); - single_asset_assert(owner, &asset_ids, &base_asset_id, db); + single_asset_assert(owner, &asset_ids, &base_asset_id, db).await; } - #[test] - fn single_asset_messages() { + #[tokio::test] + async fn single_asset_messages() { // Setup for messages let (owner, base_asset_id, db) = setup_messages(); - single_asset_assert(owner, &[base_asset_id], &base_asset_id, db); + single_asset_assert(owner, &[base_asset_id], &base_asset_id, db).await; } - #[test] - fn single_asset_coins_and_messages() { + #[tokio::test] + async fn single_asset_coins_and_messages() { // Setup for coins and messages let (owner, asset_ids, base_asset_id, db) = setup_coins_and_messages(); - single_asset_assert(owner, &asset_ids, &base_asset_id, db); + single_asset_assert(owner, &asset_ids, &base_asset_id, db).await; } - fn multiple_assets_helper( + async fn multiple_assets_helper( owner: Address, asset_ids: &[AssetId], base_asset_id: &AssetId, @@ -458,13 +479,14 @@ mod tests { ) { let coins = query( &[ - AssetSpendTarget::new(asset_ids[0], 3, usize::MAX), - AssetSpendTarget::new(asset_ids[1], 6, usize::MAX), + AssetSpendTarget::new(asset_ids[0], 3, u16::MAX), + AssetSpendTarget::new(asset_ids[1], 6, u16::MAX), ], &owner, base_asset_id, &db.service_database(), - ); + ) + .await; let expected = vec![ vec![(asset_ids[0], 5)], vec![(asset_ids[1], 5), (asset_ids[1], 4)], @@ -472,25 +494,25 @@ mod tests { assert_matches!(coins, Ok(coins) if coins == expected); } - #[test] - fn multiple_assets_coins() { + #[tokio::test] + async fn multiple_assets_coins() { // Setup coins let (owner, asset_ids, base_asset_id, db) = setup_coins(); - multiple_assets_helper(owner, &asset_ids, &base_asset_id, db); + multiple_assets_helper(owner, &asset_ids, &base_asset_id, db).await; } - #[test] - fn multiple_assets_coins_and_messages() { + #[tokio::test] + async fn multiple_assets_coins_and_messages() { // Setup coins and messages let (owner, asset_ids, base_asset_id, db) = setup_coins_and_messages(); - multiple_assets_helper(owner, &asset_ids, &base_asset_id, db); + multiple_assets_helper(owner, &asset_ids, &base_asset_id, db).await; } } mod random_improve { use super::*; - fn query( + async fn query( query_per_asset: Vec, owner: Address, asset_ids: &[AssetId], @@ -500,7 +522,8 @@ mod tests { let coins = random_improve( &db.test_view(), &SpendQuery::new(owner, &query_per_asset, None, base_asset_id)?, - ); + ) + .await; // Transform result for convenience coins.map(|coins| { @@ -521,7 +544,7 @@ mod tests { }) } - fn single_asset_assert( + async fn single_asset_assert( owner: Address, asset_ids: &[AssetId], base_asset_id: AssetId, @@ -532,12 +555,13 @@ mod tests { // Query some amounts, including higher than the owner's balance for amount in 0..20 { let coins = query( - vec![AssetSpendTarget::new(asset_id, amount, usize::MAX)], + vec![AssetSpendTarget::new(asset_id, amount, u16::MAX)], owner, asset_ids, base_asset_id, &db.service_database(), - ); + ) + .await; // Transform result for convenience let coins = coins.map(|coins| { @@ -589,32 +613,33 @@ mod tests { asset_ids, base_asset_id, &db.service_database(), - ); + ) + .await; assert_matches!(coins, Err(CoinsQueryError::MaxCoinsReached)); } - #[test] - fn single_asset_coins() { + #[tokio::test] + async fn single_asset_coins() { // Setup for coins let (owner, asset_ids, base_asset_id, db) = setup_coins(); - single_asset_assert(owner, &asset_ids, base_asset_id, db); + single_asset_assert(owner, &asset_ids, base_asset_id, db).await; } - #[test] - fn single_asset_messages() { + #[tokio::test] + async fn single_asset_messages() { // Setup for messages let (owner, base_asset_id, db) = setup_messages(); - single_asset_assert(owner, &[base_asset_id], base_asset_id, db); + single_asset_assert(owner, &[base_asset_id], base_asset_id, db).await; } - #[test] - fn single_asset_coins_and_messages() { + #[tokio::test] + async fn single_asset_coins_and_messages() { // Setup for coins and messages let (owner, asset_ids, base_asset_id, db) = setup_coins_and_messages(); - single_asset_assert(owner, &asset_ids, base_asset_id, db); + single_asset_assert(owner, &asset_ids, base_asset_id, db).await; } - fn multiple_assets_assert( + async fn multiple_assets_assert( owner: Address, asset_ids: &[AssetId], base_asset_id: AssetId, @@ -638,7 +663,8 @@ mod tests { asset_ids, base_asset_id, &db.service_database(), - ); + ) + .await; assert_matches!(coins, Ok(ref coins) if coins.len() <= 6); let coins = coins.unwrap(); assert!( @@ -659,18 +685,18 @@ mod tests { ); } - #[test] - fn multiple_assets_coins() { + #[tokio::test] + async fn multiple_assets_coins() { // Setup coins let (owner, asset_ids, base_asset_id, db) = setup_coins(); - multiple_assets_assert(owner, &asset_ids, base_asset_id, db); + multiple_assets_assert(owner, &asset_ids, base_asset_id, db).await; } - #[test] - fn multiple_assets_coins_and_messages() { + #[tokio::test] + async fn multiple_assets_coins_and_messages() { // Setup coins and messages let (owner, asset_ids, base_asset_id, db) = setup_coins_and_messages(); - multiple_assets_assert(owner, &asset_ids, base_asset_id, db); + multiple_assets_assert(owner, &asset_ids, base_asset_id, db).await; } } @@ -678,7 +704,41 @@ mod tests { use super::*; use fuel_core_types::entities::coins::CoinId; - fn exclusion_assert( + async fn query( + db: &ServiceDatabase, + owner: Address, + base_asset_id: AssetId, + asset_ids: &[AssetId], + query_per_asset: Vec, + excluded_ids: Vec, + ) -> Result, CoinsQueryError> { + let spend_query = SpendQuery::new( + owner, + &query_per_asset, + Some(excluded_ids), + base_asset_id, + )?; + let coins = random_improve(&db.test_view(), &spend_query).await; + + // Transform result for convenience + coins.map(|coins| { + coins + .into_iter() + .flat_map(|coin| { + coin.into_iter() + .map(|coin| (*coin.asset_id(&base_asset_id), coin.amount())) + .sorted_by_key(|(asset_id, amount)| { + ( + asset_ids.iter().position(|c| c == asset_id).unwrap(), + Reverse(*amount), + ) + }) + }) + .collect() + }) + } + + async fn exclusion_assert( owner: Address, asset_ids: &[AssetId], base_asset_id: AssetId, @@ -687,47 +747,17 @@ mod tests { ) { let asset_id = asset_ids[0]; - let query = |query_per_asset: Vec, - excluded_ids: Vec| - -> Result, CoinsQueryError> { - let spend_query = SpendQuery::new( - owner, - &query_per_asset, - Some(excluded_ids), - base_asset_id, - )?; - let coins = - random_improve(&db.service_database().test_view(), &spend_query); - - // Transform result for convenience - coins.map(|coins| { - coins - .into_iter() - .flat_map(|coin| { - coin.into_iter() - .map(|coin| { - (*coin.asset_id(&base_asset_id), coin.amount()) - }) - .sorted_by_key(|(asset_id, amount)| { - ( - asset_ids - .iter() - .position(|c| c == asset_id) - .unwrap(), - Reverse(*amount), - ) - }) - }) - .collect() - }) - }; - // Query some amounts, including higher than the owner's balance for amount in 0..20 { let coins = query( - vec![AssetSpendTarget::new(asset_id, amount, usize::MAX)], + &db.service_database(), + owner, + base_asset_id, + asset_ids, + vec![AssetSpendTarget::new(asset_id, amount, u16::MAX)], excluded_ids.clone(), - ); + ) + .await; // Transform result for convenience let coins = coins.map(|coins| { @@ -769,52 +799,56 @@ mod tests { } } - #[test] - fn exclusion_coins() { + #[tokio::test] + async fn exclusion_coins() { // Setup coins let (owner, asset_ids, base_asset_id, db) = setup_coins(); // Exclude largest coin IDs let excluded_ids = db .owned_coins(&owner) + .await .into_iter() .filter(|coin| coin.amount == 5) .map(|coin| CoinId::Utxo(coin.utxo_id)) .collect_vec(); - exclusion_assert(owner, &asset_ids, base_asset_id, db, excluded_ids); + exclusion_assert(owner, &asset_ids, base_asset_id, db, excluded_ids).await; } - #[test] - fn exclusion_messages() { + #[tokio::test] + async fn exclusion_messages() { // Setup messages let (owner, base_asset_id, db) = setup_messages(); // Exclude largest messages IDs let excluded_ids = db .owned_messages(&owner) + .await .into_iter() .filter(|message| message.amount() == 5) .map(|message| CoinId::Message(*message.id())) .collect_vec(); - exclusion_assert(owner, &[base_asset_id], base_asset_id, db, excluded_ids); + exclusion_assert(owner, &[base_asset_id], base_asset_id, db, excluded_ids) + .await; } - #[test] - fn exclusion_coins_and_messages() { + #[tokio::test] + async fn exclusion_coins_and_messages() { // Setup coins and messages let (owner, asset_ids, base_asset_id, db) = setup_coins_and_messages(); // Exclude largest messages IDs, because coins only 1 and 2 let excluded_ids = db .owned_messages(&owner) + .await .into_iter() .filter(|message| message.amount() == 5) .map(|message| CoinId::Message(*message.id())) .collect_vec(); - exclusion_assert(owner, &asset_ids, base_asset_id, db, excluded_ids); + exclusion_assert(owner, &asset_ids, base_asset_id, db, excluded_ids).await; } } @@ -822,7 +856,7 @@ mod tests { struct TestCase { db_amount: Vec, target_amount: u64, - max_coins: usize, + max_coins: u16, } pub enum CoinType { @@ -830,7 +864,7 @@ mod tests { Message, } - fn test_case_run( + async fn test_case_run( case: TestCase, coin_type: CoinType, base_asset_id: AssetId, @@ -866,23 +900,26 @@ mod tests { None, base_asset_id, )?, - )?; + ) + .await?; assert_eq!(coins.len(), 1); Ok(coins[0].len()) } - #[test] - fn insufficient_coins_returns_error() { + #[tokio::test] + async fn insufficient_coins_returns_error() { let test_case = TestCase { db_amount: vec![0], target_amount: u64::MAX, - max_coins: usize::MAX, + max_coins: u16::MAX, }; let mut rng = StdRng::seed_from_u64(0xF00DF00D); let base_asset_id = rng.gen(); - let coin_result = test_case_run(test_case.clone(), CoinType::Coin, base_asset_id); - let message_result = test_case_run(test_case, CoinType::Message, base_asset_id); + let coin_result = + test_case_run(test_case.clone(), CoinType::Coin, base_asset_id).await; + let message_result = + test_case_run(test_case, CoinType::Message, base_asset_id).await; assert_eq!(coin_result, message_result); assert_matches!( coin_result, @@ -897,7 +934,7 @@ mod tests { TestCase { db_amount: vec![u64::MAX, u64::MAX], target_amount: u64::MAX, - max_coins: usize::MAX, + max_coins: u16::MAX, } => Ok(1) ; "Enough coins in the DB to reach target(u64::MAX) by 1 coin" @@ -920,11 +957,13 @@ mod tests { => Err(CoinsQueryError::MaxCoinsReached) ; "Enough coins in the DB to reach target(u64::MAX) but limit is zero" )] - fn corner_cases(case: TestCase) -> Result { + #[tokio::test] + async fn corner_cases(case: TestCase) -> Result { let mut rng = StdRng::seed_from_u64(0xF00DF00D); let base_asset_id = rng.gen(); - let coin_result = test_case_run(case.clone(), CoinType::Coin, base_asset_id); - let message_result = test_case_run(case, CoinType::Message, base_asset_id); + let coin_result = + test_case_run(case.clone(), CoinType::Coin, base_asset_id).await; + let message_result = test_case_run(case, CoinType::Message, base_asset_id).await; assert_eq!(coin_result, message_result); coin_result } @@ -1001,23 +1040,25 @@ mod tests { message } - pub fn owned_coins(&self, owner: &Address) -> Vec { + pub async fn owned_coins(&self, owner: &Address) -> Vec { let query = self.service_database(); let query = query.test_view(); query .owned_coins_ids(owner, None, IterDirection::Forward) .map(|res| res.map(|id| query.coin(id).unwrap())) .try_collect() + .await .unwrap() } - pub fn owned_messages(&self, owner: &Address) -> Vec { + pub async fn owned_messages(&self, owner: &Address) -> Vec { let query = self.service_database(); let query = query.test_view(); query .owned_message_ids(owner, None, IterDirection::Forward) .map(|res| res.map(|id| query.message(&id).unwrap())) .try_collect() + .await .unwrap() } } diff --git a/crates/fuel-core/src/graphql_api/database.rs b/crates/fuel-core/src/graphql_api/database.rs index dfaabd42733..3f254032b0c 100644 --- a/crates/fuel-core/src/graphql_api/database.rs +++ b/crates/fuel-core/src/graphql_api/database.rs @@ -61,6 +61,7 @@ use fuel_core_types::{ txpool::TransactionStatus, }, }; +use futures::Stream; use std::{ borrow::Cow, sync::Arc, @@ -249,8 +250,8 @@ impl ReadView { &self, start_message_id: Option, direction: IterDirection, - ) -> BoxedIter<'_, StorageResult> { - self.on_chain.all_messages(start_message_id, direction) + ) -> impl Stream> + '_ { + futures::stream::iter(self.on_chain.all_messages(start_message_id, direction)) } pub fn message_exists(&self, nonce: &Nonce) -> StorageResult { @@ -269,9 +270,12 @@ impl ReadView { contract: ContractId, start_asset: Option, direction: IterDirection, - ) -> BoxedIter> { - self.on_chain - .contract_balances(contract, start_asset, direction) + ) -> impl Stream> + '_ { + futures::stream::iter(self.on_chain.contract_balances( + contract, + start_asset, + direction, + )) } pub fn da_height(&self) -> StorageResult { @@ -306,18 +310,23 @@ impl ReadView { owner: &Address, start_coin: Option, direction: IterDirection, - ) -> BoxedIter<'_, StorageResult> { - self.off_chain.owned_coins_ids(owner, start_coin, direction) + ) -> impl Stream> + '_ { + let iter = self.off_chain.owned_coins_ids(owner, start_coin, direction); + + futures::stream::iter(iter) } - pub fn owned_message_ids( - &self, - owner: &Address, + pub fn owned_message_ids<'a>( + &'a self, + owner: &'a Address, start_message_id: Option, direction: IterDirection, - ) -> BoxedIter<'_, StorageResult> { - self.off_chain - .owned_message_ids(owner, start_message_id, direction) + ) -> impl Stream> + 'a { + futures::stream::iter(self.off_chain.owned_message_ids( + owner, + start_message_id, + direction, + )) } pub fn owned_transactions_ids( @@ -325,9 +334,11 @@ impl ReadView { owner: Address, start: Option, direction: IterDirection, - ) -> BoxedIter> { - self.off_chain - .owned_transactions_ids(owner, start, direction) + ) -> impl Stream> + '_ { + futures::stream::iter( + self.off_chain + .owned_transactions_ids(owner, start, direction), + ) } pub fn contract_salt(&self, contract_id: &ContractId) -> StorageResult { diff --git a/crates/fuel-core/src/query/balance.rs b/crates/fuel-core/src/query/balance.rs index 34e7d5c8d95..1a715b74522 100644 --- a/crates/fuel-core/src/query/balance.rs +++ b/crates/fuel-core/src/query/balance.rs @@ -5,11 +5,7 @@ use asset_query::{ AssetsQuery, }; use fuel_core_storage::{ - iter::{ - BoxedIter, - IntoBoxedIter, - IterDirection, - }, + iter::IterDirection, Result as StorageResult, }; use fuel_core_types::{ @@ -19,7 +15,12 @@ use fuel_core_types::{ }, services::graphql_api::AddressBalance, }; -use itertools::Itertools; +use futures::{ + FutureExt, + Stream, + StreamExt, + TryStreamExt, +}; use std::{ cmp::Ordering, collections::HashMap, @@ -28,7 +29,7 @@ use std::{ pub mod asset_query; impl ReadView { - pub fn balance( + pub async fn balance( &self, owner: Address, asset_id: AssetId, @@ -36,21 +37,20 @@ impl ReadView { ) -> StorageResult { let amount = AssetQuery::new( &owner, - &AssetSpendTarget::new(asset_id, u64::MAX, usize::MAX), + &AssetSpendTarget::new(asset_id, u64::MAX, u16::MAX), &base_asset_id, None, self, ) .coins() .map(|res| res.map(|coins| coins.amount())) - .try_fold(0u64, |mut balance, res| -> StorageResult<_> { - let amount = res?; - - // Increase the balance - balance = balance.saturating_add(amount); - - Ok(balance) - })?; + .try_fold(0u64, |balance, amount| { + async move { + // Increase the balance + Ok(balance.saturating_add(amount)) + } + }) + .await?; Ok(AddressBalance { owner, @@ -59,54 +59,52 @@ impl ReadView { }) } - pub fn balances( - &self, - owner: Address, + pub fn balances<'a>( + &'a self, + owner: &'a Address, direction: IterDirection, - base_asset_id: AssetId, - ) -> BoxedIter> { - let mut amounts_per_asset = HashMap::new(); - let mut errors = vec![]; + base_asset_id: &'a AssetId, + ) -> impl Stream> + 'a { + let query = AssetsQuery::new(owner, None, None, self, base_asset_id); + let stream = query.coins(); - for coin in AssetsQuery::new(&owner, None, None, self, &base_asset_id).coins() { - match coin { - Ok(coin) => { + stream + .try_fold( + HashMap::new(), + move |mut amounts_per_asset, coin| async move { let amount: &mut u64 = amounts_per_asset - .entry(*coin.asset_id(&base_asset_id)) + .entry(*coin.asset_id(base_asset_id)) .or_default(); *amount = amount.saturating_add(coin.amount()); - } - Err(err) => { - errors.push(err); - } - } - } - - let mut balances = amounts_per_asset - .into_iter() - .map(|(asset_id, amount)| AddressBalance { - owner, - amount, - asset_id, - }) - .collect_vec(); + Ok(amounts_per_asset) + }, + ) + .into_stream() + .try_filter_map(move |amounts_per_asset| async move { + let mut balances = amounts_per_asset + .into_iter() + .map(|(asset_id, amount)| AddressBalance { + owner: *owner, + amount, + asset_id, + }) + .collect::>(); - balances.sort_by(|l, r| { - if l.asset_id < r.asset_id { - Ordering::Less - } else { - Ordering::Greater - } - }); + balances.sort_by(|l, r| { + if l.asset_id < r.asset_id { + Ordering::Less + } else { + Ordering::Greater + } + }); - if direction == IterDirection::Reverse { - balances.reverse(); - } + if direction == IterDirection::Reverse { + balances.reverse(); + } - balances - .into_iter() - .map(Ok) - .chain(errors.into_iter().map(Err)) - .into_boxed() + Ok(Some(futures::stream::iter(balances))) + }) + .map_ok(|stream| stream.map(Ok)) + .try_flatten() } } diff --git a/crates/fuel-core/src/query/balance/asset_query.rs b/crates/fuel-core/src/query/balance/asset_query.rs index e9ecf206aed..bbed21c5568 100644 --- a/crates/fuel-core/src/query/balance/asset_query.rs +++ b/crates/fuel-core/src/query/balance/asset_query.rs @@ -1,4 +1,5 @@ use crate::graphql_api::database::ReadView; +use fuel_core_services::stream::IntoBoxStream; use fuel_core_storage::{ iter::IterDirection, Error as StorageError, @@ -14,19 +15,20 @@ use fuel_core_types::{ AssetId, }, }; -use itertools::Itertools; +use futures::Stream; use std::collections::HashSet; +use tokio_stream::StreamExt; /// At least required `target` of the query per asset's `id` with `max` coins. #[derive(Clone)] pub struct AssetSpendTarget { pub id: AssetId, pub target: u64, - pub max: usize, + pub max: u16, } impl AssetSpendTarget { - pub fn new(id: AssetId, target: u64, max: usize) -> Self { + pub fn new(id: AssetId, target: u64, max: u16) -> Self { Self { id, target, max } } } @@ -48,6 +50,7 @@ impl Exclude { } } +#[derive(Clone)] pub struct AssetsQuery<'a> { pub owner: &'a Address, pub assets: Option>, @@ -73,13 +76,18 @@ impl<'a> AssetsQuery<'a> { } } - fn coins_iter(&self) -> impl Iterator> + '_ { + fn coins_iter(mut self) -> impl Stream> + 'a { + let assets = self.assets.take(); self.database .owned_coins_ids(self.owner, None, IterDirection::Forward) .map(|id| id.map(CoinId::from)) - .filter_ok(|id| { - if let Some(exclude) = self.exclude { - !exclude.coin_ids.contains(id) + .filter(move |result| { + if let Ok(id) = result { + if let Some(exclude) = self.exclude { + !exclude.coin_ids.contains(id) + } else { + true + } } else { true } @@ -91,27 +99,34 @@ impl<'a> AssetsQuery<'a> { } else { return Err(anyhow::anyhow!("The coin is not UTXO").into()); }; + // TODO: Fetch coin in a separate thread let coin = self.database.coin(id)?; Ok(CoinType::Coin(coin)) }) }) - .filter_ok(|coin| { - if let CoinType::Coin(coin) = coin { - self.has_asset(&coin.asset_id) + .filter(move |result| { + if let Ok(CoinType::Coin(coin)) = result { + has_asset(&assets, &coin.asset_id) } else { true } }) } - fn messages_iter(&self) -> impl Iterator> + '_ { + fn messages_iter(&self) -> impl Stream> + 'a { + let exclude = self.exclude; + let database = self.database; self.database .owned_message_ids(self.owner, None, IterDirection::Forward) .map(|id| id.map(CoinId::from)) - .filter_ok(|id| { - if let Some(exclude) = self.exclude { - !exclude.coin_ids.contains(id) + .filter(move |result| { + if let Ok(id) = result { + if let Some(e) = exclude { + !e.coin_ids.contains(id) + } else { + true + } } else { true } @@ -123,11 +138,18 @@ impl<'a> AssetsQuery<'a> { } else { return Err(anyhow::anyhow!("The coin is not a message").into()); }; - let message = self.database.message(&id)?; + // TODO: Fetch message in a separate thread + let message = database.message(&id)?; Ok(message) }) }) - .filter_ok(|message| message.data().is_empty()) + .filter(|result| { + if let Ok(message) = result { + message.data().is_empty() + } else { + true + } + }) .map(|result| { result.map(|message| { CoinType::MessageCoin( @@ -139,28 +161,23 @@ impl<'a> AssetsQuery<'a> { }) } - fn has_asset(&self, asset_id: &AssetId) -> bool { - self.assets - .as_ref() - .map(|assets| assets.contains(asset_id)) - .unwrap_or(true) - } - /// Returns the iterator over all valid(spendable, allowed by `exclude`) coins of the `owner`. /// /// # Note: The coins of different type are not grouped by the `asset_id`. // TODO: Optimize this by creating an index // https://github.com/FuelLabs/fuel-core/issues/588 - pub fn coins(&self) -> impl Iterator> + '_ { - let has_base_asset = self.has_asset(self.base_asset_id); - let messages_iter = has_base_asset - .then(|| self.messages_iter()) - .into_iter() - .flatten(); - self.coins_iter().chain(messages_iter) + pub fn coins(self) -> impl Stream> + 'a { + let has_base_asset = has_asset(&self.assets, self.base_asset_id); + if has_base_asset { + let message_iter = self.messages_iter(); + self.coins_iter().chain(message_iter).into_boxed_ref() + } else { + self.coins_iter().into_boxed_ref() + } } } +#[derive(Clone)] pub struct AssetQuery<'a> { pub owner: &'a Address, pub asset: &'a AssetSpendTarget, @@ -196,7 +213,14 @@ impl<'a> AssetQuery<'a> { /// Returns the iterator over all valid(spendable, allowed by `exclude`) coins of the `owner` /// for the `asset_id`. - pub fn coins(&self) -> impl Iterator> + '_ { + pub fn coins(self) -> impl Stream> + 'a { self.query.coins() } } + +fn has_asset(assets: &Option>, asset_id: &AssetId) -> bool { + assets + .as_ref() + .map(|assets| assets.contains(asset_id)) + .unwrap_or(true) +} diff --git a/crates/fuel-core/src/query/block.rs b/crates/fuel-core/src/query/block.rs index 6ef7cf38afa..3b725ab8b49 100644 --- a/crates/fuel-core/src/query/block.rs +++ b/crates/fuel-core/src/query/block.rs @@ -1,15 +1,13 @@ use crate::fuel_core_graphql_api::database::ReadView; use fuel_core_storage::{ - iter::{ - BoxedIter, - IterDirection, - }, + iter::IterDirection, Result as StorageResult, }; use fuel_core_types::{ blockchain::block::CompressedBlock, fuel_types::BlockHeight, }; +use futures::Stream; impl ReadView { pub fn latest_block_height(&self) -> StorageResult { @@ -24,7 +22,7 @@ impl ReadView { &self, height: Option, direction: IterDirection, - ) -> BoxedIter> { - self.blocks(height, direction) + ) -> impl Stream> + '_ { + futures::stream::iter(self.blocks(height, direction)) } } diff --git a/crates/fuel-core/src/query/coin.rs b/crates/fuel-core/src/query/coin.rs index fc944a623f5..c6d52001ddd 100644 --- a/crates/fuel-core/src/query/coin.rs +++ b/crates/fuel-core/src/query/coin.rs @@ -1,10 +1,6 @@ use crate::fuel_core_graphql_api::database::ReadView; use fuel_core_storage::{ - iter::{ - BoxedIter, - IntoBoxedIter, - IterDirection, - }, + iter::IterDirection, not_found, tables::Coins, Result as StorageResult, @@ -15,6 +11,10 @@ use fuel_core_types::{ fuel_tx::UtxoId, fuel_types::Address, }; +use futures::{ + Stream, + StreamExt, +}; impl ReadView { pub fn coin(&self, utxo_id: UtxoId) -> StorageResult { @@ -34,9 +34,13 @@ impl ReadView { owner: &Address, start_coin: Option, direction: IterDirection, - ) -> BoxedIter> { + ) -> impl Stream> + '_ { self.owned_coins_ids(owner, start_coin, direction) - .map(|res| res.and_then(|id| self.coin(id))) - .into_boxed() + .map(|res| { + res.and_then(|id| { + // TODO: Move fetching of the coin to a separate thread + self.coin(id) + }) + }) } } diff --git a/crates/fuel-core/src/query/message.rs b/crates/fuel-core/src/query/message.rs index aedadd47949..89cd21b8ae7 100644 --- a/crates/fuel-core/src/query/message.rs +++ b/crates/fuel-core/src/query/message.rs @@ -5,7 +5,6 @@ use crate::fuel_core_graphql_api::{ use fuel_core_storage::{ iter::{ BoxedIter, - IntoBoxedIter, IterDirection, }, not_found, @@ -38,6 +37,10 @@ use fuel_core_types::{ }, services::txpool::TransactionStatus, }; +use futures::{ + Stream, + StreamExt, +}; use itertools::Itertools; use std::borrow::Cow; @@ -78,15 +81,19 @@ impl ReadView { .map(Cow::into_owned) } - pub fn owned_messages( - &self, - owner: &Address, + pub fn owned_messages<'a>( + &'a self, + owner: &'a Address, start_message_id: Option, direction: IterDirection, - ) -> BoxedIter> { + ) -> impl Stream> + 'a { self.owned_message_ids(owner, start_message_id, direction) - .map(|result| result.and_then(|id| self.message(&id))) - .into_boxed() + .map(|result| { + result.and_then(|id| { + // TODO: Move `message` fetching to a separate thread + self.message(&id) + }) + }) } } diff --git a/crates/fuel-core/src/query/tx.rs b/crates/fuel-core/src/query/tx.rs index 894d05d9c6f..8989efc9bd2 100644 --- a/crates/fuel-core/src/query/tx.rs +++ b/crates/fuel-core/src/query/tx.rs @@ -1,11 +1,6 @@ use crate::fuel_core_graphql_api::database::ReadView; - use fuel_core_storage::{ - iter::{ - BoxedIter, - IntoBoxedIter, - IterDirection, - }, + iter::IterDirection, not_found, tables::Transactions, Result as StorageResult, @@ -20,6 +15,10 @@ use fuel_core_types::{ fuel_types::Address, services::txpool::TransactionStatus, }; +use futures::{ + Stream, + StreamExt, +}; impl ReadView { pub fn receipts(&self, tx_id: &TxId) -> StorageResult> { @@ -42,15 +41,15 @@ impl ReadView { owner: Address, start: Option, direction: IterDirection, - ) -> BoxedIter> { + ) -> impl Stream> + '_ { self.owned_transactions_ids(owner, start, direction) .map(|result| { result.and_then(|(tx_pointer, tx_id)| { + // TODO: Fetch transactions in a separate thread let tx = self.transaction(&tx_id)?; Ok((tx_pointer, tx)) }) }) - .into_boxed() } } diff --git a/crates/fuel-core/src/schema.rs b/crates/fuel-core/src/schema.rs index bd9e550d448..bcbc5b5c970 100644 --- a/crates/fuel-core/src/schema.rs +++ b/crates/fuel-core/src/schema.rs @@ -23,8 +23,12 @@ use fuel_core_storage::{ iter::IterDirection, Result as StorageResult, }; -use itertools::Itertools; +use futures::{ + Stream, + TryStreamExt, +}; use std::borrow::Cow; +use tokio_stream::StreamExt; pub mod balance; pub mod blob; @@ -99,7 +103,7 @@ where // It means also returning `has_previous_page` and `has_next_page` values. // entries(start_key: Option) F: FnOnce(&Option, IterDirection) -> StorageResult, - Entries: Iterator>, + Entries: Stream>, SchemaKey: Eq, { match (after.as_ref(), before.as_ref(), first, last) { @@ -192,7 +196,7 @@ where } }); - let entries: Vec<_> = entries.try_collect()?; + let entries: Vec<_> = entries.try_collect().await?; let entries = entries.into_iter(); let mut connection = Connection::new(has_previous_page, has_next_page); diff --git a/crates/fuel-core/src/schema/balance.rs b/crates/fuel-core/src/schema/balance.rs index 7c43e3a9509..4e68ddbe894 100644 --- a/crates/fuel-core/src/schema/balance.rs +++ b/crates/fuel-core/src/schema/balance.rs @@ -23,6 +23,7 @@ use async_graphql::{ Object, }; use fuel_core_types::services::graphql_api; +use futures::StreamExt; pub struct Balance(graphql_api::AddressBalance); @@ -64,7 +65,10 @@ impl BalanceQuery { .data_unchecked::() .latest_consensus_params() .base_asset_id(); - let balance = query.balance(owner.0, asset_id.0, base_asset_id)?.into(); + let balance = query + .balance(owner.0, asset_id.0, base_asset_id) + .await? + .into(); Ok(balance) } @@ -85,14 +89,14 @@ impl BalanceQuery { return Err(anyhow!("pagination is not yet supported").into()) } let query = ctx.read_view()?; + let base_asset_id = *ctx + .data_unchecked::() + .latest_consensus_params() + .base_asset_id(); + let owner = filter.owner.into(); crate::schema::query_pagination(after, before, first, last, |_, direction| { - let owner = filter.owner.into(); - let base_asset_id = *ctx - .data_unchecked::() - .latest_consensus_params() - .base_asset_id(); Ok(query - .balances(owner, direction, base_asset_id) + .balances(&owner, direction, &base_asset_id) .map(|result| { result.map(|balance| (balance.asset_id.into(), balance.into())) })) diff --git a/crates/fuel-core/src/schema/block.rs b/crates/fuel-core/src/schema/block.rs index cebd04989b6..d7bc8366282 100644 --- a/crates/fuel-core/src/schema/block.rs +++ b/crates/fuel-core/src/schema/block.rs @@ -36,11 +36,7 @@ use async_graphql::{ Union, }; use fuel_core_storage::{ - iter::{ - BoxedIter, - IntoBoxedIter, - IterDirection, - }, + iter::IterDirection, Result as StorageResult, }; use fuel_core_types::{ @@ -51,6 +47,10 @@ use fuel_core_types::{ fuel_types, fuel_types::BlockHeight, }; +use futures::{ + Stream, + StreamExt, +}; pub struct Block(pub(crate) CompressedBlock); @@ -339,16 +339,14 @@ fn blocks_query( query: &ReadView, height: Option, direction: IterDirection, -) -> BoxedIter> +) -> impl Stream> + '_ where T: async_graphql::OutputType, T: From, { - let blocks = query.compressed_blocks(height, direction).map(|result| { + query.compressed_blocks(height, direction).map(|result| { result.map(|block| ((*block.header().height()).into(), block.into())) - }); - - blocks.into_boxed() + }) } #[derive(Default)] diff --git a/crates/fuel-core/src/schema/coins.rs b/crates/fuel-core/src/schema/coins.rs index 3610c152e6b..a1066762380 100644 --- a/crates/fuel-core/src/schema/coins.rs +++ b/crates/fuel-core/src/schema/coins.rs @@ -40,6 +40,7 @@ use fuel_core_types::{ fuel_tx, }; use itertools::Itertools; +use tokio_stream::StreamExt; pub struct Coin(pub(crate) CoinModel); @@ -174,8 +175,8 @@ impl CoinQuery { before: Option, ) -> async_graphql::Result> { let query = ctx.read_view()?; + let owner: fuel_tx::Address = filter.owner.into(); crate::schema::query_pagination(after, before, first, last, |start, direction| { - let owner: fuel_tx::Address = filter.owner.into(); let coins = query .owned_coins(&owner, (*start).map(Into::into), direction) .filter_map(|result| { @@ -213,8 +214,8 @@ impl CoinQuery { #[graphql(desc = "\ The list of requested assets` coins with asset ids, `target` amount the user wants \ to reach, and the `max` number of coins in the selection. Several entries with the \ - same asset id are not allowed.")] - query_per_asset: Vec, + same asset id are not allowed. The result can't contain more coins than `max_inputs`.")] + mut query_per_asset: Vec, #[graphql(desc = "The excluded coins from the selection.")] excluded_ids: Option< ExcludeInput, >, @@ -222,6 +223,14 @@ impl CoinQuery { let params = ctx .data_unchecked::() .latest_consensus_params(); + let max_input = params.tx_params().max_inputs(); + + // `coins_to_spend` exists to help select inputs for the transactions. + // It doesn't make sense to allow the user to request more than the maximum number + // of inputs. + // TODO: To avoid breaking changes, we will truncate request for now. + // In the future, we should return an error if the input is too large. + query_per_asset.truncate(max_input as usize); let owner: fuel_tx::Address = owner.0; let query_per_asset = query_per_asset @@ -230,7 +239,10 @@ impl CoinQuery { AssetSpendTarget::new( e.asset_id.0, e.amount.0, - e.max.map(|max| max.0 as usize).unwrap_or(usize::MAX), + e.max + .and_then(|max| u16::try_from(max.0).ok()) + .unwrap_or(max_input) + .min(max_input), ) }) .collect_vec(); @@ -252,7 +264,8 @@ impl CoinQuery { let query = ctx.read_view()?; - let coins = random_improve(query.as_ref(), &spend_query)? + let coins = random_improve(query.as_ref(), &spend_query) + .await? .into_iter() .map(|coins| { coins diff --git a/crates/fuel-core/src/schema/contract.rs b/crates/fuel-core/src/schema/contract.rs index a8fe6dea693..ac4f8b67a6b 100644 --- a/crates/fuel-core/src/schema/contract.rs +++ b/crates/fuel-core/src/schema/contract.rs @@ -31,6 +31,7 @@ use fuel_core_types::{ fuel_types, services::graphql_api, }; +use futures::StreamExt; pub struct Contract(pub(crate) fuel_types::ContractId); @@ -168,7 +169,7 @@ impl ContractBalanceQuery { (*start).map(Into::into), direction, ) - .map(move |balance| { + .map(|balance| { let balance = balance?; let asset_id = balance.asset_id; diff --git a/crates/fuel-core/src/schema/message.rs b/crates/fuel-core/src/schema/message.rs index 05416623934..36f83cd11c9 100644 --- a/crates/fuel-core/src/schema/message.rs +++ b/crates/fuel-core/src/schema/message.rs @@ -28,7 +28,9 @@ use async_graphql::{ Enum, Object, }; +use fuel_core_services::stream::IntoBoxStream; use fuel_core_types::entities; +use futures::StreamExt; pub struct Message(pub(crate) entities::relayer::message::Message); @@ -91,6 +93,8 @@ impl MessageQuery { ) -> async_graphql::Result> { let query = ctx.read_view()?; + let owner = owner.map(|owner| owner.0); + let owner_ref = owner.as_ref(); crate::schema::query_pagination( after, before, @@ -103,10 +107,12 @@ impl MessageQuery { None }; - let messages = if let Some(owner) = owner { - query.owned_messages(&owner.0, start, direction) + let messages = if let Some(owner) = owner_ref { + query + .owned_messages(owner, start, direction) + .into_boxed_ref() } else { - query.all_messages(start, direction) + query.all_messages(start, direction).into_boxed_ref() }; let messages = messages.map(|result| { diff --git a/crates/fuel-core/src/schema/tx.rs b/crates/fuel-core/src/schema/tx.rs index 89b20101db0..877aaa9df91 100644 --- a/crates/fuel-core/src/schema/tx.rs +++ b/crates/fuel-core/src/schema/tx.rs @@ -67,7 +67,6 @@ use futures::{ Stream, TryStreamExt, }; -use itertools::Itertools; use std::{ borrow::Cow, iter, @@ -133,41 +132,40 @@ impl TxQuery { |start: &Option, direction| { let start = *start; let block_id = start.map(|sorted| sorted.block_height); - let all_block_ids = query.compressed_blocks(block_id, direction); + let compressed_blocks = query.compressed_blocks(block_id, direction); - let all_txs = all_block_ids - .map(move |block| { - block.map(|fuel_block| { - let (header, mut txs) = fuel_block.into_inner(); + let all_txs = compressed_blocks + .map_ok(move |fuel_block| { + let (header, mut txs) = fuel_block.into_inner(); - if direction == IterDirection::Reverse { - txs.reverse(); - } + if direction == IterDirection::Reverse { + txs.reverse(); + } - txs.into_iter().zip(iter::repeat(*header.height())) - }) + let iter = txs.into_iter().zip(iter::repeat(*header.height())); + futures::stream::iter(iter).map(Ok) }) - .flatten_ok() - .map(|result| { - result.map(|(tx_id, block_height)| { - SortedTxCursor::new(block_height, tx_id.into()) - }) + .try_flatten() + .map_ok(|(tx_id, block_height)| { + SortedTxCursor::new(block_height, tx_id.into()) }) - .skip_while(move |result| { - if let Ok(sorted) = result { - if let Some(start) = start { - return sorted != &start - } - } - false - }); - let all_txs = all_txs.map(|result: StorageResult| { - result.and_then(|sorted| { - let tx = query.transaction(&sorted.tx_id.0)?; - - Ok((sorted, Transaction::from_tx(sorted.tx_id.0, tx))) + .try_skip_while(move |sorted| { + let skip = if let Some(start) = start { + sorted != &start + } else { + false + }; + + async move { Ok(skip) } }) - }); + .map(|result: StorageResult| { + result.and_then(|sorted| { + // TODO: Request transactions in a separate thread + let tx = query.transaction(&sorted.tx_id.0)?; + + Ok((sorted, Transaction::from_tx(sorted.tx_id.0, tx))) + }) + }); Ok(all_txs) }, diff --git a/crates/fuel-core/src/state/rocks_db_key_iterator.rs b/crates/fuel-core/src/state/rocks_db_key_iterator.rs index b77e2cb7179..432ab6b41ae 100644 --- a/crates/fuel-core/src/state/rocks_db_key_iterator.rs +++ b/crates/fuel-core/src/state/rocks_db_key_iterator.rs @@ -18,9 +18,9 @@ pub struct RocksDBKeyIterator<'a, D: DBAccess, R> { _marker: core::marker::PhantomData, } -pub trait ExtractItem: 'static { +pub trait ExtractItem: Send + Sync + 'static { /// The item type returned by the iterator. - type Item; + type Item: Send + Sync; /// Extracts the item from the raw iterator. fn extract_item( diff --git a/crates/services/src/lib.rs b/crates/services/src/lib.rs index 7162389e082..10da0bc75f7 100644 --- a/crates/services/src/lib.rs +++ b/crates/services/src/lib.rs @@ -21,26 +21,37 @@ pub mod stream { Stream, }; - /// A Send + Sync BoxStream + /// A `Send` + `Sync` BoxStream with static lifetime. pub type BoxStream = core::pin::Pin + Send + Sync + 'static>>; + /// A `Send` BoxStream with a lifetime. + pub type RefBoxStream<'a, T> = core::pin::Pin + Send + 'a>>; + /// A Send + Sync BoxFuture pub type BoxFuture<'a, T> = core::pin::Pin + Send + Sync + 'a>>; /// Helper trait to create a BoxStream from a Stream pub trait IntoBoxStream: Stream { - /// Convert this stream into a BoxStream. + /// Convert this stream into a [`BoxStream`]. fn into_boxed(self) -> BoxStream where Self: Sized + Send + Sync + 'static, { Box::pin(self) } + + /// Convert this stream into a [`RefBoxStream`]. + fn into_boxed_ref<'a>(self) -> RefBoxStream<'a, Self::Item> + where + Self: Sized + Send + 'a, + { + Box::pin(self) + } } - impl IntoBoxStream for S where S: Stream + Send + Sync + 'static {} + impl IntoBoxStream for S where S: Stream + Send {} } /// Helper trait to trace errors diff --git a/crates/storage/src/iter.rs b/crates/storage/src/iter.rs index 37c38463cc8..35c550851fb 100644 --- a/crates/storage/src/iter.rs +++ b/crates/storage/src/iter.rs @@ -29,7 +29,7 @@ pub mod changes_iterator; // TODO: BoxedIter to be used until RPITIT lands in stable rust. /// A boxed variant of the iterator that can be used as a return type of the traits. pub struct BoxedIter<'a, T> { - iter: Box + 'a>, + iter: Box + 'a + Send>, } impl<'a, T> Iterator for BoxedIter<'a, T> { @@ -48,7 +48,7 @@ pub trait IntoBoxedIter<'a, T> { impl<'a, T, I> IntoBoxedIter<'a, T> for I where - I: Iterator + 'a, + I: Iterator + 'a + Send, { fn into_boxed(self) -> BoxedIter<'a, T> { BoxedIter { @@ -346,7 +346,10 @@ pub fn iterator<'a, V>( prefix: Option<&[u8]>, start: Option<&[u8]>, direction: IterDirection, -) -> impl Iterator + 'a { +) -> impl Iterator + 'a +where + V: Send + Sync, +{ match (prefix, start) { (None, None) => { if direction == IterDirection::Forward { @@ -401,7 +404,10 @@ pub fn keys_iterator<'a, V>( prefix: Option<&[u8]>, start: Option<&[u8]>, direction: IterDirection, -) -> impl Iterator + 'a { +) -> impl Iterator + 'a +where + V: Send + Sync, +{ match (prefix, start) { (None, None) => { if direction == IterDirection::Forward { diff --git a/crates/storage/src/kv_store.rs b/crates/storage/src/kv_store.rs index e9d4eb22a52..67ab9493c7f 100644 --- a/crates/storage/src/kv_store.rs +++ b/crates/storage/src/kv_store.rs @@ -17,13 +17,8 @@ use core::ops::Deref; /// The key of the storage. pub type Key = Vec; -#[cfg(feature = "std")] /// The value of the storage. It is wrapped into the `Arc` to provide less cloning of massive objects. -pub type Value = std::sync::Arc>; - -#[cfg(not(feature = "std"))] -/// The value of the storage. It is wrapped into the `Rc` to provide less cloning of massive objects. -pub type Value = alloc::rc::Rc>; +pub type Value = alloc::sync::Arc>; /// The pair of key and value from the storage. pub type KVItem = StorageResult<(Key, Value)>;