-
Notifications
You must be signed in to change notification settings - Fork 775
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
archive: Refactor archive_storage
method into subscription
#6483
Merged
Merged
Changes from 80 commits
Commits
Show all changes
81 commits
Select commit
Hold shift + click to select a range
e17a555
archive/api: Add stroage diff method
lexnv 4101ee6
storage/events: Add storage events and test serialization
lexnv d781d67
archive: Add initial implementation for storage diff
lexnv 60e1a93
archive: Handle modified and delete cases
lexnv 63da031
archive/events: Derive more impl for diff items
lexnv 25b9e5a
archive: Deduplicate input keys
lexnv 52cfaf7
archive: Move code to different module
lexnv ee6bed7
archive: Fetch value / hash or both
lexnv 8948e74
events: Add subscription events for ArchiveStorageDiffEvent
lexnv fbfb0dc
archive: Make archive_storageDiff a subscription
lexnv f7b33ed
archive: Modify storageDiff to leverage subscription backpressure
lexnv f47db36
archive: Deduplicate items to a separate function
lexnv 97546a4
archive/tests: Check deduplication
lexnv baa3dd0
archive: Fix deduplication shortest key
lexnv 4683f6b
archive/tests: Check complex deduplication
lexnv 36f59d2
archive: Simplify logic of trie iteration
lexnv a084f47
archive: Improve documentation
lexnv 77cac50
archive/events: Add derive(Eq) to archive storage diff events
lexnv e2324a4
archive/tests: Check query for main trie under prefix
lexnv 4b05913
archive: Add trace and debug logs for storage diff
lexnv 83cb9b4
archive: Send StorageDiffDone for partial trie queries
lexnv 85c810e
archive/tests: Check no changes between blocks
lexnv 152d13b
archive/tests: Ensure added key
lexnv 4f549bf
archive/tests: Extend test with interleaved values and hashes
lexnv d208068
rpc-v2: Use indexmap crate
lexnv 369a4d2
archive: Move prevHash as last parameter to archive diff
lexnv a02cc84
archive: Preserve order of elements with indexMap
lexnv 637b446
archive/tests: Check deleted keys
lexnv 76efb18
events: Add common wrappers for construction
lexnv ca1fa20
archive: Propagate errors via events
lexnv 03eefd7
archive: Check invalid parameters and events
lexnv 9a1e792
archive: Fix clippy
lexnv dbbfce1
Merge remote-tracking branch 'origin/master' into lexnv/storage-diff
lexnv 971026d
Add prdoc
lexnv 2880058
Merge branch 'master' into lexnv/storage-diff
lexnv b774855
archive/storage: Optimize space used for saved keys
lexnv 838504b
Merge remote-tracking branch 'origin/lexnv/storage-diff' into lexnv/s…
lexnv 60abd9b
Merge remote-tracking branch 'origin/master' into lexnv/storage-diff
lexnv dc5f001
archive: Propagate all queries to handle_trie_queries
lexnv cdc4dc5
archive: Simplify process_events
lexnv de7cf79
archive: Add missing line
lexnv 199b46d
archive: Refactor with FetchedStorage enum
lexnv 3f4a4b8
Merge remote-tracking branch 'origin/master' into lexnv/storage-diff
lexnv f4ace06
archive: Refactor for optimal iteration cycles
lexnv 55489c7
archive: Adjust code comments
lexnv 4f94890
archive: Rename starts_with to belongs_to_query
lexnv a7e54a5
Update prdoc
lexnv 5341227
Merge remote-tracking branch 'origin/master' into lexnv/storage-diff
lexnv 25728a1
common/events: Transform StorageResult into StorageEvent
lexnv 8860e7e
archive/api: Make archive_storage a subscription
lexnv 533ecb9
rpc-v2: Rename ArchiveResult into ArchiveEvent
lexnv 0849e8e
events: Add wrappers for ArchiveStorageEvent
lexnv a2f92b6
common/storage: Introduce a common storage subcription handler
lexnv 7fec81a
archive/api: Remove pagination items from archive storage
lexnv b6523d2
common/events: Add constructor methods for clarity
lexnv bb65583
archive: Remove ArchiveStorage in favour of common archive object
lexnv 1917672
archive: Remove config for storage parameters and use backpressure
lexnv 702f521
archive/storage: Adjust testing to subscription based approach
lexnv 751d8ff
Merge remote-tracking branch 'origin/master' into lexnv/storage-diff
lexnv 3c2bb7c
archive: Remove variable
lexnv 8f8c0c0
Update cargo lock
lexnv 07a459c
archive: Mode deduplicate_storage_diff_items to inenr methods
lexnv a151bf3
archive/tests: Ensure other storage entries are not provided
lexnv a575b4a
archive/diff: Implement lexicographic_diff
lexnv caa75cc
archive/tests: Check lexicographic diff
lexnv 7ff0bdb
Merge remote-tracking branch 'origin/master' into lexnv/storage-diff
lexnv c6e40a0
archive: Remove commented code and add documentation
lexnv a07b5c4
Update substrate/client/rpc-spec-v2/src/archive/api.rs
lexnv ae00327
archive: Remove unneeded lifetime
lexnv fd48434
Merge remote-tracking branch 'origin/lexnv/storage-diff' into lexnv/s…
lexnv 736a5ad
Merge remote-tracking branch 'origin/lexnv/storage-diff' into lexnv/s…
lexnv 791831f
archive: Check sink closed
lexnv 3ddaf2d
archive: Refactor process methods
lexnv 774b525
archive/tests: Adjust to new assumptions wrt StorageDone
lexnv 4147442
archive: Rename StorageResult to Storage
lexnv c0f5ded
storage/events: Include child trie key
lexnv 39aab7a
archive/tests: Adjust testing
lexnv b14a4bd
archive/tests: Check childtrie results
lexnv 9676499
Merge remote-tracking branch 'origin/master' into lexnv/storage-sub
lexnv ff09c7e
archive/tests: Fix wrong merge
lexnv 087f0e7
archive/events: Align with spec
lexnv File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,13 +20,13 @@ | |
|
||
use crate::{ | ||
archive::{ | ||
archive_storage::{ArchiveStorage, ArchiveStorageDiff}, | ||
error::Error as ArchiveError, | ||
ArchiveApiServer, | ||
archive_storage::ArchiveStorageDiff, error::Error as ArchiveError, ArchiveApiServer, | ||
}, | ||
common::events::{ | ||
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageResult, | ||
PaginatedStorageQuery, | ||
common::{ | ||
events::{ | ||
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageEvent, StorageQuery, | ||
}, | ||
storage::{QueryResult, StorageSubscriptionClient}, | ||
}, | ||
hex_string, MethodResult, SubscriptionTaskExecutor, | ||
}; | ||
|
@@ -57,42 +57,12 @@ use tokio::sync::mpsc; | |
|
||
pub(crate) const LOG_TARGET: &str = "rpc-spec-v2::archive"; | ||
|
||
/// The configuration of [`Archive`]. | ||
pub struct ArchiveConfig { | ||
/// The maximum number of items the `archive_storage` can return for a descendant query before | ||
/// pagination is required. | ||
pub max_descendant_responses: usize, | ||
/// The maximum number of queried items allowed for the `archive_storage` at a time. | ||
pub max_queried_items: usize, | ||
} | ||
|
||
/// The maximum number of items the `archive_storage` can return for a descendant query before | ||
/// pagination is required. | ||
/// | ||
/// Note: this is identical to the `chainHead` value. | ||
const MAX_DESCENDANT_RESPONSES: usize = 5; | ||
|
||
/// The maximum number of queried items allowed for the `archive_storage` at a time. | ||
/// | ||
/// Note: A queried item can also be a descendant query which can return up to | ||
/// `MAX_DESCENDANT_RESPONSES`. | ||
const MAX_QUERIED_ITEMS: usize = 8; | ||
|
||
/// The buffer capacity for each storage query. | ||
/// | ||
/// This is small because the underlying JSON-RPC server has | ||
/// its down buffer capacity per connection as well. | ||
const STORAGE_QUERY_BUF: usize = 16; | ||
|
||
impl Default for ArchiveConfig { | ||
fn default() -> Self { | ||
Self { | ||
max_descendant_responses: MAX_DESCENDANT_RESPONSES, | ||
max_queried_items: MAX_QUERIED_ITEMS, | ||
} | ||
} | ||
} | ||
|
||
/// An API for archive RPC calls. | ||
pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> { | ||
/// Substrate client. | ||
|
@@ -103,11 +73,6 @@ pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> { | |
executor: SubscriptionTaskExecutor, | ||
/// The hexadecimal encoded hash of the genesis block. | ||
genesis_hash: String, | ||
/// The maximum number of items the `archive_storage` can return for a descendant query before | ||
/// pagination is required. | ||
storage_max_descendant_responses: usize, | ||
/// The maximum number of queried items allowed for the `archive_storage` at a time. | ||
storage_max_queried_items: usize, | ||
/// Phantom member to pin the block type. | ||
_phantom: PhantomData<Block>, | ||
} | ||
|
@@ -119,18 +84,9 @@ impl<BE: Backend<Block>, Block: BlockT, Client> Archive<BE, Block, Client> { | |
backend: Arc<BE>, | ||
genesis_hash: GenesisHash, | ||
executor: SubscriptionTaskExecutor, | ||
config: ArchiveConfig, | ||
) -> Self { | ||
let genesis_hash = hex_string(&genesis_hash.as_ref()); | ||
Self { | ||
client, | ||
backend, | ||
executor, | ||
genesis_hash, | ||
storage_max_descendant_responses: config.max_descendant_responses, | ||
storage_max_queried_items: config.max_queried_items, | ||
_phantom: PhantomData, | ||
} | ||
Self { client, backend, executor, genesis_hash, _phantom: PhantomData } | ||
} | ||
} | ||
|
||
|
@@ -260,47 +216,53 @@ where | |
|
||
fn archive_unstable_storage( | ||
&self, | ||
pending: PendingSubscriptionSink, | ||
hash: Block::Hash, | ||
items: Vec<PaginatedStorageQuery<String>>, | ||
items: Vec<StorageQuery<String>>, | ||
child_trie: Option<String>, | ||
) -> RpcResult<ArchiveStorageResult> { | ||
let items = items | ||
.into_iter() | ||
.map(|query| { | ||
let key = StorageKey(parse_hex_param(query.key)?); | ||
let pagination_start_key = query | ||
.pagination_start_key | ||
.map(|key| parse_hex_param(key).map(|key| StorageKey(key))) | ||
.transpose()?; | ||
|
||
// Paginated start key is only supported | ||
if pagination_start_key.is_some() && !query.query_type.is_descendant_query() { | ||
return Err(ArchiveError::InvalidParam( | ||
"Pagination start key is only supported for descendants queries" | ||
.to_string(), | ||
)) | ||
} | ||
) { | ||
let mut storage_client = | ||
StorageSubscriptionClient::<Client, Block, BE>::new(self.client.clone()); | ||
|
||
let fut = async move { | ||
let Ok(mut sink) = pending.accept().await.map(Subscription::from) else { return }; | ||
|
||
Ok(PaginatedStorageQuery { | ||
key, | ||
query_type: query.query_type, | ||
pagination_start_key, | ||
let items = match items | ||
.into_iter() | ||
.map(|query| { | ||
let key = StorageKey(parse_hex_param(query.key)?); | ||
Ok(StorageQuery { key, query_type: query.query_type }) | ||
}) | ||
}) | ||
.collect::<Result<Vec<_>, ArchiveError>>()?; | ||
.collect::<Result<Vec<_>, ArchiveError>>() | ||
{ | ||
Ok(items) => items, | ||
Err(error) => { | ||
let _ = sink.send(&ArchiveStorageEvent::err(error.to_string())); | ||
return | ||
}, | ||
}; | ||
|
||
let child_trie = child_trie | ||
.map(|child_trie| parse_hex_param(child_trie)) | ||
.transpose()? | ||
.map(ChildInfo::new_default_from_vec); | ||
let child_trie = child_trie.map(|child_trie| parse_hex_param(child_trie)).transpose(); | ||
let child_trie = match child_trie { | ||
Ok(child_trie) => child_trie.map(ChildInfo::new_default_from_vec), | ||
Err(error) => { | ||
let _ = sink.send(&ArchiveStorageEvent::err(error.to_string())); | ||
return | ||
}, | ||
}; | ||
|
||
let storage_client = ArchiveStorage::new( | ||
self.client.clone(), | ||
self.storage_max_descendant_responses, | ||
self.storage_max_queried_items, | ||
); | ||
let (tx, mut rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF); | ||
let storage_fut = storage_client.generate_events(hash, items, child_trie, tx); | ||
|
||
Ok(storage_client.handle_query(hash, items, child_trie)) | ||
// We don't care about the return value of this join: | ||
// - process_events might encounter an error (if the client disconnected) | ||
// - storage_fut might encounter an error while processing a trie queries and | ||
// the error is propagated via the sink. | ||
let _ = futures::future::join(storage_fut, process_storage_events(&mut rx, &mut sink)) | ||
.await; | ||
}; | ||
|
||
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); | ||
} | ||
|
||
fn archive_unstable_storage_diff( | ||
|
@@ -337,24 +299,74 @@ where | |
// - process_events might encounter an error (if the client disconnected) | ||
// - storage_fut might encounter an error while processing a trie queries and | ||
// the error is propagated via the sink. | ||
let _ = futures::future::join(storage_fut, process_events(&mut rx, &mut sink)).await; | ||
let _ = | ||
futures::future::join(storage_fut, process_storage_diff_events(&mut rx, &mut sink)) | ||
.await; | ||
}; | ||
|
||
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); | ||
} | ||
} | ||
|
||
/// Sends all the events to the sink. | ||
async fn process_events(rx: &mut mpsc::Receiver<ArchiveStorageDiffEvent>, sink: &mut Subscription) { | ||
while let Some(event) = rx.recv().await { | ||
if event.is_done() { | ||
log::debug!(target: LOG_TARGET, "Finished processing partial trie query"); | ||
} else if event.is_err() { | ||
log::debug!(target: LOG_TARGET, "Error encountered while processing partial trie query"); | ||
/// Sends all the events of the storage_diff method to the sink. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A little annoying to have two different functions for It's fine |
||
async fn process_storage_diff_events( | ||
rx: &mut mpsc::Receiver<ArchiveStorageDiffEvent>, | ||
sink: &mut Subscription, | ||
) { | ||
loop { | ||
tokio::select! { | ||
_ = sink.closed() => { | ||
return | ||
}, | ||
|
||
maybe_event = rx.recv() => { | ||
let Some(event) = maybe_event else { | ||
break; | ||
}; | ||
|
||
if event.is_done() { | ||
log::debug!(target: LOG_TARGET, "Finished processing partial trie query"); | ||
} else if event.is_err() { | ||
log::debug!(target: LOG_TARGET, "Error encountered while processing partial trie query"); | ||
} | ||
|
||
if sink.send(&event).await.is_err() { | ||
return | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// Sends all the events of the storage method to the sink. | ||
async fn process_storage_events(rx: &mut mpsc::Receiver<QueryResult>, sink: &mut Subscription) { | ||
loop { | ||
tokio::select! { | ||
_ = sink.closed() => { | ||
break | ||
} | ||
|
||
maybe_storage = rx.recv() => { | ||
let Some(event) = maybe_storage else { | ||
break; | ||
}; | ||
|
||
match event { | ||
Ok(None) => continue, | ||
|
||
Ok(Some(event)) => | ||
if sink.send(&ArchiveStorageEvent::result(event)).await.is_err() { | ||
return | ||
}, | ||
|
||
if sink.send(&event).await.is_err() { | ||
return | ||
Err(error) => { | ||
let _ = sink.send(&ArchiveStorageEvent::err(error)).await; | ||
return | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
let _ = sink.send(&ArchiveStorageEvent::StorageDone).await; | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
woo 🥇