Skip to content

Commit

Permalink
chainhead rpc block_hash -> finalized_block_hashs
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasad1 committed Mar 11, 2024
1 parent a2ee750 commit 7c26b02
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 35 deletions.
2 changes: 1 addition & 1 deletion subxt/src/backend/unstable/follow_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ pub(super) mod test_utils {
/// An initialized event
pub fn ev_initialized(n: u64) -> FollowEvent<H256> {
FollowEvent::Initialized(Initialized {
finalized_block_hash: H256::from_low_u64_le(n),
finalized_block_hashes: vec![H256::from_low_u64_le(n)],
finalized_block_runtime: None,
})
}
Expand Down
6 changes: 3 additions & 3 deletions subxt/src/backend/unstable/follow_stream_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,9 @@ impl<Hash: BlockHash> Shared<Hash> {

shared.seen_runtime_events.clear();

if let Some(finalized) = finalized_ev.finalized_block_hashes.last() {
init_message.finalized_block_hash = finalized.clone();
}
init_message.finalized_block_hashes =
finalized_ev.finalized_block_hashes.clone();

if let Some(runtime_ev) = newest_runtime {
init_message.finalized_block_runtime = Some(runtime_ev);
}
Expand Down
15 changes: 10 additions & 5 deletions subxt/src/backend/unstable/follow_stream_unpin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::backend::unstable::rpc_methods::{
use crate::config::{BlockHash, Config};
use crate::error::Error;
use futures::stream::{FuturesUnordered, Stream, StreamExt};

use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
Expand Down Expand Up @@ -105,7 +106,7 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
};

// React to any actual FollowEvent we get back.
let ev = match ev {
let ev: FollowStreamMsg<BlockRef<Hash>> = match ev {
FollowStreamMsg::Ready(subscription_id) => {
// update the subscription ID we'll use to unpin things.
this.subscription_id = Some(subscription_id.clone().into());
Expand All @@ -117,11 +118,15 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
let rel_block_num = this.rel_block_num;
// Pin this block, but note that it can be unpinned any time since it won't show up again (except
// as a parent block, which we are ignoring at the moment).
let block_ref =
this.pin_unpinnable_block_at(rel_block_num, details.finalized_block_hash);

let finalized_block_hashes = details
.finalized_block_hashes
.iter()
.map(|h| this.pin_unpinnable_block_at(rel_block_num, *h))
.collect();

FollowStreamMsg::Event(FollowEvent::Initialized(Initialized {
finalized_block_hash: block_ref,
finalized_block_hashes,
finalized_block_runtime: details.finalized_block_runtime,
}))
}
Expand Down Expand Up @@ -502,7 +507,7 @@ pub(super) mod test_utils {
/// An initialized event containing a BlockRef (useful for comparisons)
pub fn ev_initialized_ref(n: u64) -> FollowEvent<BlockRef<H256>> {
FollowEvent::Initialized(Initialized {
finalized_block_hash: BlockRef::new(H256::from_low_u64_le(n)),
finalized_block_hashes: vec![BlockRef::new(H256::from_low_u64_le(n))],
finalized_block_runtime: None,
})
}
Expand Down
27 changes: 16 additions & 11 deletions subxt/src/backend/unstable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,9 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
.events()
.filter_map(|ev| {
let out = match ev {
FollowEvent::Initialized(init) => Some(init.finalized_block_hash.into()),
FollowEvent::Initialized(init) => {
init.finalized_block_hashes.last().map(|b| b.clone().into())
}
_ => None,
};
std::future::ready(out)
Expand Down Expand Up @@ -353,7 +355,10 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
.filter_map(move |ev| {
let output = match ev {
FollowEvent::Initialized(ev) => {
runtimes.remove(&ev.finalized_block_hash.hash());
for b in ev.finalized_block_hashes {
runtimes.remove(&b.hash());
}

ev.finalized_block_runtime
}
FollowEvent::NewBlock(ev) => {
Expand Down Expand Up @@ -422,9 +427,11 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
&self,
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
self.stream_headers(|ev| match ev {
FollowEvent::Initialized(ev) => Some(ev.finalized_block_hash),
FollowEvent::NewBlock(ev) => Some(ev.block_hash),
_ => None,
FollowEvent::Initialized(init) => init.finalized_block_hashes,
FollowEvent::NewBlock(ev) => {
vec![ev.block_hash]
}
_ => vec![],
})
.await
}
Expand All @@ -433,9 +440,9 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
&self,
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
self.stream_headers(|ev| match ev {
FollowEvent::Initialized(ev) => Some(ev.finalized_block_hash),
FollowEvent::BestBlockChanged(ev) => Some(ev.best_block_hash),
_ => None,
FollowEvent::Initialized(init) => init.finalized_block_hashes,
FollowEvent::BestBlockChanged(ev) => vec![ev.best_block_hash],
_ => vec![],
})
.await
}
Expand All @@ -444,9 +451,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
&self,
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
self.stream_headers(|ev| match ev {
FollowEvent::Initialized(ev) => {
vec![ev.finalized_block_hash]
}
FollowEvent::Initialized(init) => init.finalized_block_hashes,
FollowEvent::Finalized(ev) => ev.finalized_block_hashes,
_ => vec![],
})
Expand Down
4 changes: 2 additions & 2 deletions subxt/src/backend/unstable/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,8 @@ pub enum FollowEvent<Hash> {
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Initialized<Hash> {
/// The hash of the latest finalized block.
pub finalized_block_hash: Hash,
/// The hashes of the last finalized blocks.
pub finalized_block_hashes: Vec<Hash>,
/// The runtime version of the finalized block.
///
/// # Note
Expand Down
33 changes: 20 additions & 13 deletions testing/integration-tests/src/full_client/client/unstable_rpcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn chainhead_unstable_follow() {
assert_eq!(
event,
FollowEvent::Initialized(Initialized {
finalized_block_hash,
finalized_block_hashes: vec![finalized_block_hash],
finalized_block_runtime: None,
})
);
Expand All @@ -47,7 +47,7 @@ async fn chainhead_unstable_follow() {
assert_matches!(
event,
FollowEvent::Initialized(init) => {
assert_eq!(init.finalized_block_hash, finalized_block_hash);
assert_eq!(init.finalized_block_hashes, vec![finalized_block_hash]);
if let Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec })) = init.finalized_block_runtime {
assert_eq!(spec.spec_version, runtime_version.spec_version);
assert_eq!(spec.transaction_version, runtime_version.transaction_version);
Expand All @@ -65,14 +65,17 @@ async fn chainhead_unstable_body() {

let mut blocks = rpc.chainhead_unstable_follow(false).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
let hashes = match event {
FollowEvent::Initialized(init) => init.finalized_block_hashes,
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap();

// Fetch the block's body.
let response = rpc.chainhead_unstable_body(sub_id, hash).await.unwrap();
let response = rpc
.chainhead_unstable_body(sub_id, hashes[0])
.await
.unwrap();
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
Expand All @@ -94,11 +97,12 @@ async fn chainhead_unstable_header() {

let mut blocks = rpc.chainhead_unstable_follow(false).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
let hashes = match event {
FollowEvent::Initialized(init) => init.finalized_block_hashes,
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap();
let hash = hashes[0];

let new_header = legacy_rpc
.chain_get_header(Some(hash))
Expand All @@ -122,11 +126,12 @@ async fn chainhead_unstable_storage() {

let mut blocks = rpc.chainhead_unstable_follow(false).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
let hashes = match event {
FollowEvent::Initialized(init) => init.finalized_block_hashes,
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap();
let hash = hashes[0];

let alice: AccountId32 = dev::alice().public_key().into();
let addr = node_runtime::storage().system().account(alice);
Expand Down Expand Up @@ -167,11 +172,12 @@ async fn chainhead_unstable_call() {

let mut blocks = rpc.chainhead_unstable_follow(true).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
let hashes = match event {
FollowEvent::Initialized(init) => init.finalized_block_hashes,
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap();
let hash = hashes[0];

let alice_id = dev::alice().public_key().to_account_id();
// Runtime API call.
Expand Down Expand Up @@ -204,11 +210,12 @@ async fn chainhead_unstable_unpin() {

let mut blocks = rpc.chainhead_unstable_follow(true).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
let hashes = match event {
FollowEvent::Initialized(init) => init.finalized_block_hashes,
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap();
let hash = hashes[0];

assert!(rpc.chainhead_unstable_unpin(sub_id, hash).await.is_ok());
// The block was already unpinned.
Expand Down

0 comments on commit 7c26b02

Please sign in to comment.