Skip to content

Commit

Permalink
chainHead: Support multiple hashes for chainHead_unpin method (#2295)
Browse files Browse the repository at this point in the history
This PR adds support for multiple hashes being passed to the
`chainHeda_unpin` parameters.

The `hash` parameter is renamed to `hash_or_hashes` per
paritytech/json-rpc-interface-spec#111.

While at it, a new integration test is added to check the unpinning of
multiple hashes. The API is checked against a hash or a vector of
hashes.

cc @paritytech/subxt-team

---------

Signed-off-by: Alexandru Vasile <[email protected]>
  • Loading branch information
lexnv authored Nov 14, 2023
1 parent b70d418 commit cfe5e62
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions substrate/client/rpc-spec-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ sc-transaction-pool-api = { path = "../transaction-pool/api" }
sp-core = { path = "../../primitives/core" }
sp-runtime = { path = "../../primitives/runtime" }
sp-api = { path = "../../primitives/api" }
sp-rpc = { path = "../../primitives/rpc" }
sp-blockchain = { path = "../../primitives/blockchain" }
sp-version = { path = "../../primitives/version" }
sc-client-api = { path = "../api" }
Expand Down
11 changes: 9 additions & 2 deletions substrate/client/rpc-spec-v2/src/chain_head/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
//! API trait of the chain head.
use crate::chain_head::event::{FollowEvent, MethodResponse, StorageQuery};
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use sp_rpc::list::ListOrValue;

#[rpc(client, server)]
pub trait ChainHeadApi<Hash> {
Expand Down Expand Up @@ -109,16 +110,22 @@ pub trait ChainHeadApi<Hash> {
call_parameters: String,
) -> RpcResult<MethodResponse>;

/// Unpin a block reported by the `follow` method.
/// Unpin a block or multiple blocks reported by the `follow` method.
///
/// Ongoing operations that require the provided block
/// will continue normally.
///
/// When this method returns an error, it is guaranteed that no blocks have been unpinned.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "chainHead_unstable_unpin", blocking)]
fn chain_head_unstable_unpin(&self, follow_subscription: String, hash: Hash) -> RpcResult<()>;
fn chain_head_unstable_unpin(
&self,
follow_subscription: String,
hash_or_hashes: ListOrValue<Hash>,
) -> RpcResult<()>;

/// Resumes a storage fetch started with `chainHead_storage` after it has generated an
/// `operationWaitingForContinue` event.
Expand Down
12 changes: 10 additions & 2 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use sc_client_api::{
use sp_api::CallApiAt;
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
use sp_core::{traits::CallContext, Bytes};
use sp_rpc::list::ListOrValue;
use sp_runtime::traits::Block as BlockT;
use std::{marker::PhantomData, sync::Arc, time::Duration};

Expand Down Expand Up @@ -432,9 +433,16 @@ where
fn chain_head_unstable_unpin(
&self,
follow_subscription: String,
hash: Block::Hash,
hash_or_hashes: ListOrValue<Block::Hash>,
) -> RpcResult<()> {
match self.subscriptions.unpin_block(&follow_subscription, hash) {
let result = match hash_or_hashes {
ListOrValue::Value(hash) =>
self.subscriptions.unpin_blocks(&follow_subscription, [hash]),
ListOrValue::List(hashes) =>
self.subscriptions.unpin_blocks(&follow_subscription, hashes),
};

match result {
Ok(()) => Ok(()),
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
Expand Down
38 changes: 26 additions & 12 deletions substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -750,22 +750,36 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
}
}

pub fn unpin_block(
pub fn unpin_blocks(
&mut self,
sub_id: &str,
hash: Block::Hash,
hashes: impl IntoIterator<Item = Block::Hash> + Clone,
) -> Result<(), SubscriptionManagementError> {
let Some(sub) = self.subs.get_mut(sub_id) else {
return Err(SubscriptionManagementError::SubscriptionAbsent)
};

// Check that unpin was not called before and the block was pinned
// for this subscription.
if !sub.unregister_block(hash) {
return Err(SubscriptionManagementError::BlockHashAbsent)
// Ensure that all blocks are part of the subscription before removing individual
// blocks.
for hash in hashes.clone() {
if !sub.contains_block(hash) {
return Err(SubscriptionManagementError::BlockHashAbsent);
}
}

// Note: this needs to be separate from the global mappings to avoid barrow checker
// thinking we borrow `&mut self` twice: once from `self.subs.get_mut` and once from
// `self.global_unregister_block`. Although the borrowing is correct, since different
// fields of the structure are borrowed, one at a time.
for hash in hashes.clone() {
sub.unregister_block(hash);
}

// Block have been removed from the subscription. Remove them from the global tracking.
for hash in hashes {
self.global_unregister_block(hash);
}

self.global_unregister_block(hash);
Ok(())
}

Expand Down Expand Up @@ -1029,11 +1043,11 @@ mod tests {
assert_eq!(block.has_runtime(), true);

let invalid_id = "abc-invalid".to_string();
let err = subs.unpin_block(&invalid_id, hash).unwrap_err();
let err = subs.unpin_blocks(&invalid_id, vec![hash]).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);

// Unpin the block.
subs.unpin_block(&id, hash).unwrap();
subs.unpin_blocks(&id, vec![hash]).unwrap();
let err = subs.lock_block(&id, hash, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
}
Expand Down Expand Up @@ -1077,13 +1091,13 @@ mod tests {
// Ensure the block propagated to the subscription.
subs.subs.get(&id_second).unwrap().blocks.get(&hash).unwrap();

subs.unpin_block(&id, hash).unwrap();
subs.unpin_blocks(&id, vec![hash]).unwrap();
assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
// Cannot unpin a block twice for the same subscription.
let err = subs.unpin_block(&id, hash).unwrap_err();
let err = subs.unpin_blocks(&id, vec![hash]).unwrap_err();
assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);

subs.unpin_block(&id_second, hash).unwrap();
subs.unpin_blocks(&id_second, vec![hash]).unwrap();
// Block unregistered from the memory.
assert!(subs.global_blocks.get(&hash).is_none());
}
Expand Down
17 changes: 9 additions & 8 deletions substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,23 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {
inner.pin_block(sub_id, hash)
}

/// Unpin the block from the subscription.
/// Unpin the blocks from the subscription.
///
/// The last subscription that unpins the block is also unpinning the block
/// from the backend.
/// Blocks are reference counted and when the last subscription unpins a given block, the block
/// is also unpinned from the backend.
///
/// This method is called only once per subscription.
///
/// Returns an error if the block is not pinned for the subscription or
/// the subscription ID is invalid.
pub fn unpin_block(
/// Returns an error if the subscription ID is invalid, or any of the blocks are not pinned
/// for the subscriptions. When an error is returned, it is guaranteed that no blocks have
/// been unpinned.
pub fn unpin_blocks(
&self,
sub_id: &str,
hash: Block::Hash,
hashes: impl IntoIterator<Item = Block::Hash> + Clone,
) -> Result<(), SubscriptionManagementError> {
let mut inner = self.inner.write();
inner.unpin_block(sub_id, hash)
inner.unpin_blocks(sub_id, hashes)
}

/// Ensure the block remains pinned until the return object is dropped.
Expand Down
173 changes: 168 additions & 5 deletions substrate/client/rpc-spec-v2/src/chain_head/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1591,22 +1591,28 @@ async fn follow_with_unpin() {
// Unpin an invalid subscription ID must return Ok(()).
let invalid_hash = hex_string(&INVALID_HASH);
let _res: () = api
.call("chainHead_unstable_unpin", ["invalid_sub_id", &invalid_hash])
.call("chainHead_unstable_unpin", rpc_params!["invalid_sub_id", &invalid_hash])
.await
.unwrap();

// Valid subscription with invalid block hash.
let invalid_hash = hex_string(&INVALID_HASH);
let err = api
.call::<_, serde_json::Value>("chainHead_unstable_unpin", [&sub_id, &invalid_hash])
.call::<_, serde_json::Value>(
"chainHead_unstable_unpin",
rpc_params![&sub_id, &invalid_hash],
)
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
);

// To not exceed the number of pinned blocks, we need to unpin before the next import.
let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &block_hash]).await.unwrap();
let _res: () = api
.call("chainHead_unstable_unpin", rpc_params![&sub_id, &block_hash])
.await
.unwrap();

// Block tree:
// finalized_block -> block -> block2
Expand Down Expand Up @@ -1645,6 +1651,160 @@ async fn follow_with_unpin() {
assert!(sub.next::<FollowEvent<String>>().await.is_none());
}

#[tokio::test]
async fn follow_with_multiple_unpin_hashes() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
let mut client = Arc::new(builder.build());

let api = ChainHead::new(
client.clone(),
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
},
)
.into_rpc();

let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap();
let sub_id = sub.subscription_id();
let sub_id = serde_json::to_string(&sub_id).unwrap();

// Import 3 blocks.
let block_1 = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().genesis_hash)
.with_parent_block_number(0)
.build()
.unwrap()
.build()
.unwrap()
.block;
let block_1_hash = block_1.header.hash();
client.import(BlockOrigin::Own, block_1.clone()).await.unwrap();

let block_2 = BlockBuilderBuilder::new(&*client)
.on_parent_block(block_1.hash())
.with_parent_block_number(1)
.build()
.unwrap()
.build()
.unwrap()
.block;
let block_2_hash = block_2.header.hash();
client.import(BlockOrigin::Own, block_2.clone()).await.unwrap();

let block_3 = BlockBuilderBuilder::new(&*client)
.on_parent_block(block_2.hash())
.with_parent_block_number(2)
.build()
.unwrap()
.build()
.unwrap()
.block;
let block_3_hash = block_3.header.hash();
client.import(BlockOrigin::Own, block_3.clone()).await.unwrap();

// Ensure the imported block is propagated and pinned for this subscription.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::Initialized(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::NewBlock(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::BestBlockChanged(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::NewBlock(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::BestBlockChanged(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::NewBlock(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::BestBlockChanged(_)
);

// Unpin an invalid subscription ID must return Ok(()).
let invalid_hash = hex_string(&INVALID_HASH);
let _res: () = api
.call("chainHead_unstable_unpin", rpc_params!["invalid_sub_id", &invalid_hash])
.await
.unwrap();

// Valid subscription with invalid block hash.
let err = api
.call::<_, serde_json::Value>(
"chainHead_unstable_unpin",
rpc_params![&sub_id, &invalid_hash],
)
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
);

