diff --git a/substrate/client/rpc-spec-v2/src/archive/api.rs b/substrate/client/rpc-spec-v2/src/archive/api.rs index dcfeaecb147b..a205d0502c93 100644 --- a/substrate/client/rpc-spec-v2/src/archive/api.rs +++ b/substrate/client/rpc-spec-v2/src/archive/api.rs @@ -20,8 +20,7 @@ use crate::{ common::events::{ - ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageResult, - PaginatedStorageQuery, + ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageEvent, StorageQuery, }, MethodResult, }; @@ -100,13 +99,17 @@ pub trait ArchiveApi { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[method(name = "archive_unstable_storage", blocking)] + #[subscription( + name = "archive_unstable_storage" => "archive_unstable_storageEvent", + unsubscribe = "archive_unstable_stopStorage", + item = ArchiveStorageEvent, + )] fn archive_unstable_storage( &self, hash: Hash, - items: Vec>, + items: Vec>, child_trie: Option, - ) -> RpcResult; + ); /// Returns the storage difference between two blocks. /// diff --git a/substrate/client/rpc-spec-v2/src/archive/archive.rs b/substrate/client/rpc-spec-v2/src/archive/archive.rs index 55054d91d85d..62e44a016241 100644 --- a/substrate/client/rpc-spec-v2/src/archive/archive.rs +++ b/substrate/client/rpc-spec-v2/src/archive/archive.rs @@ -20,13 +20,13 @@ use crate::{ archive::{ - archive_storage::{ArchiveStorage, ArchiveStorageDiff}, - error::Error as ArchiveError, - ArchiveApiServer, + archive_storage::ArchiveStorageDiff, error::Error as ArchiveError, ArchiveApiServer, }, - common::events::{ - ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageResult, - PaginatedStorageQuery, + common::{ + events::{ + ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageEvent, StorageQuery, + }, + storage::{QueryResult, StorageSubscriptionClient}, }, hex_string, MethodResult, SubscriptionTaskExecutor, }; @@ -57,42 +57,12 @@ use tokio::sync::mpsc; pub(crate) const LOG_TARGET: &str = "rpc-spec-v2::archive"; -/// The configuration of [`Archive`]. -pub struct ArchiveConfig { - /// The maximum number of items the `archive_storage` can return for a descendant query before - /// pagination is required. - pub max_descendant_responses: usize, - /// The maximum number of queried items allowed for the `archive_storage` at a time. - pub max_queried_items: usize, -} - -/// The maximum number of items the `archive_storage` can return for a descendant query before -/// pagination is required. -/// -/// Note: this is identical to the `chainHead` value. -const MAX_DESCENDANT_RESPONSES: usize = 5; - -/// The maximum number of queried items allowed for the `archive_storage` at a time. -/// -/// Note: A queried item can also be a descendant query which can return up to -/// `MAX_DESCENDANT_RESPONSES`. -const MAX_QUERIED_ITEMS: usize = 8; - /// The buffer capacity for each storage query. /// /// This is small because the underlying JSON-RPC server has /// its down buffer capacity per connection as well. const STORAGE_QUERY_BUF: usize = 16; -impl Default for ArchiveConfig { - fn default() -> Self { - Self { - max_descendant_responses: MAX_DESCENDANT_RESPONSES, - max_queried_items: MAX_QUERIED_ITEMS, - } - } -} - /// An API for archive RPC calls. pub struct Archive, Block: BlockT, Client> { /// Substrate client. @@ -103,11 +73,6 @@ pub struct Archive, Block: BlockT, Client> { executor: SubscriptionTaskExecutor, /// The hexadecimal encoded hash of the genesis block. genesis_hash: String, - /// The maximum number of items the `archive_storage` can return for a descendant query before - /// pagination is required. - storage_max_descendant_responses: usize, - /// The maximum number of queried items allowed for the `archive_storage` at a time. - storage_max_queried_items: usize, /// Phantom member to pin the block type. _phantom: PhantomData, } @@ -119,18 +84,9 @@ impl, Block: BlockT, Client> Archive { backend: Arc, genesis_hash: GenesisHash, executor: SubscriptionTaskExecutor, - config: ArchiveConfig, ) -> Self { let genesis_hash = hex_string(&genesis_hash.as_ref()); - Self { - client, - backend, - executor, - genesis_hash, - storage_max_descendant_responses: config.max_descendant_responses, - storage_max_queried_items: config.max_queried_items, - _phantom: PhantomData, - } + Self { client, backend, executor, genesis_hash, _phantom: PhantomData } } } @@ -260,47 +216,53 @@ where fn archive_unstable_storage( &self, + pending: PendingSubscriptionSink, hash: Block::Hash, - items: Vec>, + items: Vec>, child_trie: Option, - ) -> RpcResult { - let items = items - .into_iter() - .map(|query| { - let key = StorageKey(parse_hex_param(query.key)?); - let pagination_start_key = query - .pagination_start_key - .map(|key| parse_hex_param(key).map(|key| StorageKey(key))) - .transpose()?; - - // Paginated start key is only supported - if pagination_start_key.is_some() && !query.query_type.is_descendant_query() { - return Err(ArchiveError::InvalidParam( - "Pagination start key is only supported for descendants queries" - .to_string(), - )) - } + ) { + let mut storage_client = + StorageSubscriptionClient::::new(self.client.clone()); + + let fut = async move { + let Ok(mut sink) = pending.accept().await.map(Subscription::from) else { return }; - Ok(PaginatedStorageQuery { - key, - query_type: query.query_type, - pagination_start_key, + let items = match items + .into_iter() + .map(|query| { + let key = StorageKey(parse_hex_param(query.key)?); + Ok(StorageQuery { key, query_type: query.query_type }) }) - }) - .collect::, ArchiveError>>()?; + .collect::, ArchiveError>>() + { + Ok(items) => items, + Err(error) => { + let _ = sink.send(&ArchiveStorageEvent::err(error.to_string())); + return + }, + }; - let child_trie = child_trie - .map(|child_trie| parse_hex_param(child_trie)) - .transpose()? - .map(ChildInfo::new_default_from_vec); + let child_trie = child_trie.map(|child_trie| parse_hex_param(child_trie)).transpose(); + let child_trie = match child_trie { + Ok(child_trie) => child_trie.map(ChildInfo::new_default_from_vec), + Err(error) => { + let _ = sink.send(&ArchiveStorageEvent::err(error.to_string())); + return + }, + }; - let storage_client = ArchiveStorage::new( - self.client.clone(), - self.storage_max_descendant_responses, - self.storage_max_queried_items, - ); + let (tx, mut rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF); + let storage_fut = storage_client.generate_events(hash, items, child_trie, tx); - Ok(storage_client.handle_query(hash, items, child_trie)) + // We don't care about the return value of this join: + // - process_events might encounter an error (if the client disconnected) + // - storage_fut might encounter an error while processing a trie queries and + // the error is propagated via the sink. + let _ = futures::future::join(storage_fut, process_storage_events(&mut rx, &mut sink)) + .await; + }; + + self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); } fn archive_unstable_storage_diff( @@ -337,24 +299,74 @@ where // - process_events might encounter an error (if the client disconnected) // - storage_fut might encounter an error while processing a trie queries and // the error is propagated via the sink. - let _ = futures::future::join(storage_fut, process_events(&mut rx, &mut sink)).await; + let _ = + futures::future::join(storage_fut, process_storage_diff_events(&mut rx, &mut sink)) + .await; }; self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); } } -/// Sends all the events to the sink. -async fn process_events(rx: &mut mpsc::Receiver, sink: &mut Subscription) { - while let Some(event) = rx.recv().await { - if event.is_done() { - log::debug!(target: LOG_TARGET, "Finished processing partial trie query"); - } else if event.is_err() { - log::debug!(target: LOG_TARGET, "Error encountered while processing partial trie query"); +/// Sends all the events of the storage_diff method to the sink. +async fn process_storage_diff_events( + rx: &mut mpsc::Receiver, + sink: &mut Subscription, +) { + loop { + tokio::select! { + _ = sink.closed() => { + return + }, + + maybe_event = rx.recv() => { + let Some(event) = maybe_event else { + break; + }; + + if event.is_done() { + log::debug!(target: LOG_TARGET, "Finished processing partial trie query"); + } else if event.is_err() { + log::debug!(target: LOG_TARGET, "Error encountered while processing partial trie query"); + } + + if sink.send(&event).await.is_err() { + return + } + } } + } +} + +/// Sends all the events of the storage method to the sink. +async fn process_storage_events(rx: &mut mpsc::Receiver, sink: &mut Subscription) { + loop { + tokio::select! { + _ = sink.closed() => { + break + } + + maybe_storage = rx.recv() => { + let Some(event) = maybe_storage else { + break; + }; + + match event { + Ok(None) => continue, + + Ok(Some(event)) => + if sink.send(&ArchiveStorageEvent::result(event)).await.is_err() { + return + }, - if sink.send(&event).await.is_err() { - return + Err(error) => { + let _ = sink.send(&ArchiveStorageEvent::err(error)).await; + return + } + } + } } } + + let _ = sink.send(&ArchiveStorageEvent::StorageDone).await; } diff --git a/substrate/client/rpc-spec-v2/src/archive/archive_storage.rs b/substrate/client/rpc-spec-v2/src/archive/archive_storage.rs index 5a3920882f00..390db765a48f 100644 --- a/substrate/client/rpc-spec-v2/src/archive/archive_storage.rs +++ b/substrate/client/rpc-spec-v2/src/archive/archive_storage.rs @@ -33,114 +33,13 @@ use crate::{ common::{ events::{ ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageDiffOperationType, - ArchiveStorageDiffResult, ArchiveStorageDiffType, ArchiveStorageResult, - PaginatedStorageQuery, StorageQueryType, StorageResult, + ArchiveStorageDiffResult, ArchiveStorageDiffType, StorageResult, }, - storage::{IterQueryType, QueryIter, Storage}, + storage::Storage, }, }; use tokio::sync::mpsc; -/// Generates the events of the `archive_storage` method. -pub struct ArchiveStorage { - /// Storage client. - client: Storage, - /// The maximum number of responses the API can return for a descendant query at a time. - storage_max_descendant_responses: usize, - /// The maximum number of queried items allowed for the `archive_storage` at a time. - storage_max_queried_items: usize, -} - -impl ArchiveStorage { - /// Constructs a new [`ArchiveStorage`]. - pub fn new( - client: Arc, - storage_max_descendant_responses: usize, - storage_max_queried_items: usize, - ) -> Self { - Self { - client: Storage::new(client), - storage_max_descendant_responses, - storage_max_queried_items, - } - } -} - -impl ArchiveStorage -where - Block: BlockT + 'static, - BE: Backend + 'static, - Client: StorageProvider + 'static, -{ - /// Generate the response of the `archive_storage` method. - pub fn handle_query( - &self, - hash: Block::Hash, - mut items: Vec>, - child_key: Option, - ) -> ArchiveStorageResult { - let discarded_items = items.len().saturating_sub(self.storage_max_queried_items); - items.truncate(self.storage_max_queried_items); - - let mut storage_results = Vec::with_capacity(items.len()); - for item in items { - match item.query_type { - StorageQueryType::Value => { - match self.client.query_value(hash, &item.key, child_key.as_ref()) { - Ok(Some(value)) => storage_results.push(value), - Ok(None) => continue, - Err(error) => return ArchiveStorageResult::err(error), - } - }, - StorageQueryType::Hash => - match self.client.query_hash(hash, &item.key, child_key.as_ref()) { - Ok(Some(value)) => storage_results.push(value), - Ok(None) => continue, - Err(error) => return ArchiveStorageResult::err(error), - }, - StorageQueryType::ClosestDescendantMerkleValue => - match self.client.query_merkle_value(hash, &item.key, child_key.as_ref()) { - Ok(Some(value)) => storage_results.push(value), - Ok(None) => continue, - Err(error) => return ArchiveStorageResult::err(error), - }, - StorageQueryType::DescendantsValues => { - match self.client.query_iter_pagination( - QueryIter { - query_key: item.key, - ty: IterQueryType::Value, - pagination_start_key: item.pagination_start_key, - }, - hash, - child_key.as_ref(), - self.storage_max_descendant_responses, - ) { - Ok((results, _)) => storage_results.extend(results), - Err(error) => return ArchiveStorageResult::err(error), - } - }, - StorageQueryType::DescendantsHashes => { - match self.client.query_iter_pagination( - QueryIter { - query_key: item.key, - ty: IterQueryType::Hash, - pagination_start_key: item.pagination_start_key, - }, - hash, - child_key.as_ref(), - self.storage_max_descendant_responses, - ) { - Ok((results, _)) => storage_results.extend(results), - Err(error) => return ArchiveStorageResult::err(error), - } - }, - }; - } - - ArchiveStorageResult::ok(storage_results, discarded_items) - } -} - /// Parse hex-encoded string parameter as raw bytes. /// /// If the parsing fails, returns an error propagated to the RPC method. diff --git a/substrate/client/rpc-spec-v2/src/archive/mod.rs b/substrate/client/rpc-spec-v2/src/archive/mod.rs index 5f020c203eab..14fa104c113a 100644 --- a/substrate/client/rpc-spec-v2/src/archive/mod.rs +++ b/substrate/client/rpc-spec-v2/src/archive/mod.rs @@ -32,4 +32,4 @@ pub mod archive; pub mod error; pub use api::ArchiveApiServer; -pub use archive::{Archive, ArchiveConfig}; +pub use archive::Archive; diff --git a/substrate/client/rpc-spec-v2/src/archive/tests.rs b/substrate/client/rpc-spec-v2/src/archive/tests.rs index 994c5d28bd61..cddaafde6659 100644 --- a/substrate/client/rpc-spec-v2/src/archive/tests.rs +++ b/substrate/client/rpc-spec-v2/src/archive/tests.rs @@ -19,16 +19,13 @@ use crate::{ common::events::{ ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageDiffOperationType, - ArchiveStorageDiffResult, ArchiveStorageDiffType, ArchiveStorageMethodOk, - ArchiveStorageResult, PaginatedStorageQuery, StorageQueryType, StorageResultType, + ArchiveStorageDiffResult, ArchiveStorageDiffType, ArchiveStorageEvent, StorageQuery, + StorageQueryType, StorageResult, StorageResultType, }, hex_string, MethodResult, }; -use super::{ - archive::{Archive, ArchiveConfig}, - *, -}; +use super::{archive::Archive, *}; use assert_matches::assert_matches; use codec::{Decode, Encode}; @@ -55,8 +52,6 @@ use substrate_test_runtime_client::{ const CHAIN_GENESIS: [u8; 32] = [0; 32]; const INVALID_HASH: [u8; 32] = [1; 32]; -const MAX_PAGINATION_LIMIT: usize = 5; -const MAX_QUERIED_LIMIT: usize = 5; const KEY: &[u8] = b":mock"; const VALUE: &[u8] = b"hello world"; const CHILD_STORAGE_KEY: &[u8] = b"child"; @@ -65,10 +60,7 @@ const CHILD_VALUE: &[u8] = b"child value"; type Header = substrate_test_runtime_client::runtime::Header; type Block = substrate_test_runtime_client::runtime::Block; -fn setup_api( - max_descendant_responses: usize, - max_queried_items: usize, -) -> (Arc>, RpcModule>>) { +fn setup_api() -> (Arc>, RpcModule>>) { let child_info = ChildInfo::new_default(CHILD_STORAGE_KEY); let builder = TestClientBuilder::new().add_extra_child_storage( &child_info, @@ -83,7 +75,6 @@ fn setup_api( backend, CHAIN_GENESIS, Arc::new(TokioTestExecutor::default()), - ArchiveConfig { max_descendant_responses, max_queried_items }, ) .into_rpc(); @@ -101,7 +92,7 @@ async fn get_next_event(sub: &mut RpcSubscriptio #[tokio::test] async fn archive_genesis() { - let (_client, api) = setup_api(MAX_PAGINATION_LIMIT, MAX_QUERIED_LIMIT); + let (_client, api) = setup_api(); let genesis: String = api.call("archive_unstable_genesisHash", EmptyParams::new()).await.unwrap(); @@ -110,7 +101,7 @@ async fn archive_genesis() { #[tokio::test] async fn archive_body() { - let (client, api) = setup_api(MAX_PAGINATION_LIMIT, MAX_QUERIED_LIMIT); + let (client, api) = setup_api(); // Invalid block hash. let invalid_hash = hex_string(&INVALID_HASH); @@ -144,7 +135,7 @@ async fn archive_body() { #[tokio::test] async fn archive_header() { - let (client, api) = setup_api(MAX_PAGINATION_LIMIT, MAX_QUERIED_LIMIT); + let (client, api) = setup_api(); // Invalid block hash. let invalid_hash = hex_string(&INVALID_HASH); @@ -178,7 +169,7 @@ async fn archive_header() { #[tokio::test] async fn archive_finalized_height() { - let (client, api) = setup_api(MAX_PAGINATION_LIMIT, MAX_QUERIED_LIMIT); + let (client, api) = setup_api(); let client_height: u32 = client.info().finalized_number.saturated_into(); @@ -190,7 +181,7 @@ async fn archive_finalized_height() { #[tokio::test] async fn archive_hash_by_height() { - let (client, api) = setup_api(MAX_PAGINATION_LIMIT, MAX_QUERIED_LIMIT); + let (client, api) = setup_api(); // Genesis height. let hashes: Vec = api.call("archive_unstable_hashByHeight", [0]).await.unwrap(); @@ -296,7 +287,7 @@ async fn archive_hash_by_height() { #[tokio::test] async fn archive_call() { - let (client, api) = setup_api(MAX_PAGINATION_LIMIT, MAX_QUERIED_LIMIT); + let (client, api) = setup_api(); let invalid_hash = hex_string(&INVALID_HASH); // Invalid parameter (non-hex). @@ -355,7 +346,7 @@ async fn archive_call() { #[tokio::test] async fn archive_storage_hashes_values() { - let (client, api) = setup_api(MAX_PAGINATION_LIMIT, MAX_QUERIED_LIMIT); + let (client, api) = setup_api(); let block = BlockBuilderBuilder::new(&*client) .on_parent_block(client.chain_info().genesis_hash) @@ -369,42 +360,23 @@ async fn archive_storage_hashes_values() { let block_hash = format!("{:?}", block.header.hash()); let key = hex_string(&KEY); - let items: Vec> = vec![ - PaginatedStorageQuery { - key: key.clone(), - query_type: StorageQueryType::DescendantsHashes, - pagination_start_key: None, - }, - PaginatedStorageQuery { - key: key.clone(), - query_type: StorageQueryType::DescendantsValues, - pagination_start_key: None, - }, - PaginatedStorageQuery { - key: key.clone(), - query_type: StorageQueryType::Hash, - pagination_start_key: None, - }, - PaginatedStorageQuery { - key: key.clone(), - query_type: StorageQueryType::Value, - pagination_start_key: None, - }, + let items: Vec> = vec![ + StorageQuery { key: key.clone(), query_type: StorageQueryType::DescendantsHashes }, + StorageQuery { key: key.clone(), query_type: StorageQueryType::DescendantsValues }, + StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }, + StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }, ]; - let result: ArchiveStorageResult = api - .call("archive_unstable_storage", rpc_params![&block_hash, items.clone()]) + let mut sub = api + .subscribe_unbounded("archive_unstable_storage", rpc_params![&block_hash, items.clone()]) .await .unwrap(); - match result { - ArchiveStorageResult::Ok(ArchiveStorageMethodOk { result, discarded_items }) => { - // Key has not been imported yet. - assert_eq!(result.len(), 0); - assert_eq!(discarded_items, 0); - }, - _ => panic!("Unexpected result"), - }; + // Key has not been imported yet. + assert_eq!( + get_next_event::(&mut sub).await, + ArchiveStorageEvent::StorageDone, + ); // Import a block with the given key value pair. let mut builder = BlockBuilderBuilder::new(&*client) @@ -420,32 +392,103 @@ async fn archive_storage_hashes_values() { let expected_hash = format!("{:?}", Blake2Hasher::hash(&VALUE)); let expected_value = hex_string(&VALUE); - let result: ArchiveStorageResult = api - .call("archive_unstable_storage", rpc_params![&block_hash, items]) + let mut sub = api + .subscribe_unbounded("archive_unstable_storage", rpc_params![&block_hash, items]) .await .unwrap(); - match result { - ArchiveStorageResult::Ok(ArchiveStorageMethodOk { result, discarded_items }) => { - assert_eq!(result.len(), 4); - assert_eq!(discarded_items, 0); - - assert_eq!(result[0].key, key); - assert_eq!(result[0].result, StorageResultType::Hash(expected_hash.clone())); - assert_eq!(result[1].key, key); - assert_eq!(result[1].result, StorageResultType::Value(expected_value.clone())); - assert_eq!(result[2].key, key); - assert_eq!(result[2].result, StorageResultType::Hash(expected_hash)); - assert_eq!(result[3].key, key); - assert_eq!(result[3].result, StorageResultType::Value(expected_value)); - }, - _ => panic!("Unexpected result"), - }; + assert_eq!( + get_next_event::(&mut sub).await, + ArchiveStorageEvent::Storage(StorageResult { + key: key.clone(), + result: StorageResultType::Hash(expected_hash.clone()), + child_trie_key: None, + }), + ); + + assert_eq!( + get_next_event::(&mut sub).await, + ArchiveStorageEvent::Storage(StorageResult { + key: key.clone(), + result: StorageResultType::Value(expected_value.clone()), + child_trie_key: None, + }), + ); + + assert_eq!( + get_next_event::(&mut sub).await, + ArchiveStorageEvent::Storage(StorageResult { + key: key.clone(), + result: StorageResultType::Hash(expected_hash), + child_trie_key: None, + }), + ); + + assert_eq!( + get_next_event::(&mut sub).await, + ArchiveStorageEvent::Storage(StorageResult { + key: key.clone(), + result: StorageResultType::Value(expected_value), + child_trie_key: None, + }), + ); + + assert_matches!( + get_next_event::(&mut sub).await, + ArchiveStorageEvent::StorageDone + ); +} + +#[tokio::test] +async fn archive_storage_hashes_values_child_trie() { + let (client, api) = setup_api(); + + // Get child storage values set in `setup_api`. + let child_info = hex_string(&CHILD_STORAGE_KEY); + let key = hex_string(&KEY); + let genesis_hash = format!("{:?}", client.genesis_hash()); + let expected_hash = format!("{:?}", Blake2Hasher::hash(&CHILD_VALUE)); + let expected_value = hex_string(&CHILD_VALUE); + + let items: Vec> = vec![ + StorageQuery { key: key.clone(), query_type: StorageQueryType::DescendantsHashes }, + StorageQuery { key: key.clone(), query_type: StorageQueryType::DescendantsValues }, + ]; + let mut sub = api + .subscribe_unbounded( + "archive_unstable_storage", + rpc_params![&genesis_hash, items, &child_info], + ) + .await + .unwrap(); + + assert_eq!( + get_next_event::(&mut sub).await, + ArchiveStorageEvent::Storage(StorageResult { + key: key.clone(), + result: StorageResultType::Hash(expected_hash.clone()), + child_trie_key: Some(child_info.clone()), + }) + ); + + assert_eq!( + get_next_event::(&mut sub).await, + ArchiveStorageEvent::Storage(StorageResult { + key: key.clone(), + result: StorageResultType::Value(expected_value.clone()), + child_trie_key: Some(child_info.clone()), + }) + ); + + assert_eq!( + get_next_event::(&mut sub).await, + ArchiveStorageEvent::StorageDone, + ); } #[tokio::test] async fn archive_storage_closest_merkle_value() { - let (client, api) = setup_api(MAX_PAGINATION_LIMIT, MAX_QUERIED_LIMIT); + let (client, api) = setup_api(); /// The core of this test. /// @@ -457,55 +500,47 @@ async fn archive_storage_closest_merkle_value() { api: &RpcModule>>, block_hash: String, ) -> HashMap { - let result: ArchiveStorageResult = api - .call( + let mut sub = api + .subscribe_unbounded( "archive_unstable_storage", rpc_params![ &block_hash, vec![ - PaginatedStorageQuery { + StorageQuery { key: hex_string(b":AAAA"), query_type: StorageQueryType::ClosestDescendantMerkleValue, - pagination_start_key: None, }, - PaginatedStorageQuery { + StorageQuery { key: hex_string(b":AAAB"), query_type: StorageQueryType::ClosestDescendantMerkleValue, - pagination_start_key: None, }, // Key with descendant. - PaginatedStorageQuery { + StorageQuery { key: hex_string(b":A"), query_type: StorageQueryType::ClosestDescendantMerkleValue, - pagination_start_key: None, }, - PaginatedStorageQuery { + StorageQuery { key: hex_string(b":AA"), query_type: StorageQueryType::ClosestDescendantMerkleValue, - pagination_start_key: None, }, // Keys below this comment do not produce a result. // Key that exceed the keyspace of the trie. - PaginatedStorageQuery { + StorageQuery { key: hex_string(b":AAAAX"), query_type: StorageQueryType::ClosestDescendantMerkleValue, - pagination_start_key: None, }, - PaginatedStorageQuery { + StorageQuery { key: hex_string(b":AAABX"), query_type: StorageQueryType::ClosestDescendantMerkleValue, - pagination_start_key: None, }, // Key that are not part of the trie. - PaginatedStorageQuery { + StorageQuery { key: hex_string(b":AAX"), query_type: StorageQueryType::ClosestDescendantMerkleValue, - pagination_start_key: None, }, - PaginatedStorageQuery { + StorageQuery { key: hex_string(b":AAAX"), query_type: StorageQueryType::ClosestDescendantMerkleValue, - pagination_start_key: None, }, ] ], @@ -513,19 +548,21 @@ async fn archive_storage_closest_merkle_value() { .await .unwrap(); - let merkle_values: HashMap<_, _> = match result { - ArchiveStorageResult::Ok(ArchiveStorageMethodOk { result, .. }) => result - .into_iter() - .map(|res| { - let value = match res.result { + let mut merkle_values = HashMap::new(); + loop { + let event = get_next_event::(&mut sub).await; + match event { + ArchiveStorageEvent::Storage(result) => { + let str_result = match result.result { StorageResultType::ClosestDescendantMerkleValue(value) => value, - _ => panic!("Unexpected StorageResultType"), + _ => panic!("Unexpected result type"), }; - (res.key, value) - }) - .collect(), - _ => panic!("Unexpected result"), - }; + merkle_values.insert(result.key, str_result); + }, + ArchiveStorageEvent::StorageError(err) => panic!("Unexpected error {err:?}"), + ArchiveStorageEvent::StorageDone => break, + } + } // Response for AAAA, AAAB, A and AA. assert_eq!(merkle_values.len(), 4); @@ -604,9 +641,9 @@ async fn archive_storage_closest_merkle_value() { } #[tokio::test] -async fn archive_storage_paginate_iterations() { +async fn archive_storage_iterations() { // 1 iteration allowed before pagination kicks in. - let (client, api) = setup_api(1, MAX_QUERIED_LIMIT); + let (client, api) = setup_api(); // Import a new block with storage changes. let mut builder = BlockBuilderBuilder::new(&*client) @@ -625,237 +662,94 @@ async fn archive_storage_paginate_iterations() { // Calling with an invalid hash. let invalid_hash = hex_string(&INVALID_HASH); - let result: ArchiveStorageResult = api - .call( + let mut sub = api + .subscribe_unbounded( "archive_unstable_storage", rpc_params![ &invalid_hash, - vec![PaginatedStorageQuery { - key: hex_string(b":m"), - query_type: StorageQueryType::DescendantsValues, - pagination_start_key: None, - }] - ], - ) - .await - .unwrap(); - match result { - ArchiveStorageResult::Err(_) => (), - _ => panic!("Unexpected result"), - }; - - // Valid call with storage at the key. - let result: ArchiveStorageResult = api - .call( - "archive_unstable_storage", - rpc_params![ - &block_hash, - vec![PaginatedStorageQuery { - key: hex_string(b":m"), - query_type: StorageQueryType::DescendantsValues, - pagination_start_key: None, - }] - ], - ) - .await - .unwrap(); - match result { - ArchiveStorageResult::Ok(ArchiveStorageMethodOk { result, discarded_items }) => { - assert_eq!(result.len(), 1); - assert_eq!(discarded_items, 0); - - assert_eq!(result[0].key, hex_string(b":m")); - assert_eq!(result[0].result, StorageResultType::Value(hex_string(b"a"))); - }, - _ => panic!("Unexpected result"), - }; - - // Continue with pagination. - let result: ArchiveStorageResult = api - .call( - "archive_unstable_storage", - rpc_params![ - &block_hash, - vec![PaginatedStorageQuery { - key: hex_string(b":m"), - query_type: StorageQueryType::DescendantsValues, - pagination_start_key: Some(hex_string(b":m")), - }] - ], - ) - .await - .unwrap(); - match result { - ArchiveStorageResult::Ok(ArchiveStorageMethodOk { result, discarded_items }) => { - assert_eq!(result.len(), 1); - assert_eq!(discarded_items, 0); - - assert_eq!(result[0].key, hex_string(b":mo")); - assert_eq!(result[0].result, StorageResultType::Value(hex_string(b"ab"))); - }, - _ => panic!("Unexpected result"), - }; - - // Continue with pagination. - let result: ArchiveStorageResult = api - .call( - "archive_unstable_storage", - rpc_params![ - &block_hash, - vec![PaginatedStorageQuery { + vec![StorageQuery { key: hex_string(b":m"), query_type: StorageQueryType::DescendantsValues, - pagination_start_key: Some(hex_string(b":mo")), }] ], ) .await .unwrap(); - match result { - ArchiveStorageResult::Ok(ArchiveStorageMethodOk { result, discarded_items }) => { - assert_eq!(result.len(), 1); - assert_eq!(discarded_items, 0); - - assert_eq!(result[0].key, hex_string(b":moD")); - assert_eq!(result[0].result, StorageResultType::Value(hex_string(b"abcmoD"))); - }, - _ => panic!("Unexpected result"), - }; - // Continue with pagination. - let result: ArchiveStorageResult = api - .call( - "archive_unstable_storage", - rpc_params![ - &block_hash, - vec![PaginatedStorageQuery { - key: hex_string(b":m"), - query_type: StorageQueryType::DescendantsValues, - pagination_start_key: Some(hex_string(b":moD")), - }] - ], - ) - .await - .unwrap(); - match result { - ArchiveStorageResult::Ok(ArchiveStorageMethodOk { result, discarded_items }) => { - assert_eq!(result.len(), 1); - assert_eq!(discarded_items, 0); - - assert_eq!(result[0].key, hex_string(b":moc")); - assert_eq!(result[0].result, StorageResultType::Value(hex_string(b"abc"))); - }, - _ => panic!("Unexpected result"), - }; + assert_matches!( + get_next_event::(&mut sub).await, + ArchiveStorageEvent::StorageError(_) + ); - // Continue with pagination. - let result: ArchiveStorageResult = api - .call( + // Valid call with storage at the key. + let mut sub = api + .subscribe_unbounded( "archive_unstable_storage", rpc_params![ &block_hash, - vec![PaginatedStorageQuery { + vec![StorageQuery { key: hex_string(b":m"), query_type: StorageQueryType::DescendantsValues, - pagination_start_key: Some(hex_string(b":moc")), }] ], ) .await .unwrap(); - match result { - ArchiveStorageResult::Ok(ArchiveStorageMethodOk { result, discarded_items }) => { - assert_eq!(result.len(), 1); - assert_eq!(discarded_items, 0); - assert_eq!(result[0].key, hex_string(b":mock")); - assert_eq!(result[0].result, StorageResultType::Value(hex_string(b"abcd"))); - }, - _ => panic!("Unexpected result"), - }; + assert_eq!( + get_next_event::(&mut sub).await, + ArchiveStorageEvent::Storage(StorageResult { + key: hex_string(b":m"), + result: StorageResultType::Value(hex_string(b"a")), + child_trie_key: None, + }) + ); - // Continue with pagination until no keys are returned. - let result: ArchiveStorageResult = api - .call( - "archive_unstable_storage", - rpc_params![ - &block_hash, - vec![PaginatedStorageQuery { - key: hex_string(b":m"), - query_type: StorageQueryType::DescendantsValues, - pagination_start_key: Some(hex_string(b":mock")), - }] - ], - ) - .await - .unwrap(); - match result { - ArchiveStorageResult::Ok(ArchiveStorageMethodOk { result, discarded_items }) => { - assert_eq!(result.len(), 0); - assert_eq!(discarded_items, 0); - }, - _ => panic!("Unexpected result"), - }; -} + assert_eq!( + get_next_event::(&mut sub).await, + ArchiveStorageEvent::Storage(StorageResult { + key: hex_string(b":mo"), + result: StorageResultType::Value(hex_string(b"ab")), + child_trie_key: None, + }) + ); -#[tokio::test] -async fn archive_storage_discarded_items() { - // One query at a time - let (client, api) = setup_api(MAX_PAGINATION_LIMIT, 1); + assert_eq!( + get_next_event::(&mut sub).await, + ArchiveStorageEvent::Storage(StorageResult { + key: hex_string(b":moD"), + result: StorageResultType::Value(hex_string(b"abcmoD")), + child_trie_key: None, + }) + ); - // Import a new block with storage changes. - let mut builder = BlockBuilderBuilder::new(&*client) - .on_parent_block(client.chain_info().genesis_hash) - .with_parent_block_number(0) - .build() - .unwrap(); - builder.push_storage_change(b":m".to_vec(), Some(b"a".to_vec())).unwrap(); - let block = builder.build().unwrap().block; - let block_hash = format!("{:?}", block.header.hash()); - client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + assert_eq!( + get_next_event::(&mut sub).await, + ArchiveStorageEvent::Storage(StorageResult { + key: hex_string(b":moc"), + result: StorageResultType::Value(hex_string(b"abc")), + child_trie_key: None, + }) + ); - // Valid call with storage at the key. - let result: ArchiveStorageResult = api - .call( - "archive_unstable_storage", - rpc_params![ - &block_hash, - vec![ - PaginatedStorageQuery { - key: hex_string(b":m"), - query_type: StorageQueryType::Value, - pagination_start_key: None, - }, - PaginatedStorageQuery { - key: hex_string(b":m"), - query_type: StorageQueryType::Hash, - pagination_start_key: None, - }, - PaginatedStorageQuery { - key: hex_string(b":m"), - query_type: StorageQueryType::Hash, - pagination_start_key: None, - } - ] - ], - ) - .await - .unwrap(); - match result { - ArchiveStorageResult::Ok(ArchiveStorageMethodOk { result, discarded_items }) => { - assert_eq!(result.len(), 1); - assert_eq!(discarded_items, 2); + assert_eq!( + get_next_event::(&mut sub).await, + ArchiveStorageEvent::Storage(StorageResult { + key: hex_string(b":mock"), + result: StorageResultType::Value(hex_string(b"abcd")), + child_trie_key: None, + }) + ); - assert_eq!(result[0].key, hex_string(b":m")); - assert_eq!(result[0].result, StorageResultType::Value(hex_string(b"a"))); - }, - _ => panic!("Unexpected result"), - }; + assert_matches!( + get_next_event::(&mut sub).await, + ArchiveStorageEvent::StorageDone + ); } #[tokio::test] async fn archive_storage_diff_main_trie() { - let (client, api) = setup_api(MAX_PAGINATION_LIMIT, MAX_QUERIED_LIMIT); + let (client, api) = setup_api(); let mut builder = BlockBuilderBuilder::new(&*client) .on_parent_block(client.chain_info().genesis_hash) @@ -965,7 +859,7 @@ async fn archive_storage_diff_main_trie() { #[tokio::test] async fn archive_storage_diff_no_changes() { - let (client, api) = setup_api(MAX_PAGINATION_LIMIT, MAX_QUERIED_LIMIT); + let (client, api) = setup_api(); // Build 2 identical blocks. let mut builder = BlockBuilderBuilder::new(&*client) @@ -1012,7 +906,7 @@ async fn archive_storage_diff_no_changes() { #[tokio::test] async fn archive_storage_diff_deleted_changes() { - let (client, api) = setup_api(MAX_PAGINATION_LIMIT, MAX_QUERIED_LIMIT); + let (client, api) = setup_api(); // Blocks are imported as forks. let mut builder = BlockBuilderBuilder::new(&*client) @@ -1079,7 +973,7 @@ async fn archive_storage_diff_deleted_changes() { #[tokio::test] async fn archive_storage_diff_invalid_params() { let invalid_hash = hex_string(&INVALID_HASH); - let (_, api) = setup_api(MAX_PAGINATION_LIMIT, MAX_QUERIED_LIMIT); + let (_, api) = setup_api(); // Invalid shape for parameters. let items: Vec> = Vec::new(); diff --git a/substrate/client/rpc-spec-v2/src/chain_head/event.rs b/substrate/client/rpc-spec-v2/src/chain_head/event.rs index bd9863060910..de74145a3f08 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/event.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/event.rs @@ -235,7 +235,7 @@ pub struct OperationCallDone { pub output: String, } -/// The response of the `chainHead_call` method. +/// The response of the `chainHead_storage` method. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct OperationStorageItems { @@ -536,6 +536,7 @@ mod tests { items: vec![StorageResult { key: "0x1".into(), result: StorageResultType::Value("0x123".to_string()), + child_trie_key: None, }], }); diff --git a/substrate/client/rpc-spec-v2/src/common/events.rs b/substrate/client/rpc-spec-v2/src/common/events.rs index 198a60bf4cac..44f722c0c61b 100644 --- a/substrate/client/rpc-spec-v2/src/common/events.rs +++ b/substrate/client/rpc-spec-v2/src/common/events.rs @@ -78,6 +78,10 @@ pub struct StorageResult { /// The result of the query. #[serde(flatten)] pub result: StorageResultType, + /// The child trie key if provided. + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub child_trie_key: Option, } /// The type of the storage query. @@ -105,23 +109,41 @@ pub struct StorageResultErr { /// The result of a storage call. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(untagged)] -pub enum ArchiveStorageResult { +#[serde(rename_all = "camelCase")] +#[serde(tag = "event")] +pub enum ArchiveStorageEvent { /// Query generated a result. - Ok(ArchiveStorageMethodOk), + Storage(StorageResult), /// Query encountered an error. - Err(ArchiveStorageMethodErr), + StorageError(ArchiveStorageMethodErr), + /// Operation storage is done. + StorageDone, } -impl ArchiveStorageResult { - /// Create a new `ArchiveStorageResult::Ok` result. - pub fn ok(result: Vec, discarded_items: usize) -> Self { - Self::Ok(ArchiveStorageMethodOk { result, discarded_items }) +impl ArchiveStorageEvent { + /// Create a new `ArchiveStorageEvent::StorageErr` event. + pub fn err(error: String) -> Self { + Self::StorageError(ArchiveStorageMethodErr { error }) } - /// Create a new `ArchiveStorageResult::Err` result. - pub fn err(error: String) -> Self { - Self::Err(ArchiveStorageMethodErr { error }) + /// Create a new `ArchiveStorageEvent::StorageResult` event. + pub fn result(result: StorageResult) -> Self { + Self::Storage(result) + } + + /// Checks if the event is a `StorageDone` event. + pub fn is_done(&self) -> bool { + matches!(self, Self::StorageDone) + } + + /// Checks if the event is a `StorageErr` event. + pub fn is_err(&self) -> bool { + matches!(self, Self::StorageError(_)) + } + + /// Checks if the event is a `StorageResult` event. + pub fn is_result(&self) -> bool { + matches!(self, Self::Storage(_)) } } @@ -354,8 +376,11 @@ mod tests { #[test] fn storage_result() { // Item with Value. - let item = - StorageResult { key: "0x1".into(), result: StorageResultType::Value("res".into()) }; + let item = StorageResult { + key: "0x1".into(), + result: StorageResultType::Value("res".into()), + child_trie_key: None, + }; // Encode let ser = serde_json::to_string(&item).unwrap(); let exp = r#"{"key":"0x1","value":"res"}"#; @@ -365,8 +390,11 @@ mod tests { assert_eq!(dec, item); // Item with Hash. - let item = - StorageResult { key: "0x1".into(), result: StorageResultType::Hash("res".into()) }; + let item = StorageResult { + key: "0x1".into(), + result: StorageResultType::Hash("res".into()), + child_trie_key: None, + }; // Encode let ser = serde_json::to_string(&item).unwrap(); let exp = r#"{"key":"0x1","hash":"res"}"#; @@ -379,6 +407,7 @@ mod tests { let item = StorageResult { key: "0x1".into(), result: StorageResultType::ClosestDescendantMerkleValue("res".into()), + child_trie_key: None, }; // Encode let ser = serde_json::to_string(&item).unwrap(); diff --git a/substrate/client/rpc-spec-v2/src/common/storage.rs b/substrate/client/rpc-spec-v2/src/common/storage.rs index 673e20b2bc78..a1e34d51530e 100644 --- a/substrate/client/rpc-spec-v2/src/common/storage.rs +++ b/substrate/client/rpc-spec-v2/src/common/storage.rs @@ -24,7 +24,7 @@ use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider}; use sp_runtime::traits::Block as BlockT; use tokio::sync::mpsc; -use super::events::{StorageResult, StorageResultType}; +use super::events::{StorageQuery, StorageQueryType, StorageResult, StorageResultType}; use crate::hex_string; /// Call into the storage of blocks. @@ -70,9 +70,6 @@ pub enum IterQueryType { /// The result of making a query call. pub type QueryResult = Result, String>; -/// The result of iterating over keys. -pub type QueryIterResult = Result<(Vec, Option), String>; - impl Storage where Block: BlockT + 'static, @@ -97,6 +94,7 @@ where QueryResult::Ok(opt.map(|storage_data| StorageResult { key: hex_string(&key.0), result: StorageResultType::Value(hex_string(&storage_data.0)), + child_trie_key: child_key.map(|c| hex_string(&c.storage_key())), })) }) .unwrap_or_else(|error| QueryResult::Err(error.to_string())) @@ -120,6 +118,7 @@ where QueryResult::Ok(opt.map(|storage_data| StorageResult { key: hex_string(&key.0), result: StorageResultType::Hash(hex_string(&storage_data.as_ref())), + child_trie_key: child_key.map(|c| hex_string(&c.storage_key())), })) }) .unwrap_or_else(|error| QueryResult::Err(error.to_string())) @@ -149,6 +148,7 @@ where StorageResult { key: hex_string(&key.0), result: StorageResultType::ClosestDescendantMerkleValue(result), + child_trie_key: child_key.map(|c| hex_string(&c.storage_key())), } })) }) @@ -199,56 +199,6 @@ where } } - /// Iterate over at most the provided number of keys. - /// - /// Returns the storage result with a potential next key to resume iteration. - pub fn query_iter_pagination( - &self, - query: QueryIter, - hash: Block::Hash, - child_key: Option<&ChildInfo>, - count: usize, - ) -> QueryIterResult { - let QueryIter { ty, query_key, pagination_start_key } = query; - - let mut keys_iter = if let Some(child_key) = child_key { - self.client.child_storage_keys( - hash, - child_key.to_owned(), - Some(&query_key), - pagination_start_key.as_ref(), - ) - } else { - self.client.storage_keys(hash, Some(&query_key), pagination_start_key.as_ref()) - } - .map_err(|err| err.to_string())?; - - let mut ret = Vec::with_capacity(count); - let mut next_pagination_key = None; - for _ in 0..count { - let Some(key) = keys_iter.next() else { break }; - - next_pagination_key = Some(key.clone()); - - let result = match ty { - IterQueryType::Value => self.query_value(hash, &key, child_key), - IterQueryType::Hash => self.query_hash(hash, &key, child_key), - }?; - - if let Some(value) = result { - ret.push(value); - } - } - - // Save the next key if any to continue the iteration. - let maybe_next_query = keys_iter.next().map(|_| QueryIter { - ty, - query_key, - pagination_start_key: next_pagination_key, - }); - Ok((ret, maybe_next_query)) - } - /// Raw iterator over the keys. pub fn raw_keys_iter( &self, @@ -264,3 +214,96 @@ where keys_iter.map_err(|err| err.to_string()) } } + +/// Generates storage events for `chainHead_storage` and `archive_storage` subscriptions. +pub struct StorageSubscriptionClient { + /// Storage client. + client: Storage, + _phandom: PhantomData<(BE, Block)>, +} + +impl Clone for StorageSubscriptionClient { + fn clone(&self) -> Self { + Self { client: self.client.clone(), _phandom: PhantomData } + } +} + +impl StorageSubscriptionClient { + /// Constructs a new [`StorageSubscriptionClient`]. + pub fn new(client: Arc) -> Self { + Self { client: Storage::new(client), _phandom: PhantomData } + } +} + +impl StorageSubscriptionClient +where + Block: BlockT + 'static, + BE: Backend + 'static, + Client: StorageProvider + Send + Sync + 'static, +{ + /// Generate storage events to the provided sender. + pub async fn generate_events( + &mut self, + hash: Block::Hash, + items: Vec>, + child_key: Option, + tx: mpsc::Sender, + ) -> Result<(), tokio::task::JoinError> { + let this = self.clone(); + + tokio::task::spawn_blocking(move || { + for item in items { + match item.query_type { + StorageQueryType::Value => { + let rp = this.client.query_value(hash, &item.key, child_key.as_ref()); + if tx.blocking_send(rp).is_err() { + break; + } + }, + StorageQueryType::Hash => { + let rp = this.client.query_hash(hash, &item.key, child_key.as_ref()); + if tx.blocking_send(rp).is_err() { + break; + } + }, + StorageQueryType::ClosestDescendantMerkleValue => { + let rp = + this.client.query_merkle_value(hash, &item.key, child_key.as_ref()); + if tx.blocking_send(rp).is_err() { + break; + } + }, + StorageQueryType::DescendantsValues => { + let query = QueryIter { + query_key: item.key, + ty: IterQueryType::Value, + pagination_start_key: None, + }; + this.client.query_iter_pagination_with_producer( + query, + hash, + child_key.as_ref(), + &tx, + ) + }, + StorageQueryType::DescendantsHashes => { + let query = QueryIter { + query_key: item.key, + ty: IterQueryType::Hash, + pagination_start_key: None, + }; + this.client.query_iter_pagination_with_producer( + query, + hash, + child_key.as_ref(), + &tx, + ) + }, + } + } + }) + .await?; + + Ok(()) + } +} diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 027a444012af..a47a05c0a190 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -756,8 +756,6 @@ where backend.clone(), genesis_hash, task_executor.clone(), - // Defaults to sensible limits for the `Archive`. - sc_rpc_spec_v2::archive::ArchiveConfig::default(), ) .into_rpc(); rpc_api.merge(archive_v2).map_err(|e| Error::Application(e.into()))?;