Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

chainHead_storage: Iterate over keys #14628

Merged
merged 5 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,7 @@ where
let items = items
.into_iter()
.map(|query| {
if query.queue_type != StorageQueryType::Value &&
query.queue_type != StorageQueryType::Hash
{
if query.query_type == StorageQueryType::ClosestDescendantMerkleValue {
// Note: remove this once all types are implemented.
let _ = sink.reject(ChainHeadRpcError::InvalidParam(
"Storage query type not supported".into(),
Expand All @@ -312,7 +310,7 @@ where

Ok(StorageQuery {
key: StorageKey(parse_hex_param(&mut sink, query.key)?),
queue_type: query.queue_type,
query_type: query.query_type,
})
})
.collect::<Result<Vec<_>, _>>()?;
Expand Down
147 changes: 105 additions & 42 deletions client/rpc-spec-v2/src/chain_head/chain_head_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ use super::{
hex_string, ErrorEvent,
};

/// The maximum number of items the `chainHead_storage` can return
/// before paginations is required.
const MAX_ITER_ITEMS: usize = 10;

/// The query type of an interation.
enum IterQueryType {
/// Iterating over (key, value) pairs.
Value,
/// Iterating over (key, hash) pairs.
Hash,
}

/// Generates the events of the `chainHead_storage` method.
pub struct ChainHeadStorage<Client, Block, BE> {
/// Substrate client.
Expand All @@ -58,7 +70,10 @@ fn is_key_queryable(key: &[u8]) -> bool {
}

/// The result of making a query call.
type QueryResult = Result<StorageResult<String>, ChainHeadStorageEvent<String>>;
type QueryResult = Result<Option<StorageResult<String>>, ChainHeadStorageEvent<String>>;

/// The result of iterating over keys.
type QueryIterResult = Result<Vec<StorageResult<String>>, ChainHeadStorageEvent<String>>;

impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE>
where
Expand All @@ -72,7 +87,7 @@ where
hash: Block::Hash,
key: &StorageKey,
child_key: Option<&ChildInfo>,
) -> Option<QueryResult> {
) -> QueryResult {
let result = if let Some(child_key) = child_key {
self.client.child_storage(hash, child_key, key)
} else {
Expand All @@ -81,17 +96,15 @@ where

result
.map(|opt| {
opt.map(|storage_data| {
QueryResult::Ok(StorageResult::<String> {
key: hex_string(&key.0),
result: StorageResultType::Value(hex_string(&storage_data.0)),
})
})
QueryResult::Ok(opt.map(|storage_data| StorageResult::<String> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I personally would just use Ok instead of QueryResult::Ok, but I guess that is just personal preference.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(It might be that QueryResult::Ok is used here to convey the type information that comes along with QueryResult :))

key: hex_string(&key.0),
result: StorageResultType::Value(hex_string(&storage_data.0)),
}))
})
.unwrap_or_else(|err| {
Some(QueryResult::Err(ChainHeadStorageEvent::<String>::Error(ErrorEvent {
QueryResult::Err(ChainHeadStorageEvent::<String>::Error(ErrorEvent {
error: err.to_string(),
})))
}))
})
}

Expand All @@ -101,7 +114,7 @@ where
hash: Block::Hash,
key: &StorageKey,
child_key: Option<&ChildInfo>,
) -> Option<QueryResult> {
) -> QueryResult {
let result = if let Some(child_key) = child_key {
self.client.child_storage_hash(hash, child_key, key)
} else {
Expand All @@ -110,36 +123,49 @@ where

result
.map(|opt| {
opt.map(|storage_data| {
QueryResult::Ok(StorageResult::<String> {
key: hex_string(&key.0),
result: StorageResultType::Hash(hex_string(&storage_data.as_ref())),
})
})
QueryResult::Ok(opt.map(|storage_data| StorageResult::<String> {
key: hex_string(&key.0),
result: StorageResultType::Hash(hex_string(&storage_data.as_ref())),
}))
})
.unwrap_or_else(|err| {
Some(QueryResult::Err(ChainHeadStorageEvent::<String>::Error(ErrorEvent {
QueryResult::Err(ChainHeadStorageEvent::<String>::Error(ErrorEvent {
error: err.to_string(),
})))
}))
})
}

/// Make the storage query.
fn query_storage(
/// Handle iterating over (key, value) or (key, hash) pairs.
fn query_storage_iter(
&self,
hash: Block::Hash,
query: &StorageQuery<StorageKey>,
key: &StorageKey,
child_key: Option<&ChildInfo>,
) -> Option<QueryResult> {
if !is_key_queryable(&query.key.0) {
return None
ty: IterQueryType,
) -> QueryIterResult {
let keys_iter = if let Some(child_key) = child_key {
self.client.child_storage_keys(hash, child_key.to_owned(), Some(key), None)
} else {
self.client.storage_keys(hash, Some(key), None)
}

match query.queue_type {
StorageQueryType::Value => self.query_storage_value(hash, &query.key, child_key),
StorageQueryType::Hash => self.query_storage_hash(hash, &query.key, child_key),
_ => None,
.map_err(|err| {
ChainHeadStorageEvent::<String>::Error(ErrorEvent { error: err.to_string() })
})?;

let mut ret = Vec::with_capacity(MAX_ITER_ITEMS);
let mut keys_iter = keys_iter.take(MAX_ITER_ITEMS);
while let Some(key) = keys_iter.next() {
let result = match ty {
IterQueryType::Value => self.query_storage_value(hash, &key, child_key),
IterQueryType::Hash => self.query_storage_hash(hash, &key, child_key),
}?;

if let Some(result) = result {
ret.push(result);
}
}

QueryIterResult::Ok(ret)
}

/// Generate the block events for the `chainHead_storage` method.
Expand All @@ -159,19 +185,56 @@ where

let mut storage_results = Vec::with_capacity(items.len());
for item in items {
let Some(result) = self.query_storage(hash, &item, child_key.as_ref()) else {
continue
};

match result {
QueryResult::Ok(storage_result) => storage_results.push(storage_result),
QueryResult::Err(event) => {
let _ = sink.send(&event);
// If an error is encountered for any of the query items
// do not produce any other events.
return
},
if !is_key_queryable(&item.key.0) {
continue
}

match item.query_type {
StorageQueryType::Value => {
match self.query_storage_value(hash, &item.key, child_key.as_ref()) {
Ok(Some(value)) => storage_results.push(value),
Ok(None) => continue,
Err(err) => {
let _ = sink.send(&err);
return
},
}
},
StorageQueryType::Hash =>
match self.query_storage_hash(hash, &item.key, child_key.as_ref()) {
Ok(Some(value)) => storage_results.push(value),
Ok(None) => continue,
Err(err) => {
let _ = sink.send(&err);
return
},
},
StorageQueryType::DescendantsValues => match self.query_storage_iter(
hash,
&item.key,
child_key.as_ref(),
IterQueryType::Value,
) {
Ok(values) => storage_results.extend(values),
Err(err) => {
let _ = sink.send(&err);
return
},
},
StorageQueryType::DescendantsHashes => match self.query_storage_iter(
hash,
&item.key,
child_key.as_ref(),
IterQueryType::Hash,
) {
Ok(values) => storage_results.extend(values),
Err(err) => {
let _ = sink.send(&err);
return
},
},
_ => continue,
};
}

if !storage_results.is_empty() {
Expand Down
12 changes: 6 additions & 6 deletions client/rpc-spec-v2/src/chain_head/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ pub struct StorageQuery<Key> {
pub key: Key,
/// The type of the storage query.
#[serde(rename = "type")]
pub queue_type: StorageQueryType,
pub query_type: StorageQueryType,
}

/// The type of the storage query.
Expand Down Expand Up @@ -558,7 +558,7 @@ mod tests {
#[test]
fn chain_head_storage_query() {
// Item with Value.
let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::Value };
let item = StorageQuery { key: "0x1", query_type: StorageQueryType::Value };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","type":"value"}"#;
Expand All @@ -568,7 +568,7 @@ mod tests {
assert_eq!(dec, item);

// Item with Hash.
let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::Hash };
let item = StorageQuery { key: "0x1", query_type: StorageQueryType::Hash };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","type":"hash"}"#;
Expand All @@ -578,7 +578,7 @@ mod tests {
assert_eq!(dec, item);

// Item with DescendantsValues.
let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::DescendantsValues };
let item = StorageQuery { key: "0x1", query_type: StorageQueryType::DescendantsValues };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","type":"descendants-values"}"#;
Expand All @@ -588,7 +588,7 @@ mod tests {
assert_eq!(dec, item);

// Item with DescendantsHashes.
let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::DescendantsHashes };
let item = StorageQuery { key: "0x1", query_type: StorageQueryType::DescendantsHashes };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","type":"descendants-hashes"}"#;
Expand All @@ -599,7 +599,7 @@ mod tests {

// Item with Merkle.
let item =
StorageQuery { key: "0x1", queue_type: StorageQueryType::ClosestDescendantMerkleValue };
StorageQuery { key: "0x1", query_type: StorageQueryType::ClosestDescendantMerkleValue };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","type":"closest-descendant-merkle-value"}"#;
Expand Down
Loading