let _res: () = api
.call("chainHead_unstable_unpin", rpc_params![&sub_id, &block_1_hash])
.await
.unwrap();

// One block hash is invalid. Block 1 is already unpinned.
let err = api
.call::<_, serde_json::Value>(
"chainHead_unstable_unpin",
rpc_params![&sub_id, vec![&block_1_hash, &block_2_hash, &block_3_hash]],
)
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
);

// Unpin multiple blocks.
let _res: () = api
.call("chainHead_unstable_unpin", rpc_params![&sub_id, vec![&block_2_hash, &block_3_hash]])
.await
.unwrap();

// Check block 2 and 3 are unpinned.
let err = api
.call::<_, serde_json::Value>(
"chainHead_unstable_unpin",
rpc_params![&sub_id, &block_2_hash],
)
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
);

let err = api
.call::<_, serde_json::Value>(
"chainHead_unstable_unpin",
rpc_params![&sub_id, &block_3_hash],
)
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
);
}

#[tokio::test]
async fn follow_prune_best_block() {
let builder = TestClientBuilder::new();
Expand Down Expand Up @@ -1828,7 +1988,7 @@ async fn follow_prune_best_block() {
let sub_id = sub.subscription_id();
let sub_id = serde_json::to_string(&sub_id).unwrap();
let hash = format!("{:?}", block_2_hash);
let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &hash]).await.unwrap();
let _res: () = api.call("chainHead_unstable_unpin", rpc_params![&sub_id, &hash]).await.unwrap();
}

#[tokio::test]
Expand Down Expand Up @@ -2305,7 +2465,10 @@ async fn pin_block_references() {
wait_pinned_references(&backend, &hash, 1).await;

// To not exceed the number of pinned blocks, we need to unpin before the next import.
let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &block_hash]).await.unwrap();
let _res: () = api
.call("chainHead_unstable_unpin", rpc_params![&sub_id, &block_hash])
.await
.unwrap();

// Make sure unpin clears out the reference.
let refs = backend.pin_refs(&hash).unwrap();
Expand Down

0 comments on commit cfe5e62

Please sign in to comment.