diff --git a/subxt/src/backend/unstable/follow_stream.rs b/subxt/src/backend/unstable/follow_stream.rs index 9474f7a302..71af824831 100644 --- a/subxt/src/backend/unstable/follow_stream.rs +++ b/subxt/src/backend/unstable/follow_stream.rs @@ -243,7 +243,7 @@ pub(super) mod test_utils { /// An initialized event pub fn ev_initialized(n: u64) -> FollowEvent { FollowEvent::Initialized(Initialized { - finalized_block_hash: H256::from_low_u64_le(n), + finalized_block_hashes: vec![H256::from_low_u64_le(n)], finalized_block_runtime: None, }) } diff --git a/subxt/src/backend/unstable/follow_stream_driver.rs b/subxt/src/backend/unstable/follow_stream_driver.rs index d32c98a104..e85336c314 100644 --- a/subxt/src/backend/unstable/follow_stream_driver.rs +++ b/subxt/src/backend/unstable/follow_stream_driver.rs @@ -267,9 +267,9 @@ impl Shared { shared.seen_runtime_events.clear(); - if let Some(finalized) = finalized_ev.finalized_block_hashes.last() { - init_message.finalized_block_hash = finalized.clone(); - } + init_message.finalized_block_hashes = + finalized_ev.finalized_block_hashes.clone(); + if let Some(runtime_ev) = newest_runtime { init_message.finalized_block_runtime = Some(runtime_ev); } diff --git a/subxt/src/backend/unstable/follow_stream_unpin.rs b/subxt/src/backend/unstable/follow_stream_unpin.rs index ca00e37690..484d800b0a 100644 --- a/subxt/src/backend/unstable/follow_stream_unpin.rs +++ b/subxt/src/backend/unstable/follow_stream_unpin.rs @@ -10,6 +10,7 @@ use crate::backend::unstable::rpc_methods::{ use crate::config::{BlockHash, Config}; use crate::error::Error; use futures::stream::{FuturesUnordered, Stream, StreamExt}; + use std::collections::{HashMap, HashSet}; use std::future::Future; use std::pin::Pin; @@ -105,7 +106,7 @@ impl Stream for FollowStreamUnpin { }; // React to any actual FollowEvent we get back. - let ev = match ev { + let ev: FollowStreamMsg> = match ev { FollowStreamMsg::Ready(subscription_id) => { // update the subscription ID we'll use to unpin things. this.subscription_id = Some(subscription_id.clone().into()); @@ -117,11 +118,15 @@ impl Stream for FollowStreamUnpin { let rel_block_num = this.rel_block_num; // Pin this block, but note that it can be unpinned any time since it won't show up again (except // as a parent block, which we are ignoring at the moment). - let block_ref = - this.pin_unpinnable_block_at(rel_block_num, details.finalized_block_hash); + + let finalized_block_hashes = details + .finalized_block_hashes + .iter() + .map(|h| this.pin_unpinnable_block_at(rel_block_num, *h)) + .collect(); FollowStreamMsg::Event(FollowEvent::Initialized(Initialized { - finalized_block_hash: block_ref, + finalized_block_hashes, finalized_block_runtime: details.finalized_block_runtime, })) } @@ -502,7 +507,7 @@ pub(super) mod test_utils { /// An initialized event containing a BlockRef (useful for comparisons) pub fn ev_initialized_ref(n: u64) -> FollowEvent> { FollowEvent::Initialized(Initialized { - finalized_block_hash: BlockRef::new(H256::from_low_u64_le(n)), + finalized_block_hashes: vec![BlockRef::new(H256::from_low_u64_le(n))], finalized_block_runtime: None, }) } diff --git a/subxt/src/backend/unstable/mod.rs b/subxt/src/backend/unstable/mod.rs index 21a9cca504..d99bfe1dab 100644 --- a/subxt/src/backend/unstable/mod.rs +++ b/subxt/src/backend/unstable/mod.rs @@ -321,7 +321,9 @@ impl Backend for UnstableBackend { .events() .filter_map(|ev| { let out = match ev { - FollowEvent::Initialized(init) => Some(init.finalized_block_hash.into()), + FollowEvent::Initialized(init) => { + init.finalized_block_hashes.last().map(|b| b.clone().into()) + } _ => None, }; std::future::ready(out) @@ -353,7 +355,10 @@ impl Backend for UnstableBackend { .filter_map(move |ev| { let output = match ev { FollowEvent::Initialized(ev) => { - runtimes.remove(&ev.finalized_block_hash.hash()); + for b in ev.finalized_block_hashes { + runtimes.remove(&b.hash()); + } + ev.finalized_block_runtime } FollowEvent::NewBlock(ev) => { @@ -422,9 +427,11 @@ impl Backend for UnstableBackend { &self, ) -> Result)>, Error> { self.stream_headers(|ev| match ev { - FollowEvent::Initialized(ev) => Some(ev.finalized_block_hash), - FollowEvent::NewBlock(ev) => Some(ev.block_hash), - _ => None, + FollowEvent::Initialized(init) => init.finalized_block_hashes, + FollowEvent::NewBlock(ev) => { + vec![ev.block_hash] + } + _ => vec![], }) .await } @@ -433,9 +440,9 @@ impl Backend for UnstableBackend { &self, ) -> Result)>, Error> { self.stream_headers(|ev| match ev { - FollowEvent::Initialized(ev) => Some(ev.finalized_block_hash), - FollowEvent::BestBlockChanged(ev) => Some(ev.best_block_hash), - _ => None, + FollowEvent::Initialized(init) => init.finalized_block_hashes, + FollowEvent::BestBlockChanged(ev) => vec![ev.best_block_hash], + _ => vec![], }) .await } @@ -444,9 +451,7 @@ impl Backend for UnstableBackend { &self, ) -> Result)>, Error> { self.stream_headers(|ev| match ev { - FollowEvent::Initialized(ev) => { - vec![ev.finalized_block_hash] - } + FollowEvent::Initialized(init) => init.finalized_block_hashes, FollowEvent::Finalized(ev) => ev.finalized_block_hashes, _ => vec![], }) diff --git a/subxt/src/backend/unstable/rpc_methods.rs b/subxt/src/backend/unstable/rpc_methods.rs index c51e5020cb..a1b04cad4a 100644 --- a/subxt/src/backend/unstable/rpc_methods.rs +++ b/subxt/src/backend/unstable/rpc_methods.rs @@ -359,8 +359,8 @@ pub enum FollowEvent { #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Initialized { - /// The hash of the latest finalized block. - pub finalized_block_hash: Hash, + /// The hashes of the last finalized blocks. + pub finalized_block_hashes: Vec, /// The runtime version of the finalized block. /// /// # Note diff --git a/testing/integration-tests/src/full_client/client/unstable_rpcs.rs b/testing/integration-tests/src/full_client/client/unstable_rpcs.rs index 084c1223a8..d32a5d9dea 100644 --- a/testing/integration-tests/src/full_client/client/unstable_rpcs.rs +++ b/testing/integration-tests/src/full_client/client/unstable_rpcs.rs @@ -32,7 +32,7 @@ async fn chainhead_unstable_follow() { assert_eq!( event, FollowEvent::Initialized(Initialized { - finalized_block_hash, + finalized_block_hashes: vec![finalized_block_hash], finalized_block_runtime: None, }) ); @@ -47,7 +47,7 @@ async fn chainhead_unstable_follow() { assert_matches!( event, FollowEvent::Initialized(init) => { - assert_eq!(init.finalized_block_hash, finalized_block_hash); + assert_eq!(init.finalized_block_hashes, vec![finalized_block_hash]); if let Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec })) = init.finalized_block_runtime { assert_eq!(spec.spec_version, runtime_version.spec_version); assert_eq!(spec.transaction_version, runtime_version.transaction_version); @@ -65,14 +65,17 @@ async fn chainhead_unstable_body() { let mut blocks = rpc.chainhead_unstable_follow(false).await.unwrap(); let event = blocks.next().await.unwrap().unwrap(); - let hash = match event { - FollowEvent::Initialized(init) => init.finalized_block_hash, + let hashes = match event { + FollowEvent::Initialized(init) => init.finalized_block_hashes, _ => panic!("Unexpected event"), }; let sub_id = blocks.subscription_id().unwrap(); // Fetch the block's body. - let response = rpc.chainhead_unstable_body(sub_id, hash).await.unwrap(); + let response = rpc + .chainhead_unstable_body(sub_id, hashes[0]) + .await + .unwrap(); let operation_id = match response { MethodResponse::Started(started) => started.operation_id, MethodResponse::LimitReached => panic!("Expected started response"), @@ -94,11 +97,12 @@ async fn chainhead_unstable_header() { let mut blocks = rpc.chainhead_unstable_follow(false).await.unwrap(); let event = blocks.next().await.unwrap().unwrap(); - let hash = match event { - FollowEvent::Initialized(init) => init.finalized_block_hash, + let hashes = match event { + FollowEvent::Initialized(init) => init.finalized_block_hashes, _ => panic!("Unexpected event"), }; let sub_id = blocks.subscription_id().unwrap(); + let hash = hashes[0]; let new_header = legacy_rpc .chain_get_header(Some(hash)) @@ -122,11 +126,12 @@ async fn chainhead_unstable_storage() { let mut blocks = rpc.chainhead_unstable_follow(false).await.unwrap(); let event = blocks.next().await.unwrap().unwrap(); - let hash = match event { - FollowEvent::Initialized(init) => init.finalized_block_hash, + let hashes = match event { + FollowEvent::Initialized(init) => init.finalized_block_hashes, _ => panic!("Unexpected event"), }; let sub_id = blocks.subscription_id().unwrap(); + let hash = hashes[0]; let alice: AccountId32 = dev::alice().public_key().into(); let addr = node_runtime::storage().system().account(alice); @@ -167,11 +172,12 @@ async fn chainhead_unstable_call() { let mut blocks = rpc.chainhead_unstable_follow(true).await.unwrap(); let event = blocks.next().await.unwrap().unwrap(); - let hash = match event { - FollowEvent::Initialized(init) => init.finalized_block_hash, + let hashes = match event { + FollowEvent::Initialized(init) => init.finalized_block_hashes, _ => panic!("Unexpected event"), }; let sub_id = blocks.subscription_id().unwrap(); + let hash = hashes[0]; let alice_id = dev::alice().public_key().to_account_id(); // Runtime API call. @@ -204,11 +210,12 @@ async fn chainhead_unstable_unpin() { let mut blocks = rpc.chainhead_unstable_follow(true).await.unwrap(); let event = blocks.next().await.unwrap().unwrap(); - let hash = match event { - FollowEvent::Initialized(init) => init.finalized_block_hash, + let hashes = match event { + FollowEvent::Initialized(init) => init.finalized_block_hashes, _ => panic!("Unexpected event"), }; let sub_id = blocks.subscription_id().unwrap(); + let hash = hashes[0]; assert!(rpc.chainhead_unstable_unpin(sub_id, hash).await.is_ok()); // The block was already unpinned.